Madan Jampani
Committed by Gerrit Code Review

ONOS-2043: Move device event handling in PacketManager off of event loop thread

Change-Id: Ia8b12e6ec3e732f0311adc7b3e7e63d07ad117e0
...@@ -58,9 +58,12 @@ import org.slf4j.Logger; ...@@ -58,9 +58,12 @@ import org.slf4j.Logger;
58 58
59 import java.util.Map; 59 import java.util.Map;
60 import java.util.concurrent.ConcurrentHashMap; 60 import java.util.concurrent.ConcurrentHashMap;
61 +import java.util.concurrent.ExecutorService;
62 +import java.util.concurrent.Executors;
61 63
62 import static com.google.common.base.Preconditions.checkNotNull; 64 import static com.google.common.base.Preconditions.checkNotNull;
63 import static org.slf4j.LoggerFactory.getLogger; 65 import static org.slf4j.LoggerFactory.getLogger;
66 +import static org.onlab.util.Tools.groupedThreads;
64 import static org.onosproject.security.AppGuard.checkPermission; 67 import static org.onosproject.security.AppGuard.checkPermission;
65 68
66 69
...@@ -92,6 +95,8 @@ public class PacketManager ...@@ -92,6 +95,8 @@ public class PacketManager
92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 private FlowObjectiveService objectiveService; 96 private FlowObjectiveService objectiveService;
94 97
98 + private ExecutorService eventHandlingExecutor;
99 +
95 private final DeviceListener deviceListener = new InternalDeviceListener(); 100 private final DeviceListener deviceListener = new InternalDeviceListener();
96 101
97 private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>(); 102 private final Map<Integer, PacketProcessor> processors = new ConcurrentHashMap<>();
...@@ -100,6 +105,8 @@ public class PacketManager ...@@ -100,6 +105,8 @@ public class PacketManager
100 105
101 @Activate 106 @Activate
102 public void activate() { 107 public void activate() {
108 + eventHandlingExecutor = Executors.newSingleThreadExecutor(
109 + groupedThreads("onos/net/packet", "event-handler"));
103 appId = coreService.getAppId(CoreService.CORE_APP_NAME); 110 appId = coreService.getAppId(CoreService.CORE_APP_NAME);
104 store.setDelegate(delegate); 111 store.setDelegate(delegate);
105 deviceService.addListener(deviceListener); 112 deviceService.addListener(deviceListener);
...@@ -111,6 +118,7 @@ public class PacketManager ...@@ -111,6 +118,7 @@ public class PacketManager
111 public void deactivate() { 118 public void deactivate() {
112 store.unsetDelegate(delegate); 119 store.unsetDelegate(delegate);
113 deviceService.removeListener(deviceListener); 120 deviceService.removeListener(deviceListener);
121 + eventHandlingExecutor.shutdown();
114 log.info("Stopped"); 122 log.info("Stopped");
115 } 123 }
116 124
...@@ -277,19 +285,25 @@ public class PacketManager ...@@ -277,19 +285,25 @@ public class PacketManager
277 private class InternalDeviceListener implements DeviceListener { 285 private class InternalDeviceListener implements DeviceListener {
278 @Override 286 @Override
279 public void event(DeviceEvent event) { 287 public void event(DeviceEvent event) {
280 - Device device = event.subject(); 288 + eventHandlingExecutor.execute(() -> {
281 - switch (event.type()) { 289 + try {
282 - case DEVICE_ADDED: 290 + Device device = event.subject();
283 - case DEVICE_AVAILABILITY_CHANGED: 291 + switch (event.type()) {
284 - if (deviceService.isAvailable(event.subject().id())) { 292 + case DEVICE_ADDED:
285 - for (PacketRequest request : store.existingRequests()) { 293 + case DEVICE_AVAILABILITY_CHANGED:
286 - pushRule(device, request); 294 + if (deviceService.isAvailable(event.subject().id())) {
295 + for (PacketRequest request : store.existingRequests()) {
296 + pushRule(device, request);
297 + }
287 } 298 }
299 + break;
300 + default:
301 + break;
288 } 302 }
289 - break; 303 + } catch (Exception e) {
290 - default: 304 + log.warn("Failed to process {}", event, e);
291 - break; 305 + }
292 - } 306 + });
293 } 307 }
294 } 308 }
295 309
......