Ray Milkey

Fix ONOS-4683 - Don't process device events on the listener thread

Change-Id: Icc465311c2c047dba11bacc69c745bbda55ea714
...@@ -15,9 +15,16 @@ ...@@ -15,9 +15,16 @@
15 */ 15 */
16 package org.onosproject.provider.lldp.impl; 16 package org.onosproject.provider.lldp.impl;
17 17
18 -import com.google.common.collect.ImmutableMap; 18 +import java.util.Dictionary;
19 -import com.google.common.collect.ImmutableSet; 19 +import java.util.EnumSet;
20 -import com.google.common.collect.Maps; 20 +import java.util.Map;
21 +import java.util.Optional;
22 +import java.util.Properties;
23 +import java.util.Set;
24 +import java.util.concurrent.ConcurrentHashMap;
25 +import java.util.concurrent.ExecutorService;
26 +import java.util.concurrent.ScheduledExecutorService;
27 +
21 import org.apache.felix.scr.annotations.Activate; 28 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component; 29 import org.apache.felix.scr.annotations.Component;
23 import org.apache.felix.scr.annotations.Deactivate; 30 import org.apache.felix.scr.annotations.Deactivate;
...@@ -26,7 +33,6 @@ import org.apache.felix.scr.annotations.Property; ...@@ -26,7 +33,6 @@ import org.apache.felix.scr.annotations.Property;
26 import org.apache.felix.scr.annotations.Reference; 33 import org.apache.felix.scr.annotations.Reference;
27 import org.apache.felix.scr.annotations.ReferenceCardinality; 34 import org.apache.felix.scr.annotations.ReferenceCardinality;
28 import org.onlab.packet.Ethernet; 35 import org.onlab.packet.Ethernet;
29 -import org.onlab.util.SharedExecutors;
30 import org.onlab.util.Tools; 36 import org.onlab.util.Tools;
31 import org.onosproject.cfg.ComponentConfigService; 37 import org.onosproject.cfg.ComponentConfigService;
32 import org.onosproject.cluster.ClusterMetadataService; 38 import org.onosproject.cluster.ClusterMetadataService;
...@@ -52,30 +58,25 @@ import org.onosproject.net.device.DeviceService; ...@@ -52,30 +58,25 @@ import org.onosproject.net.device.DeviceService;
52 import org.onosproject.net.flow.DefaultTrafficSelector; 58 import org.onosproject.net.flow.DefaultTrafficSelector;
53 import org.onosproject.net.flow.TrafficSelector; 59 import org.onosproject.net.flow.TrafficSelector;
54 import org.onosproject.net.link.DefaultLinkDescription; 60 import org.onosproject.net.link.DefaultLinkDescription;
55 -import org.onosproject.net.link.ProbedLinkProvider;
56 import org.onosproject.net.link.LinkProviderRegistry; 61 import org.onosproject.net.link.LinkProviderRegistry;
57 import org.onosproject.net.link.LinkProviderService; 62 import org.onosproject.net.link.LinkProviderService;
58 import org.onosproject.net.link.LinkService; 63 import org.onosproject.net.link.LinkService;
64 +import org.onosproject.net.link.ProbedLinkProvider;
59 import org.onosproject.net.packet.PacketContext; 65 import org.onosproject.net.packet.PacketContext;
60 import org.onosproject.net.packet.PacketPriority; 66 import org.onosproject.net.packet.PacketPriority;
61 import org.onosproject.net.packet.PacketProcessor; 67 import org.onosproject.net.packet.PacketProcessor;
62 import org.onosproject.net.packet.PacketService; 68 import org.onosproject.net.packet.PacketService;
63 import org.onosproject.net.provider.AbstractProvider; 69 import org.onosproject.net.provider.AbstractProvider;
64 import org.onosproject.net.provider.ProviderId; 70 import org.onosproject.net.provider.ProviderId;
65 -import org.onosproject.provider.lldpcommon.LinkDiscoveryContext;
66 import org.onosproject.provider.lldpcommon.LinkDiscovery; 71 import org.onosproject.provider.lldpcommon.LinkDiscovery;
72 +import org.onosproject.provider.lldpcommon.LinkDiscoveryContext;
67 import org.onosproject.store.service.ConsistentMapException; 73 import org.onosproject.store.service.ConsistentMapException;
68 import org.osgi.service.component.ComponentContext; 74 import org.osgi.service.component.ComponentContext;
69 import org.slf4j.Logger; 75 import org.slf4j.Logger;
70 76
71 -import java.util.Dictionary; 77 +import com.google.common.collect.ImmutableMap;
72 -import java.util.EnumSet; 78 +import com.google.common.collect.ImmutableSet;
73 -import java.util.Map; 79 +import com.google.common.collect.Maps;
74 -import java.util.Optional;
75 -import java.util.Properties;
76 -import java.util.Set;
77 -import java.util.concurrent.ConcurrentHashMap;
78 -import java.util.concurrent.ScheduledExecutorService;
79 80
80 import static com.google.common.base.Strings.isNullOrEmpty; 81 import static com.google.common.base.Strings.isNullOrEmpty;
81 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; 82 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
...@@ -143,6 +144,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv ...@@ -143,6 +144,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
143 private LinkProviderService providerService; 144 private LinkProviderService providerService;
144 145
145 private ScheduledExecutorService executor; 146 private ScheduledExecutorService executor;
147 + protected ExecutorService eventExecutor;
146 148
147 private boolean shuttingDown = false; 149 private boolean shuttingDown = false;
148 150
...@@ -242,6 +244,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv ...@@ -242,6 +244,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
242 244
243 @Activate 245 @Activate
244 public void activate(ComponentContext context) { 246 public void activate(ComponentContext context) {
247 + eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/linkevents", "events-%d", log));
245 shuttingDown = false; 248 shuttingDown = false;
246 cfgService.registerProperties(getClass()); 249 cfgService.registerProperties(getClass());
247 appId = coreService.registerApplication(PROVIDER_NAME); 250 appId = coreService.registerApplication(PROVIDER_NAME);
...@@ -280,6 +283,8 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv ...@@ -280,6 +283,8 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
280 283
281 cfgService.unregisterProperties(getClass(), false); 284 cfgService.unregisterProperties(getClass(), false);
282 disable(); 285 disable();
286 + eventExecutor.shutdownNow();
287 + eventExecutor = null;
283 log.info("Stopped"); 288 log.info("Stopped");
284 } 289 }
285 290
...@@ -548,27 +553,30 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv ...@@ -548,27 +553,30 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
548 return; 553 return;
549 } 554 }
550 555
551 - DeviceId deviceId = event.subject(); 556 + eventExecutor.execute(() -> {
552 - Device device = deviceService.getDevice(deviceId); 557 + DeviceId deviceId = event.subject();
553 - if (device == null) { 558 + Device device = deviceService.getDevice(deviceId);
554 - log.debug("Device {} doesn't exist, or isn't there yet", deviceId); 559 + if (device == null) {
555 - return; 560 + log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
556 - } 561 + return;
557 - if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) { 562 + }
558 - updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id())); 563 + if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
559 - } 564 + updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
565 + }
566 + });
560 } 567 }
561 } 568 }
562 569
563 - /** 570 + private class DeviceEventProcessor implements Runnable {
564 - * Processes device events. 571 +
565 - */ 572 + DeviceEvent event;
566 - private class InternalDeviceListener implements DeviceListener { 573 +
574 + DeviceEventProcessor(DeviceEvent event) {
575 + this.event = event;
576 + }
577 +
567 @Override 578 @Override
568 - public void event(DeviceEvent event) { 579 + public void run() {
569 - if (event.type() == Type.PORT_STATS_UPDATED) {
570 - return;
571 - }
572 Device device = event.subject(); 580 Device device = event.subject();
573 Port port = event.port(); 581 Port port = event.port();
574 if (device == null) { 582 if (device == null) {
...@@ -624,6 +632,22 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv ...@@ -624,6 +632,22 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
624 } 632 }
625 633
626 /** 634 /**
635 + * Processes device events.
636 + */
637 + private class InternalDeviceListener implements DeviceListener {
638 + @Override
639 + public void event(DeviceEvent event) {
640 + if (event.type() == Type.PORT_STATS_UPDATED) {
641 + return;
642 + }
643 +
644 + Runnable deviceEventProcessor = new DeviceEventProcessor(event);
645 +
646 + eventExecutor.execute(deviceEventProcessor);
647 + }
648 + }
649 +
650 + /**
627 * Processes incoming packets. 651 * Processes incoming packets.
628 */ 652 */
629 private class InternalPacketProcessor implements PacketProcessor { 653 private class InternalPacketProcessor implements PacketProcessor {
...@@ -780,7 +804,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv ...@@ -780,7 +804,7 @@ public class LldpLinkProvider extends AbstractProvider implements ProbedLinkProv
780 804
781 @Override 805 @Override
782 public void event(NetworkConfigEvent event) { 806 public void event(NetworkConfigEvent event) {
783 - SharedExecutors.getPoolThreadExecutor().execute(() -> { 807 + eventExecutor.execute(() -> {
784 if (event.configClass() == LinkDiscoveryFromDevice.class && 808 if (event.configClass() == LinkDiscoveryFromDevice.class &&
785 CONFIG_CHANGED.contains(event.type())) { 809 CONFIG_CHANGED.contains(event.type())) {
786 810
......
...@@ -75,6 +75,7 @@ import com.google.common.collect.ImmutableList; ...@@ -75,6 +75,7 @@ import com.google.common.collect.ImmutableList;
75 import com.google.common.collect.ImmutableMap; 75 import com.google.common.collect.ImmutableMap;
76 import com.google.common.collect.ImmutableSet; 76 import com.google.common.collect.ImmutableSet;
77 import com.google.common.collect.Lists; 77 import com.google.common.collect.Lists;
78 +import com.google.common.util.concurrent.MoreExecutors;
78 79
79 import static org.easymock.EasyMock.createMock; 80 import static org.easymock.EasyMock.createMock;
80 import static org.easymock.EasyMock.expect; 81 import static org.easymock.EasyMock.expect;
...@@ -147,6 +148,8 @@ public class LldpLinkProviderTest { ...@@ -147,6 +148,8 @@ public class LldpLinkProviderTest {
147 148
148 provider.activate(null); 149 provider.activate(null);
149 150
151 + provider.eventExecutor = MoreExecutors.newDirectExecutorService();
152 +
150 providerService = linkRegistry.registeredProvider(); 153 providerService = linkRegistry.registeredProvider();
151 } 154 }
152 155
......
...@@ -255,6 +255,9 @@ public class LinkDiscovery implements TimerTask { ...@@ -255,6 +255,9 @@ public class LinkDiscovery implements TimerTask {
255 } 255 }
256 256
257 private void sendProbes(Long portNumber) { 257 private void sendProbes(Long portNumber) {
258 + if (context.packetService() == null) {
259 + return;
260 + }
258 log.trace("Sending probes out to {}@{}", portNumber, device.id()); 261 log.trace("Sending probes out to {}@{}", portNumber, device.id());
259 OutboundPacket pkt = createOutBoundLldp(portNumber); 262 OutboundPacket pkt = createOutBoundLldp(portNumber);
260 context.packetService().emit(pkt); 263 context.packetService().emit(pkt);
......