Madan Jampani
Committed by Gerrit Code Review

Move event handling to background thread

Change-Id: I8ccd1631fac14b1f753da4fb4b4ed01e5a045edf
(cherry picked from commit 8f906bfa)
...@@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; ...@@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.LinkedListMultimap; 20 import com.google.common.collect.LinkedListMultimap;
21 import com.google.common.collect.Multimap; 21 import com.google.common.collect.Multimap;
22 import com.google.common.collect.Sets; 22 import com.google.common.collect.Sets;
23 +
23 import org.onosproject.core.ApplicationId; 24 import org.onosproject.core.ApplicationId;
24 import org.onosproject.mastership.MastershipService; 25 import org.onosproject.mastership.MastershipService;
25 import org.onosproject.net.Device; 26 import org.onosproject.net.Device;
...@@ -186,16 +187,19 @@ class FlowRuleDriverProvider extends AbstractProvider implements FlowRuleProvide ...@@ -186,16 +187,19 @@ class FlowRuleDriverProvider extends AbstractProvider implements FlowRuleProvide
186 187
187 @Override 188 @Override
188 public void event(DeviceEvent event) { 189 public void event(DeviceEvent event) {
189 - executor.schedule(() -> pollDeviceFlowEntries(event.subject()), 0, TimeUnit.SECONDS); 190 + executor.execute(() -> handleEvent(event));
190 } 191 }
191 192
192 - @Override 193 + private void handleEvent(DeviceEvent event) {
193 - public boolean isRelevant(DeviceEvent event) {
194 Device device = event.subject(); 194 Device device = event.subject();
195 - return mastershipService.isLocalMaster(device.id()) && device.is(FlowRuleProgrammable.class) && 195 + boolean isRelevant = mastershipService.isLocalMaster(device.id())
196 - (event.type() == DEVICE_ADDED || 196 + && device.is(FlowRuleProgrammable.class)
197 + && (event.type() == DEVICE_ADDED ||
197 event.type() == DEVICE_UPDATED || 198 event.type() == DEVICE_UPDATED ||
198 (event.type() == DEVICE_AVAILABILITY_CHANGED && deviceService.isAvailable(device.id()))); 199 (event.type() == DEVICE_AVAILABILITY_CHANGED && deviceService.isAvailable(device.id())));
200 + if (isRelevant) {
201 + pollDeviceFlowEntries(event.subject());
202 + }
199 } 203 }
200 } 204 }
201 205
......
...@@ -163,6 +163,7 @@ public class DistributedFlowRuleStore ...@@ -163,6 +163,7 @@ public class DistributedFlowRuleStore
163 163
164 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap(); 164 private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
165 private ExecutorService messageHandlingExecutor; 165 private ExecutorService messageHandlingExecutor;
166 + private ExecutorService eventHandler;
166 167
167 private ScheduledFuture<?> backupTask; 168 private ScheduledFuture<?> backupTask;
168 private final ScheduledExecutorService backupSenderExecutor = 169 private final ScheduledExecutorService backupSenderExecutor =
...@@ -197,6 +198,8 @@ public class DistributedFlowRuleStore ...@@ -197,6 +198,8 @@ public class DistributedFlowRuleStore
197 198
198 local = clusterService.getLocalNode().id(); 199 local = clusterService.getLocalNode().id();
199 200
201 + eventHandler = Executors.newSingleThreadExecutor(
202 + groupedThreads("onos/flow", "event-handler", log));
200 messageHandlingExecutor = Executors.newFixedThreadPool( 203 messageHandlingExecutor = Executors.newFixedThreadPool(
201 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log)); 204 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
202 205
...@@ -233,6 +236,7 @@ public class DistributedFlowRuleStore ...@@ -233,6 +236,7 @@ public class DistributedFlowRuleStore
233 unregisterMessageHandlers(); 236 unregisterMessageHandlers();
234 deviceTableStats.removeListener(tableStatsListener); 237 deviceTableStats.removeListener(tableStatsListener);
235 deviceTableStats.destroy(); 238 deviceTableStats.destroy();
239 + eventHandler.shutdownNow();
236 messageHandlingExecutor.shutdownNow(); 240 messageHandlingExecutor.shutdownNow();
237 backupSenderExecutor.shutdownNow(); 241 backupSenderExecutor.shutdownNow();
238 log.info("Stopped"); 242 log.info("Stopped");
...@@ -663,6 +667,10 @@ public class DistributedFlowRuleStore ...@@ -663,6 +667,10 @@ public class DistributedFlowRuleStore
663 667
664 @Override 668 @Override
665 public void event(ReplicaInfoEvent event) { 669 public void event(ReplicaInfoEvent event) {
670 + eventHandler.execute(() -> handleEvent(event));
671 + }
672 +
673 + private void handleEvent(ReplicaInfoEvent event) {
666 if (!backupEnabled) { 674 if (!backupEnabled) {
667 return; 675 return;
668 } 676 }
......
...@@ -97,6 +97,7 @@ public class ConsistentDeviceMastershipStore ...@@ -97,6 +97,7 @@ public class ConsistentDeviceMastershipStore
97 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = 97 private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
98 Pattern.compile("device:(.*)"); 98 Pattern.compile("device:(.*)");
99 99
100 + private ExecutorService eventHandler;
100 private ExecutorService messageHandlingExecutor; 101 private ExecutorService messageHandlingExecutor;
101 private ScheduledExecutorService transferExecutor; 102 private ScheduledExecutorService transferExecutor;
102 private final LeadershipEventListener leadershipEventListener = 103 private final LeadershipEventListener leadershipEventListener =
...@@ -116,6 +117,10 @@ public class ConsistentDeviceMastershipStore ...@@ -116,6 +117,10 @@ public class ConsistentDeviceMastershipStore
116 117
117 @Activate 118 @Activate
118 public void activate() { 119 public void activate() {
120 +
121 + eventHandler = Executors.newSingleThreadExecutor(
122 + groupedThreads("onos/store/device/mastership", "event-handler", log));
123 +
119 messageHandlingExecutor = 124 messageHandlingExecutor =
120 Executors.newSingleThreadExecutor( 125 Executors.newSingleThreadExecutor(
121 groupedThreads("onos/store/device/mastership", "message-handler", log)); 126 groupedThreads("onos/store/device/mastership", "message-handler", log));
...@@ -136,10 +141,10 @@ public class ConsistentDeviceMastershipStore ...@@ -136,10 +141,10 @@ public class ConsistentDeviceMastershipStore
136 @Deactivate 141 @Deactivate
137 public void deactivate() { 142 public void deactivate() {
138 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT); 143 clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
144 + leadershipService.removeListener(leadershipEventListener);
139 messageHandlingExecutor.shutdown(); 145 messageHandlingExecutor.shutdown();
140 transferExecutor.shutdown(); 146 transferExecutor.shutdown();
141 - leadershipService.removeListener(leadershipEventListener); 147 + eventHandler.shutdown();
142 -
143 log.info("Stopped"); 148 log.info("Stopped");
144 } 149 }
145 150
...@@ -308,6 +313,10 @@ public class ConsistentDeviceMastershipStore ...@@ -308,6 +313,10 @@ public class ConsistentDeviceMastershipStore
308 313
309 @Override 314 @Override
310 public void event(LeadershipEvent event) { 315 public void event(LeadershipEvent event) {
316 + eventHandler.execute(() -> handleEvent(event));
317 + }
318 +
319 + private void handleEvent(LeadershipEvent event) {
311 Leadership leadership = event.subject(); 320 Leadership leadership = event.subject();
312 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic()); 321 DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
313 RoleInfo roleInfo = getNodes(deviceId); 322 RoleInfo roleInfo = getNodes(deviceId);
......