Carmelo Cascone
Committed by Gerrit Code Review

ONOS-4422 Implemented device discovery in Bmv2 device provider and other

improvements

- Added listener for hello messages received from Bmv2 devices
- Added a periodic poller task to check device reachability and port
information updates
- Avoids periodically re-connecting the device if it is already
available in the core
- Fixed minor bug in Bmv2ThriftClient

Change-Id: I416d1880773e11b2ac6fa062d8be2b8f280786fb
...@@ -80,9 +80,9 @@ public final class Bmv2ThriftClient implements Bmv2Client { ...@@ -80,9 +80,9 @@ public final class Bmv2ThriftClient implements Bmv2Client {
80 // Seconds after a client is expired (and connection closed) in the cache. 80 // Seconds after a client is expired (and connection closed) in the cache.
81 private static final int CLIENT_CACHE_TIMEOUT = 60; 81 private static final int CLIENT_CACHE_TIMEOUT = 60;
82 // Number of connection retries after a network error. 82 // Number of connection retries after a network error.
83 - private static final int NUM_CONNECTION_RETRIES = 10; 83 + private static final int NUM_CONNECTION_RETRIES = 3;
84 // Time between retries in milliseconds. 84 // Time between retries in milliseconds.
85 - private static final int TIME_BETWEEN_RETRIES = 200; 85 + private static final int TIME_BETWEEN_RETRIES = 300;
86 86
87 // Static client cache where clients are removed after a predefined timeout. 87 // Static client cache where clients are removed after a predefined timeout.
88 private static final LoadingCache<DeviceId, Bmv2ThriftClient> 88 private static final LoadingCache<DeviceId, Bmv2ThriftClient>
...@@ -125,6 +125,15 @@ public final class Bmv2ThriftClient implements Bmv2Client { ...@@ -125,6 +125,15 @@ public final class Bmv2ThriftClient implements Bmv2Client {
125 } 125 }
126 126
127 /** 127 /**
128 + * Force a close of the transport session (if one is open) with the given device.
129 + *
130 + * @param deviceId device id
131 + */
132 + public static void forceDisconnectOf(DeviceId deviceId) {
133 + CLIENT_CACHE.invalidate(deviceId);
134 + }
135 +
136 + /**
128 * Pings the device. Returns true if the device is reachable, 137 * Pings the device. Returns true if the device is reachable,
129 * false otherwise. 138 * false otherwise.
130 * 139 *
...@@ -392,7 +401,7 @@ public final class Bmv2ThriftClient implements Bmv2Client { ...@@ -392,7 +401,7 @@ public final class Bmv2ThriftClient implements Bmv2Client {
392 LOG.debug("Packet transmission requested! > portNumber={}, packet={}", portNumber, packet); 401 LOG.debug("Packet transmission requested! > portNumber={}, packet={}", portNumber, packet);
393 } catch (TException e) { 402 } catch (TException e) {
394 LOG.debug("Exception while requesting packet transmission: {} > portNumber={}, packet={}", 403 LOG.debug("Exception while requesting packet transmission: {} > portNumber={}, packet={}",
395 - portNumber, packet); 404 + e, portNumber, packet);
396 throw new Bmv2RuntimeException(e.getMessage(), e); 405 throw new Bmv2RuntimeException(e.getMessage(), e);
397 } 406 }
398 } 407 }
......
...@@ -173,7 +173,12 @@ public final class SafeThriftClient { ...@@ -173,7 +173,12 @@ public final class SafeThriftClient {
173 private static void reconnectOrThrowException(TTransport transport, int maxRetries, long timeBetweenRetries) 173 private static void reconnectOrThrowException(TTransport transport, int maxRetries, long timeBetweenRetries)
174 throws TTransportException { 174 throws TTransportException {
175 int errors = 0; 175 int errors = 0;
176 - transport.close(); 176 + try {
177 + transport.close();
178 + } catch (Exception e) {
179 + // Thrift seems to have a bug where if the transport is already closed a SocketException is thrown.
180 + // However, such an exception is not advertised by .close(), hence the general-purpose catch.
181 + }
177 182
178 while (errors < maxRetries) { 183 while (errors < maxRetries) {
179 try { 184 try {
...@@ -182,7 +187,7 @@ public final class SafeThriftClient { ...@@ -182,7 +187,7 @@ public final class SafeThriftClient {
182 LOG.debug("Reconnection successful"); 187 LOG.debug("Reconnection successful");
183 break; 188 break;
184 } catch (TTransportException e) { 189 } catch (TTransportException e) {
185 - LOG.error("Error while reconnecting:", e); 190 + LOG.debug("Error while reconnecting:", e);
186 errors++; 191 errors++;
187 192
188 if (errors < maxRetries) { 193 if (errors < maxRetries) {
......
...@@ -16,11 +16,17 @@ ...@@ -16,11 +16,17 @@
16 16
17 package org.onosproject.provider.bmv2.device.impl; 17 package org.onosproject.provider.bmv2.device.impl;
18 18
19 -import com.google.common.collect.Sets; 19 +import com.google.common.collect.Maps;
20 import org.apache.felix.scr.annotations.Component; 20 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Reference; 21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality; 22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 +import org.jboss.netty.util.HashedWheelTimer;
24 +import org.jboss.netty.util.Timeout;
25 +import org.jboss.netty.util.TimerTask;
23 import org.onlab.packet.ChassisId; 26 import org.onlab.packet.ChassisId;
27 +import org.onlab.util.Timer;
28 +import org.onosproject.bmv2.api.runtime.Bmv2ControlPlaneServer;
29 +import org.onosproject.bmv2.api.runtime.Bmv2Device;
24 import org.onosproject.common.net.AbstractDeviceProvider; 30 import org.onosproject.common.net.AbstractDeviceProvider;
25 import org.onosproject.core.ApplicationId; 31 import org.onosproject.core.ApplicationId;
26 import org.onosproject.core.CoreService; 32 import org.onosproject.core.CoreService;
...@@ -40,17 +46,20 @@ import org.onosproject.net.config.NetworkConfigRegistry; ...@@ -40,17 +46,20 @@ import org.onosproject.net.config.NetworkConfigRegistry;
40 import org.onosproject.net.device.DefaultDeviceDescription; 46 import org.onosproject.net.device.DefaultDeviceDescription;
41 import org.onosproject.net.device.DeviceDescription; 47 import org.onosproject.net.device.DeviceDescription;
42 import org.onosproject.net.device.DeviceService; 48 import org.onosproject.net.device.DeviceService;
49 +import org.onosproject.net.device.PortDescription;
43 import org.onosproject.net.provider.ProviderId; 50 import org.onosproject.net.provider.ProviderId;
44 import org.slf4j.Logger; 51 import org.slf4j.Logger;
45 52
46 import java.net.URI; 53 import java.net.URI;
47 import java.net.URISyntaxException; 54 import java.net.URISyntaxException;
48 -import java.util.Set; 55 +import java.util.List;
56 +import java.util.concurrent.ConcurrentMap;
49 import java.util.concurrent.ExecutorService; 57 import java.util.concurrent.ExecutorService;
50 import java.util.concurrent.Executors; 58 import java.util.concurrent.Executors;
51 import java.util.concurrent.TimeUnit; 59 import java.util.concurrent.TimeUnit;
52 60
53 import static org.onlab.util.Tools.groupedThreads; 61 import static org.onlab.util.Tools.groupedThreads;
62 +import static org.onosproject.bmv2.ctl.Bmv2ThriftClient.forceDisconnectOf;
54 import static org.onosproject.bmv2.ctl.Bmv2ThriftClient.ping; 63 import static org.onosproject.bmv2.ctl.Bmv2ThriftClient.ping;
55 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY; 64 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
56 import static org.slf4j.LoggerFactory.getLogger; 65 import static org.slf4j.LoggerFactory.getLogger;
...@@ -61,30 +70,14 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -61,30 +70,14 @@ import static org.slf4j.LoggerFactory.getLogger;
61 @Component(immediate = true) 70 @Component(immediate = true)
62 public class Bmv2DeviceProvider extends AbstractDeviceProvider { 71 public class Bmv2DeviceProvider extends AbstractDeviceProvider {
63 72
64 - private final Logger log = getLogger(Bmv2DeviceProvider.class); 73 + private static final Logger LOG = getLogger(Bmv2DeviceProvider.class);
65 74
66 public static final String MANUFACTURER = "p4.org"; 75 public static final String MANUFACTURER = "p4.org";
67 public static final String HW_VERSION = "bmv2"; 76 public static final String HW_VERSION = "bmv2";
77 + public static final String SCHEME = "bmv2";
68 private static final String APP_NAME = "org.onosproject.bmv2"; 78 private static final String APP_NAME = "org.onosproject.bmv2";
69 private static final String UNKNOWN = "unknown"; 79 private static final String UNKNOWN = "unknown";
70 - public static final String SCHEME = "bmv2"; 80 + private static final int POLL_INTERVAL = 5; // seconds
71 -
72 - private final ExecutorService deviceDiscoveryExecutor = Executors
73 - .newFixedThreadPool(5, groupedThreads("onos/bmv2", "device-discovery", log));
74 -
75 - private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
76 -
77 - private final ConfigFactory cfgFactory =
78 - new ConfigFactory<ApplicationId, Bmv2ProviderConfig>(
79 - APP_SUBJECT_FACTORY, Bmv2ProviderConfig.class,
80 - "devices", true) {
81 - @Override
82 - public Bmv2ProviderConfig createConfig() {
83 - return new Bmv2ProviderConfig();
84 - }
85 - };
86 -
87 - private final Set<DeviceId> activeDevices = Sets.newConcurrentHashSet();
88 81
89 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 protected NetworkConfigRegistry netCfgService; 83 protected NetworkConfigRegistry netCfgService;
...@@ -95,6 +88,17 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -95,6 +88,17 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 88 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected DeviceService deviceService; 89 protected DeviceService deviceService;
97 90
91 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92 + protected Bmv2ControlPlaneServer controlPlaneServer;
93 +
94 + private final ExecutorService deviceDiscoveryExecutor = Executors
95 + .newFixedThreadPool(5, groupedThreads("onos/bmv2", "device-discovery", LOG));
96 +
97 + private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
98 + private final ConfigFactory cfgFactory = new InternalConfigFactory();
99 + private final ConcurrentMap<DeviceId, Boolean> activeDevices = Maps.newConcurrentMap();
100 + private final DevicePoller devicePoller = new DevicePoller();
101 + private final InternalHelloListener helloListener = new InternalHelloListener();
98 private ApplicationId appId; 102 private ApplicationId appId;
99 103
100 /** 104 /**
...@@ -104,56 +108,66 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -104,56 +108,66 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
104 super(new ProviderId("bmv2", "org.onosproject.provider.device")); 108 super(new ProviderId("bmv2", "org.onosproject.provider.device"));
105 } 109 }
106 110
107 - protected static DeviceId deviceIdOf(Bmv2ProviderConfig.Bmv2DeviceInfo info) { 111 + private static DeviceId deviceIdOf(String ip, int port) {
108 try { 112 try {
109 - return DeviceId.deviceId(new URI( 113 + return DeviceId.deviceId(new URI(SCHEME, ip + ":" + port, null));
110 - SCHEME, info.ip().toString() + ":" + info.port(), null));
111 } catch (URISyntaxException e) { 114 } catch (URISyntaxException e) {
112 - throw new IllegalArgumentException( 115 + throw new IllegalArgumentException("Unable to build deviceID for device " + ip + ":" + port, e);
113 - "Unable to build deviceID for device "
114 - + info.ip().toString() + ":" + info.ip().toString(),
115 - e);
116 } 116 }
117 } 117 }
118 118
119 + /**
120 + * Creates a new device ID for the given BMv2 device.
121 + *
122 + * @param device a BMv2 device object
123 + *
124 + * @return a new device ID
125 + */
126 + public static DeviceId deviceIdOf(Bmv2Device device) {
127 + return deviceIdOf(device.thriftServerHost(), device.thriftServerPort());
128 + }
129 +
119 @Override 130 @Override
120 protected void activate() { 131 protected void activate() {
121 appId = coreService.registerApplication(APP_NAME); 132 appId = coreService.registerApplication(APP_NAME);
122 netCfgService.registerConfigFactory(cfgFactory); 133 netCfgService.registerConfigFactory(cfgFactory);
123 netCfgService.addListener(cfgListener); 134 netCfgService.addListener(cfgListener);
124 - 135 + controlPlaneServer.addHelloListener(helloListener);
136 + devicePoller.start();
125 super.activate(); 137 super.activate();
126 } 138 }
127 139
128 @Override 140 @Override
129 protected void deactivate() { 141 protected void deactivate() {
142 + devicePoller.stop();
143 + controlPlaneServer.removeHelloListener(helloListener);
130 try { 144 try {
131 - activeDevices.stream().forEach(did -> { 145 + activeDevices.forEach((did, value) -> {
132 deviceDiscoveryExecutor.execute(() -> disconnectDevice(did)); 146 deviceDiscoveryExecutor.execute(() -> disconnectDevice(did));
133 }); 147 });
134 deviceDiscoveryExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); 148 deviceDiscoveryExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
135 } catch (InterruptedException e) { 149 } catch (InterruptedException e) {
136 - log.error("Device discovery threads did not terminate"); 150 + LOG.error("Device discovery threads did not terminate");
137 } 151 }
138 deviceDiscoveryExecutor.shutdownNow(); 152 deviceDiscoveryExecutor.shutdownNow();
139 netCfgService.unregisterConfigFactory(cfgFactory); 153 netCfgService.unregisterConfigFactory(cfgFactory);
140 netCfgService.removeListener(cfgListener); 154 netCfgService.removeListener(cfgListener);
141 -
142 super.deactivate(); 155 super.deactivate();
143 } 156 }
144 157
145 @Override 158 @Override
146 public void triggerProbe(DeviceId deviceId) { 159 public void triggerProbe(DeviceId deviceId) {
160 + // Asynchronously trigger probe task.
147 deviceDiscoveryExecutor.execute(() -> executeProbe(deviceId)); 161 deviceDiscoveryExecutor.execute(() -> executeProbe(deviceId));
148 } 162 }
149 163
150 private void executeProbe(DeviceId did) { 164 private void executeProbe(DeviceId did) {
151 boolean reachable = isReachable(did); 165 boolean reachable = isReachable(did);
152 - log.debug("Probed device: id={}, reachable={}", 166 + LOG.debug("Probed device: id={}, reachable={}",
153 did.toString(), 167 did.toString(),
154 reachable); 168 reachable);
155 if (reachable) { 169 if (reachable) {
156 - connectDevice(did); 170 + discoverDevice(did);
157 } else { 171 } else {
158 disconnectDevice(did); 172 disconnectDevice(did);
159 } 173 }
...@@ -161,7 +175,7 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -161,7 +175,7 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
161 175
162 @Override 176 @Override
163 public void roleChanged(DeviceId deviceId, MastershipRole newRole) { 177 public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
164 - log.debug("roleChanged() is not yet implemented"); 178 + LOG.debug("roleChanged() is not yet implemented");
165 // TODO: implement mastership handling 179 // TODO: implement mastership handling
166 } 180 }
167 181
...@@ -172,41 +186,69 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -172,41 +186,69 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
172 186
173 @Override 187 @Override
174 public void changePortState(DeviceId deviceId, PortNumber portNumber, boolean enable) { 188 public void changePortState(DeviceId deviceId, PortNumber portNumber, boolean enable) {
175 - log.debug("changePortState() is not yet implemented"); 189 + LOG.debug("changePortState() is not yet implemented");
176 // TODO: implement port handling 190 // TODO: implement port handling
177 } 191 }
178 192
179 - private void connectDevice(DeviceId did) { 193 + private void discoverDevice(DeviceId did) {
180 - log.debug("Trying to create device on ONOS core: {}", did); 194 + LOG.debug("Starting device discovery... deviceId={}", did);
181 - SparseAnnotations annotations = DefaultAnnotations.builder() 195 +
182 - .set(AnnotationKeys.PROTOCOL, SCHEME) 196 + // Atomically notify device to core and update port information.
183 - .build(); 197 + activeDevices.compute(did, (k, v) -> {
184 - DeviceDescription descr = new DefaultDeviceDescription( 198 + if (!deviceService.isAvailable(did)) {
185 - did.uri(), Device.Type.SWITCH, MANUFACTURER, HW_VERSION, 199 + // Device not available in the core, connect it now.
186 - UNKNOWN, UNKNOWN, new ChassisId(), annotations); 200 + SparseAnnotations annotations = DefaultAnnotations.builder()
187 - providerService.deviceConnected(did, descr); 201 + .set(AnnotationKeys.PROTOCOL, SCHEME)
188 - activeDevices.add(did); 202 + .build();
189 - discoverPorts(did); 203 + DeviceDescription descr = new DefaultDeviceDescription(
190 - } 204 + did.uri(), Device.Type.SWITCH, MANUFACTURER, HW_VERSION,
191 - 205 + UNKNOWN, UNKNOWN, new ChassisId(), annotations);
192 - private void discoverPorts(DeviceId did) { 206 + providerService.deviceConnected(did, descr);
193 - Device device = deviceService.getDevice(did); 207 + }
194 - if (device.is(PortDiscovery.class)) { 208 + // Discover ports.
195 - PortDiscovery portConfig = device.as(PortDiscovery.class); 209 + Device device = deviceService.getDevice(did);
196 - providerService.updatePorts(did, portConfig.getPorts()); 210 + if (device.is(PortDiscovery.class)) {
197 - } else { 211 + PortDiscovery portConfig = device.as(PortDiscovery.class);
198 - log.warn("No PortDiscovery behavior for device {}", did); 212 + List<PortDescription> portDescriptions = portConfig.getPorts();
199 - } 213 + providerService.updatePorts(did, portDescriptions);
214 + } else {
215 + LOG.warn("No PortDiscovery behavior for device {}", did);
216 + }
217 + return true;
218 + });
200 } 219 }
201 220
202 private void disconnectDevice(DeviceId did) { 221 private void disconnectDevice(DeviceId did) {
203 - log.debug("Trying to remove device from ONOS core: {}", did); 222 + LOG.debug("Trying to disconnect device from core... deviceId={}", did);
204 - providerService.deviceDisconnected(did); 223 +
205 - activeDevices.remove(did); 224 + // Atomically disconnect device.
225 + activeDevices.compute(did, (k, v) -> {
226 + if (deviceService.isAvailable(did)) {
227 + providerService.deviceDisconnected(did);
228 + // Make sure to close the transport session with device.
229 + forceDisconnectOf(did);
230 + }
231 + return null;
232 + });
233 + }
234 +
235 + /**
236 + * Internal net-cfg config factory.
237 + */
238 + private class InternalConfigFactory extends ConfigFactory<ApplicationId, Bmv2ProviderConfig> {
239 +
240 + InternalConfigFactory() {
241 + super(APP_SUBJECT_FACTORY, Bmv2ProviderConfig.class, "devices", true);
242 + }
243 +
244 + @Override
245 + public Bmv2ProviderConfig createConfig() {
246 + return new Bmv2ProviderConfig();
247 + }
206 } 248 }
207 249
208 /** 250 /**
209 - * Handles net-cfg events. 251 + * Internal net-cfg event listener.
210 */ 252 */
211 private class InternalNetworkConfigListener implements NetworkConfigListener { 253 private class InternalNetworkConfigListener implements NetworkConfigListener {
212 254
...@@ -216,13 +258,13 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -216,13 +258,13 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
216 if (cfg != null) { 258 if (cfg != null) {
217 try { 259 try {
218 cfg.getDevicesInfo().stream().forEach(info -> { 260 cfg.getDevicesInfo().stream().forEach(info -> {
219 - triggerProbe(deviceIdOf(info)); 261 + triggerProbe(deviceIdOf(info.ip().toString(), info.port()));
220 }); 262 });
221 } catch (ConfigException e) { 263 } catch (ConfigException e) {
222 - log.error("Unable to read config: " + e); 264 + LOG.error("Unable to read config: " + e);
223 } 265 }
224 } else { 266 } else {
225 - log.error("Unable to read config (was null)"); 267 + LOG.error("Unable to read config (was null)");
226 } 268 }
227 } 269 }
228 270
...@@ -233,4 +275,50 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider { ...@@ -233,4 +275,50 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
233 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED); 275 event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
234 } 276 }
235 } 277 }
278 +
279 + /**
280 + * Listener triggered by Bmv2ControlPlaneServer each time a hello message is received.
281 + */
282 + private class InternalHelloListener implements Bmv2ControlPlaneServer.HelloListener {
283 + @Override
284 + public void handleHello(Bmv2Device device) {
285 + log.debug("Received hello from {}", device);
286 + triggerProbe(deviceIdOf(device));
287 + }
288 + }
289 +
290 + /**
291 + * Task that periodically trigger device probes.
292 + */
293 + private class DevicePoller implements TimerTask {
294 +
295 + private final HashedWheelTimer timer = Timer.getTimer();
296 + private Timeout timeout;
297 +
298 + @Override
299 + public void run(Timeout timeout) throws Exception {
300 + if (timeout.isCancelled()) {
301 + return;
302 + }
303 + log.debug("Executing polling on {} devices...", activeDevices.size());
304 + activeDevices.forEach((did, value) -> triggerProbe(did));
305 + timeout.getTimer().newTimeout(this, POLL_INTERVAL, TimeUnit.SECONDS);
306 + }
307 +
308 + /**
309 + * Starts the collector.
310 + */
311 + synchronized void start() {
312 + LOG.info("Starting device poller...");
313 + timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
314 + }
315 +
316 + /**
317 + * Stops the collector.
318 + */
319 + synchronized void stop() {
320 + LOG.info("Stopping device poller...");
321 + timeout.cancel();
322 + }
323 + }
236 } 324 }
......