Thomas Vachuska

ONOS-5092 Moved expensive processing off the listener thread.

Change-Id: I9c1ac9521b608c273d71b990aba60b64e225dc7e
...@@ -42,7 +42,10 @@ import org.slf4j.Logger; ...@@ -42,7 +42,10 @@ import org.slf4j.Logger;
42 42
43 import java.util.HashSet; 43 import java.util.HashSet;
44 import java.util.Set; 44 import java.util.Set;
45 +import java.util.concurrent.ExecutorService;
45 46
47 +import static java.util.concurrent.Executors.newSingleThreadExecutor;
48 +import static org.onlab.util.Tools.groupedThreads;
46 import static org.slf4j.LoggerFactory.getLogger; 49 import static org.slf4j.LoggerFactory.getLogger;
47 50
48 /** 51 /**
...@@ -64,6 +67,8 @@ public class VirtualNetworkTopologyProvider extends AbstractProvider implements ...@@ -64,6 +67,8 @@ public class VirtualNetworkTopologyProvider extends AbstractProvider implements
64 67
65 protected TopologyListener topologyListener = new InternalTopologyListener(); 68 protected TopologyListener topologyListener = new InternalTopologyListener();
66 69
70 + private ExecutorService executor;
71 +
67 /** 72 /**
68 * Default constructor. 73 * Default constructor.
69 */ 74 */
...@@ -73,15 +78,17 @@ public class VirtualNetworkTopologyProvider extends AbstractProvider implements ...@@ -73,15 +78,17 @@ public class VirtualNetworkTopologyProvider extends AbstractProvider implements
73 78
74 @Activate 79 @Activate
75 public void activate() { 80 public void activate() {
81 + executor = newSingleThreadExecutor(groupedThreads("onos/vnet", "provider", log));
76 providerService = providerRegistry.register(this); 82 providerService = providerRegistry.register(this);
77 topologyService.addListener(topologyListener); 83 topologyService.addListener(topologyListener);
78 -
79 log.info("Started"); 84 log.info("Started");
80 } 85 }
81 86
82 @Deactivate 87 @Deactivate
83 public void deactivate() { 88 public void deactivate() {
84 topologyService.removeListener(topologyListener); 89 topologyService.removeListener(topologyListener);
90 + executor.shutdown();
91 + executor = null;
85 providerRegistry.unregister(this); 92 providerRegistry.unregister(this);
86 providerService = null; 93 providerService = null;
87 log.info("Stopped"); 94 log.info("Stopped");
...@@ -154,27 +161,14 @@ public class VirtualNetworkTopologyProvider extends AbstractProvider implements ...@@ -154,27 +161,14 @@ public class VirtualNetworkTopologyProvider extends AbstractProvider implements
154 private class InternalTopologyListener implements TopologyListener { 161 private class InternalTopologyListener implements TopologyListener {
155 @Override 162 @Override
156 public void event(TopologyEvent event) { 163 public void event(TopologyEvent event) {
157 - if (!isRelevant(event)) { 164 + // Perform processing off the listener thread.
158 - return; 165 + executor.submit(() -> providerService.topologyChanged(getConnectPoints(event.subject())));
159 - }
160 -
161 - Topology topology = event.subject();
162 - providerService.topologyChanged(getConnectPoints(topology));
163 } 166 }
164 167
165 @Override 168 @Override
166 public boolean isRelevant(TopologyEvent event) { 169 public boolean isRelevant(TopologyEvent event) {
167 - final boolean[] relevant = {false}; 170 + return event.type() == TopologyEvent.Type.TOPOLOGY_CHANGED &&
168 - if (event.type().equals(TopologyEvent.Type.TOPOLOGY_CHANGED)) { 171 + event.reasons().stream().anyMatch(reason -> reason instanceof LinkEvent);
169 - event.reasons().forEach(event1 -> {
170 - // Only LinkEvents are relevant events, ie: DeviceEvents and others are ignored.
171 - if (event1 instanceof LinkEvent) {
172 - relevant[0] = true;
173 - return;
174 - }
175 - });
176 - }
177 - return relevant[0];
178 } 172 }
179 } 173 }
180 } 174 }
......