Ayaka Koshibe
Committed by Yuta Higuchi

LLDPLinkProvider listens to MastershipEvents

Change-Id: Iaa3655c680a8fc93921f0b83dc4fc16311222bf9
...@@ -149,8 +149,6 @@ public class LambdaForwarding { ...@@ -149,8 +149,6 @@ public class LambdaForwarding {
149 break; 149 break;
150 case DEVICE_AVAILABILITY_CHANGED: 150 case DEVICE_AVAILABILITY_CHANGED:
151 break; 151 break;
152 - case DEVICE_MASTERSHIP_CHANGED:
153 - break;
154 case DEVICE_REMOVED: 152 case DEVICE_REMOVED:
155 break; 153 break;
156 case DEVICE_SUSPENDED: 154 case DEVICE_SUSPENDED:
......
...@@ -146,8 +146,6 @@ public class MPLSForwarding { ...@@ -146,8 +146,6 @@ public class MPLSForwarding {
146 break; 146 break;
147 case DEVICE_AVAILABILITY_CHANGED: 147 case DEVICE_AVAILABILITY_CHANGED:
148 break; 148 break;
149 - case DEVICE_MASTERSHIP_CHANGED:
150 - break;
151 case DEVICE_REMOVED: 149 case DEVICE_REMOVED:
152 break; 150 break;
153 case DEVICE_SUSPENDED: 151 case DEVICE_SUSPENDED:
......
...@@ -60,12 +60,6 @@ public class DeviceEvent extends AbstractEvent<DeviceEvent.Type, Device> { ...@@ -60,12 +60,6 @@ public class DeviceEvent extends AbstractEvent<DeviceEvent.Type, Device> {
60 DEVICE_AVAILABILITY_CHANGED, 60 DEVICE_AVAILABILITY_CHANGED,
61 61
62 /** 62 /**
63 - * Signifies that the current controller instance relationship has
64 - * changed with respect to a device.
65 - */
66 - DEVICE_MASTERSHIP_CHANGED,
67 -
68 - /**
69 * Signifies that a port has been added. 63 * Signifies that a port has been added.
70 */ 64 */
71 PORT_ADDED, 65 PORT_ADDED,
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
16 package org.onlab.onos.net.device.impl; 16 package org.onlab.onos.net.device.impl;
17 17
18 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.base.Preconditions.checkNotNull;
19 -import static org.onlab.onos.net.device.DeviceEvent.Type.DEVICE_MASTERSHIP_CHANGED;
20 import static org.onlab.onos.net.MastershipRole.*; 19 import static org.onlab.onos.net.MastershipRole.*;
21 import static org.onlab.util.Tools.namedThreads; 20 import static org.onlab.util.Tools.namedThreads;
22 import static org.slf4j.LoggerFactory.getLogger; 21 import static org.slf4j.LoggerFactory.getLogger;
...@@ -315,8 +314,6 @@ public class DeviceManager ...@@ -315,8 +314,6 @@ public class DeviceManager
315 if (event != null) { 314 if (event != null) {
316 log.trace("event: {} {}", event.type(), event); 315 log.trace("event: {} {}", event.type(), event);
317 post(event); 316 post(event);
318 - } else {
319 - post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, store.getDevice(deviceId)));
320 } 317 }
321 } 318 }
322 319
......
...@@ -441,7 +441,6 @@ public class ProxyArpManager implements ProxyArpService { ...@@ -441,7 +441,6 @@ public class ProxyArpManager implements ProxyArpService {
441 switch (event.type()) { 441 switch (event.type()) {
442 case DEVICE_ADDED: 442 case DEVICE_ADDED:
443 case DEVICE_AVAILABILITY_CHANGED: 443 case DEVICE_AVAILABILITY_CHANGED:
444 - case DEVICE_MASTERSHIP_CHANGED:
445 case DEVICE_SUSPENDED: 444 case DEVICE_SUSPENDED:
446 case DEVICE_UPDATED: 445 case DEVICE_UPDATED:
447 // nothing to do in these cases; handled when links get reported 446 // nothing to do in these cases; handled when links get reported
......
...@@ -20,6 +20,8 @@ import org.apache.felix.scr.annotations.Component; ...@@ -20,6 +20,8 @@ import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate; 20 import org.apache.felix.scr.annotations.Deactivate;
21 import org.apache.felix.scr.annotations.Reference; 21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality; 22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 +import org.onlab.onos.mastership.MastershipEvent;
24 +import org.onlab.onos.mastership.MastershipListener;
23 import org.onlab.onos.mastership.MastershipService; 25 import org.onlab.onos.mastership.MastershipService;
24 import org.onlab.onos.net.ConnectPoint; 26 import org.onlab.onos.net.ConnectPoint;
25 import org.onlab.onos.net.Device; 27 import org.onlab.onos.net.Device;
...@@ -40,7 +42,11 @@ import org.slf4j.Logger; ...@@ -40,7 +42,11 @@ import org.slf4j.Logger;
40 42
41 import java.util.Map; 43 import java.util.Map;
42 import java.util.concurrent.ConcurrentHashMap; 44 import java.util.concurrent.ConcurrentHashMap;
45 +import java.util.concurrent.ScheduledExecutorService;
46 +import java.util.concurrent.TimeUnit;
43 47
48 +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
49 +import static org.onlab.util.Tools.namedThreads;
44 import static org.slf4j.LoggerFactory.getLogger; 50 import static org.slf4j.LoggerFactory.getLogger;
45 51
46 52
...@@ -67,11 +73,17 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -67,11 +73,17 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
67 73
68 private LinkProviderService providerService; 74 private LinkProviderService providerService;
69 75
76 + private ScheduledExecutorService executor;
77 +
70 private final boolean useBDDP = true; 78 private final boolean useBDDP = true;
71 79
80 + private static final long INIT_DELAY = 5;
81 + private static final long DELAY = 5;
72 82
73 private final InternalLinkProvider listener = new InternalLinkProvider(); 83 private final InternalLinkProvider listener = new InternalLinkProvider();
74 84
85 + private final InternalRoleListener roleListener = new InternalRoleListener();
86 +
75 protected final Map<DeviceId, LinkDiscovery> discoverers = new ConcurrentHashMap<>(); 87 protected final Map<DeviceId, LinkDiscovery> discoverers = new ConcurrentHashMap<>();
76 88
77 /** 89 /**
...@@ -86,6 +98,8 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -86,6 +98,8 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
86 providerService = providerRegistry.register(this); 98 providerService = providerRegistry.register(this);
87 deviceService.addListener(listener); 99 deviceService.addListener(listener);
88 packetSevice.addProcessor(listener, 0); 100 packetSevice.addProcessor(listener, 0);
101 + masterService.addListener(roleListener);
102 +
89 LinkDiscovery ld; 103 LinkDiscovery ld;
90 for (Device device : deviceService.getDevices()) { 104 for (Device device : deviceService.getDevices()) {
91 ld = new LinkDiscovery(device, packetSevice, masterService, 105 ld = new LinkDiscovery(device, packetSevice, masterService,
...@@ -98,22 +112,57 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -98,22 +112,57 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
98 } 112 }
99 } 113 }
100 114
115 + executor = newSingleThreadScheduledExecutor(namedThreads("device-sync-%d"));
116 + executor.scheduleAtFixedRate(new SyncDeviceInfoTask(), INIT_DELAY,
117 + DELAY, TimeUnit.SECONDS);
118 +
101 log.info("Started"); 119 log.info("Started");
102 } 120 }
103 121
104 @Deactivate 122 @Deactivate
105 public void deactivate() { 123 public void deactivate() {
124 + executor.shutdownNow();
106 for (LinkDiscovery ld : discoverers.values()) { 125 for (LinkDiscovery ld : discoverers.values()) {
107 ld.stop(); 126 ld.stop();
108 } 127 }
109 providerRegistry.unregister(this); 128 providerRegistry.unregister(this);
110 deviceService.removeListener(listener); 129 deviceService.removeListener(listener);
111 packetSevice.removeProcessor(listener); 130 packetSevice.removeProcessor(listener);
131 + masterService.removeListener(roleListener);
112 providerService = null; 132 providerService = null;
113 133
114 log.info("Stopped"); 134 log.info("Stopped");
115 } 135 }
116 136
137 + private class InternalRoleListener implements MastershipListener {
138 +
139 + @Override
140 + public void event(MastershipEvent event) {
141 +
142 + if (MastershipEvent.Type.BACKUPS_CHANGED.equals(event.type())) {
143 + // only need new master events
144 + return;
145 + }
146 +
147 + DeviceId deviceId = event.subject();
148 + Device device = deviceService.getDevice(deviceId);
149 + if (device == null) {
150 + log.warn("Device {} doesn't exist, or isn't there yet", deviceId);
151 + return;
152 + }
153 + synchronized (discoverers) {
154 + if (!discoverers.containsKey(deviceId)) {
155 + // TODO: ideally, should never reach here
156 + log.debug("Device mastership changed ({}) {}",
157 + event.type(), deviceId);
158 + discoverers.put(deviceId, new LinkDiscovery(device,
159 + packetSevice, masterService, providerService,
160 + useBDDP));
161 + }
162 + }
163 + }
164 +
165 + }
117 166
118 private class InternalLinkProvider implements PacketProcessor, DeviceListener { 167 private class InternalLinkProvider implements PacketProcessor, DeviceListener {
119 168
...@@ -131,18 +180,22 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -131,18 +180,22 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
131 switch (event.type()) { 180 switch (event.type()) {
132 case DEVICE_ADDED: 181 case DEVICE_ADDED:
133 case DEVICE_UPDATED: 182 case DEVICE_UPDATED:
183 + synchronized (discoverers) {
134 ld = discoverers.get(deviceId); 184 ld = discoverers.get(deviceId);
135 if (ld == null) { 185 if (ld == null) {
136 - log.debug("Device added ({}) {}", event.type(), deviceId); 186 + log.debug("Device added ({}) {}", event.type(),
137 - discoverers.put(deviceId, 187 + deviceId);
138 - new LinkDiscovery(device, packetSevice, masterService, 188 + discoverers.put(deviceId, new LinkDiscovery(device,
139 - providerService, useBDDP)); 189 + packetSevice, masterService, providerService,
190 + useBDDP));
140 } else { 191 } else {
141 if (ld.isStopped()) { 192 if (ld.isStopped()) {
142 - log.debug("Device restarted ({}) {}", event.type(), deviceId); 193 + log.debug("Device restarted ({}) {}", event.type(),
194 + deviceId);
143 ld.start(); 195 ld.start();
144 } 196 }
145 } 197 }
198 + }
146 break; 199 break;
147 case PORT_ADDED: 200 case PORT_ADDED:
148 case PORT_UPDATED: 201 case PORT_UPDATED:
...@@ -193,15 +246,6 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -193,15 +246,6 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
193 ld.stop(); 246 ld.stop();
194 } 247 }
195 break; 248 break;
196 - case DEVICE_MASTERSHIP_CHANGED:
197 - if (!discoverers.containsKey(deviceId)) {
198 - // TODO: ideally, should never reach here
199 - log.debug("Device mastership changed ({}) {}", event.type(), deviceId);
200 - discoverers.put(deviceId,
201 - new LinkDiscovery(device, packetSevice, masterService,
202 - providerService, useBDDP));
203 - }
204 - break;
205 default: 249 default:
206 log.debug("Unknown event {}", event); 250 log.debug("Unknown event {}", event);
207 } 251 }
...@@ -224,4 +268,37 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -224,4 +268,37 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
224 } 268 }
225 } 269 }
226 270
271 + private final class SyncDeviceInfoTask implements Runnable {
272 +
273 + @Override
274 + public void run() {
275 + if (Thread.currentThread().isInterrupted()) {
276 + log.info("Interrupted, quitting");
277 + return;
278 + }
279 + // check what deviceService sees, to see if we are missing anything
280 + try {
281 + LinkDiscovery ld = null;
282 + for (Device dev : deviceService.getDevices()) {
283 + DeviceId did = dev.id();
284 + synchronized (discoverers) {
285 + if (!discoverers.containsKey(did)) {
286 + ld = new LinkDiscovery(dev, packetSevice,
287 + masterService, providerService, useBDDP);
288 + discoverers.put(did, ld);
289 + for (Port p : deviceService.getPorts(did)) {
290 + if (!p.number().isLogical()) {
291 + ld.addPort(p);
292 + }
293 + }
294 + }
295 + }
296 + }
297 + } catch (Exception e) {
298 + // catch all Exception to avoid Scheduled task being suppressed.
299 + log.error("Exception thrown during synchronization process", e);
300 + }
301 + }
302 + }
303 +
227 } 304 }
......