Yuta HIGUCHI

Run Anti-Entropy in background

Change-Id: I233185d15f52359899427e214339be44cb62971c
...@@ -89,6 +89,7 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -89,6 +89,7 @@ import static org.slf4j.LoggerFactory.getLogger;
89 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; 89 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
90 import static org.onlab.onos.net.DefaultAnnotations.merge; 90 import static org.onlab.onos.net.DefaultAnnotations.merge;
91 import static com.google.common.base.Verify.verify; 91 import static com.google.common.base.Verify.verify;
92 +import static org.onlab.util.Tools.minPriority;
92 import static org.onlab.util.Tools.namedThreads; 93 import static org.onlab.util.Tools.namedThreads;
93 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; 94 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
94 import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE; 95 import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
...@@ -159,7 +160,7 @@ public class GossipDeviceStore ...@@ -159,7 +160,7 @@ public class GossipDeviceStore
159 } 160 }
160 }; 161 };
161 162
162 - private ScheduledExecutorService executor; 163 + private ScheduledExecutorService backgroundExecutor;
163 164
164 @Activate 165 @Activate
165 public void activate() { 166 public void activate() {
...@@ -177,14 +178,14 @@ public class GossipDeviceStore ...@@ -177,14 +178,14 @@ public class GossipDeviceStore
177 clusterCommunicator.addSubscriber( 178 clusterCommunicator.addSubscriber(
178 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener()); 179 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
179 180
180 - executor = 181 + backgroundExecutor =
181 - newSingleThreadScheduledExecutor(namedThreads("anti-entropy-%d")); 182 + newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
182 183
183 // TODO: Make these configurable 184 // TODO: Make these configurable
184 long initialDelaySec = 5; 185 long initialDelaySec = 5;
185 long periodSec = 5; 186 long periodSec = 5;
186 // start anti-entropy thread 187 // start anti-entropy thread
187 - executor.scheduleAtFixedRate(new SendAdvertisementTask(), 188 + backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
188 initialDelaySec, periodSec, TimeUnit.SECONDS); 189 initialDelaySec, periodSec, TimeUnit.SECONDS);
189 190
190 log.info("Started"); 191 log.info("Started");
...@@ -193,9 +194,9 @@ public class GossipDeviceStore ...@@ -193,9 +194,9 @@ public class GossipDeviceStore
193 @Deactivate 194 @Deactivate
194 public void deactivate() { 195 public void deactivate() {
195 196
196 - executor.shutdownNow(); 197 + backgroundExecutor.shutdownNow();
197 try { 198 try {
198 - boolean timedout = executor.awaitTermination(5, TimeUnit.SECONDS); 199 + boolean timedout = backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS);
199 if (timedout) { 200 if (timedout) {
200 log.error("Timeout during executor shutdown"); 201 log.error("Timeout during executor shutdown");
201 } 202 }
...@@ -1359,7 +1360,17 @@ public class GossipDeviceStore ...@@ -1359,7 +1360,17 @@ public class GossipDeviceStore
1359 public void handle(ClusterMessage message) { 1360 public void handle(ClusterMessage message) {
1360 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender()); 1361 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
1361 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload()); 1362 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1362 - handleAdvertisement(advertisement); 1363 + backgroundExecutor.submit(new Runnable() {
1364 +
1365 + @Override
1366 + public void run() {
1367 + try {
1368 + handleAdvertisement(advertisement);
1369 + } catch (Exception e) {
1370 + log.warn("Exception thrown handling Device advertisements.", e);
1371 + }
1372 + }
1373 + });
1363 } 1374 }
1364 } 1375 }
1365 } 1376 }
......
...@@ -23,6 +23,7 @@ import static org.onlab.onos.net.host.HostEvent.Type.HOST_MOVED; ...@@ -23,6 +23,7 @@ import static org.onlab.onos.net.host.HostEvent.Type.HOST_MOVED;
23 import static org.onlab.onos.net.host.HostEvent.Type.HOST_REMOVED; 23 import static org.onlab.onos.net.host.HostEvent.Type.HOST_REMOVED;
24 import static org.onlab.onos.net.host.HostEvent.Type.HOST_UPDATED; 24 import static org.onlab.onos.net.host.HostEvent.Type.HOST_UPDATED;
25 import static org.onlab.util.Tools.namedThreads; 25 import static org.onlab.util.Tools.namedThreads;
26 +import static org.onlab.util.Tools.minPriority;
26 import static org.slf4j.LoggerFactory.getLogger; 27 import static org.slf4j.LoggerFactory.getLogger;
27 28
28 import java.io.IOException; 29 import java.io.IOException;
...@@ -136,7 +137,7 @@ public class GossipHostStore ...@@ -136,7 +137,7 @@ public class GossipHostStore
136 } 137 }
137 }; 138 };
138 139
139 - private ScheduledExecutorService executor; 140 + private ScheduledExecutorService backgroundExecutor;
140 141
141 @Activate 142 @Activate
142 public void activate() { 143 public void activate() {
...@@ -150,14 +151,14 @@ public class GossipHostStore ...@@ -150,14 +151,14 @@ public class GossipHostStore
150 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, 151 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
151 new InternalHostAntiEntropyAdvertisementListener()); 152 new InternalHostAntiEntropyAdvertisementListener());
152 153
153 - executor = 154 + backgroundExecutor =
154 - newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d")); 155 + newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
155 156
156 // TODO: Make these configurable 157 // TODO: Make these configurable
157 long initialDelaySec = 5; 158 long initialDelaySec = 5;
158 long periodSec = 5; 159 long periodSec = 5;
159 // start anti-entropy thread 160 // start anti-entropy thread
160 - executor.scheduleAtFixedRate(new SendAdvertisementTask(), 161 + backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
161 initialDelaySec, periodSec, TimeUnit.SECONDS); 162 initialDelaySec, periodSec, TimeUnit.SECONDS);
162 163
163 log.info("Started"); 164 log.info("Started");
...@@ -165,9 +166,9 @@ public class GossipHostStore ...@@ -165,9 +166,9 @@ public class GossipHostStore
165 166
166 @Deactivate 167 @Deactivate
167 public void deactivate() { 168 public void deactivate() {
168 - executor.shutdownNow(); 169 + backgroundExecutor.shutdownNow();
169 try { 170 try {
170 - if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { 171 + if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
171 log.error("Timeout during executor shutdown"); 172 log.error("Timeout during executor shutdown");
172 } 173 }
173 } catch (InterruptedException e) { 174 } catch (InterruptedException e) {
...@@ -642,7 +643,17 @@ public class GossipHostStore ...@@ -642,7 +643,17 @@ public class GossipHostStore
642 public void handle(ClusterMessage message) { 643 public void handle(ClusterMessage message) {
643 log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender()); 644 log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
644 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload()); 645 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
645 - handleAntiEntropyAdvertisement(advertisement); 646 + backgroundExecutor.submit(new Runnable() {
647 +
648 + @Override
649 + public void run() {
650 + try {
651 + handleAntiEntropyAdvertisement(advertisement);
652 + } catch (Exception e) {
653 + log.warn("Exception thrown handling Host advertisements", e);
654 + }
655 + }
656 + });
646 } 657 }
647 } 658 }
648 } 659 }
......
...@@ -21,6 +21,7 @@ import com.google.common.collect.HashMultimap; ...@@ -21,6 +21,7 @@ import com.google.common.collect.HashMultimap;
21 import com.google.common.collect.ImmutableList; 21 import com.google.common.collect.ImmutableList;
22 import com.google.common.collect.Maps; 22 import com.google.common.collect.Maps;
23 import com.google.common.collect.SetMultimap; 23 import com.google.common.collect.SetMultimap;
24 +
24 import org.apache.commons.lang3.RandomUtils; 25 import org.apache.commons.lang3.RandomUtils;
25 import org.apache.felix.scr.annotations.Activate; 26 import org.apache.felix.scr.annotations.Activate;
26 import org.apache.felix.scr.annotations.Component; 27 import org.apache.felix.scr.annotations.Component;
...@@ -87,6 +88,7 @@ import static org.onlab.onos.net.Link.Type.INDIRECT; ...@@ -87,6 +88,7 @@ import static org.onlab.onos.net.Link.Type.INDIRECT;
87 import static org.onlab.onos.net.LinkKey.linkKey; 88 import static org.onlab.onos.net.LinkKey.linkKey;
88 import static org.onlab.onos.net.link.LinkEvent.Type.*; 89 import static org.onlab.onos.net.link.LinkEvent.Type.*;
89 import static org.onlab.onos.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT; 90 import static org.onlab.onos.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
91 +import static org.onlab.util.Tools.minPriority;
90 import static org.onlab.util.Tools.namedThreads; 92 import static org.onlab.util.Tools.namedThreads;
91 import static org.slf4j.LoggerFactory.getLogger; 93 import static org.slf4j.LoggerFactory.getLogger;
92 94
...@@ -139,7 +141,7 @@ public class GossipLinkStore ...@@ -139,7 +141,7 @@ public class GossipLinkStore
139 } 141 }
140 }; 142 };
141 143
142 - private ScheduledExecutorService executor; 144 + private ScheduledExecutorService backgroundExecutors;
143 145
144 @Activate 146 @Activate
145 public void activate() { 147 public void activate() {
...@@ -154,14 +156,14 @@ public class GossipLinkStore ...@@ -154,14 +156,14 @@ public class GossipLinkStore
154 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, 156 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
155 new InternalLinkAntiEntropyAdvertisementListener()); 157 new InternalLinkAntiEntropyAdvertisementListener());
156 158
157 - executor = 159 + backgroundExecutors =
158 - newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d")); 160 + newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d")));
159 161
160 // TODO: Make these configurable 162 // TODO: Make these configurable
161 long initialDelaySec = 5; 163 long initialDelaySec = 5;
162 long periodSec = 5; 164 long periodSec = 5;
163 // start anti-entropy thread 165 // start anti-entropy thread
164 - executor.scheduleAtFixedRate(new SendAdvertisementTask(), 166 + backgroundExecutors.scheduleAtFixedRate(new SendAdvertisementTask(),
165 initialDelaySec, periodSec, TimeUnit.SECONDS); 167 initialDelaySec, periodSec, TimeUnit.SECONDS);
166 168
167 log.info("Started"); 169 log.info("Started");
...@@ -170,9 +172,9 @@ public class GossipLinkStore ...@@ -170,9 +172,9 @@ public class GossipLinkStore
170 @Deactivate 172 @Deactivate
171 public void deactivate() { 173 public void deactivate() {
172 174
173 - executor.shutdownNow(); 175 + backgroundExecutors.shutdownNow();
174 try { 176 try {
175 - if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { 177 + if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
176 log.error("Timeout during executor shutdown"); 178 log.error("Timeout during executor shutdown");
177 } 179 }
178 } catch (InterruptedException e) { 180 } catch (InterruptedException e) {
...@@ -794,7 +796,18 @@ public class GossipLinkStore ...@@ -794,7 +796,18 @@ public class GossipLinkStore
794 public void handle(ClusterMessage message) { 796 public void handle(ClusterMessage message) {
795 log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender()); 797 log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
796 LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload()); 798 LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
797 - handleAntiEntropyAdvertisement(advertisement); 799 + backgroundExecutors.submit(new Runnable() {
800 +
801 + @Override
802 + public void run() {
803 + try {
804 + handleAntiEntropyAdvertisement(advertisement);
805 + } catch (Exception e) {
806 + log.warn("Exception thrown while handling Link advertisements", e);
807 + throw e;
808 + }
809 + }
810 + });
798 } 811 }
799 } 812 }
800 } 813 }
......
...@@ -46,6 +46,19 @@ public abstract class Tools { ...@@ -46,6 +46,19 @@ public abstract class Tools {
46 } 46 }
47 47
48 /** 48 /**
49 + * Returns a thread factory that produces threads with MIN_PRIORITY.
50 + *
51 + * @param factory backing ThreadFactory
52 + * @return thread factory
53 + */
54 + public static ThreadFactory minPriority(ThreadFactory factory) {
55 + return new ThreadFactoryBuilder()
56 + .setThreadFactory(factory)
57 + .setPriority(Thread.MIN_PRIORITY)
58 + .build();
59 + }
60 +
61 + /**
49 * Converts a string from hex to long. 62 * Converts a string from hex to long.
50 * 63 *
51 * @param string hex number in string form; sans 0x 64 * @param string hex number in string form; sans 0x
......