Yuta HIGUCHI

GossipStores: remove potentially blocking method out of netty thread

Change-Id: I2da9ba745c3a63bf9709fb77c1f260ea8f4529a8
...@@ -78,6 +78,8 @@ import java.util.Map.Entry; ...@@ -78,6 +78,8 @@ import java.util.Map.Entry;
78 import java.util.Objects; 78 import java.util.Objects;
79 import java.util.Set; 79 import java.util.Set;
80 import java.util.concurrent.ConcurrentMap; 80 import java.util.concurrent.ConcurrentMap;
81 +import java.util.concurrent.ExecutorService;
82 +import java.util.concurrent.Executors;
81 import java.util.concurrent.ScheduledExecutorService; 83 import java.util.concurrent.ScheduledExecutorService;
82 import java.util.concurrent.TimeUnit; 84 import java.util.concurrent.TimeUnit;
83 85
...@@ -160,8 +162,11 @@ public class GossipDeviceStore ...@@ -160,8 +162,11 @@ public class GossipDeviceStore
160 } 162 }
161 }; 163 };
162 164
165 + private ExecutorService executor;
166 +
163 private ScheduledExecutorService backgroundExecutor; 167 private ScheduledExecutorService backgroundExecutor;
164 168
169 +
165 @Activate 170 @Activate
166 public void activate() { 171 public void activate() {
167 clusterCommunicator.addSubscriber( 172 clusterCommunicator.addSubscriber(
...@@ -178,6 +183,8 @@ public class GossipDeviceStore ...@@ -178,6 +183,8 @@ public class GossipDeviceStore
178 clusterCommunicator.addSubscriber( 183 clusterCommunicator.addSubscriber(
179 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener()); 184 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
180 185
186 + executor = Executors.newCachedThreadPool(namedThreads("device-fg-%d"));
187 +
181 backgroundExecutor = 188 backgroundExecutor =
182 newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d"))); 189 newSingleThreadScheduledExecutor(minPriority(namedThreads("device-bg-%d")));
183 190
...@@ -194,6 +201,8 @@ public class GossipDeviceStore ...@@ -194,6 +201,8 @@ public class GossipDeviceStore
194 @Deactivate 201 @Deactivate
195 public void deactivate() { 202 public void deactivate() {
196 203
204 + executor.shutdownNow();
205 +
197 backgroundExecutor.shutdownNow(); 206 backgroundExecutor.shutdownNow();
198 try { 207 try {
199 boolean timedout = backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS); 208 boolean timedout = backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS);
...@@ -1258,32 +1267,54 @@ public class GossipDeviceStore ...@@ -1258,32 +1267,54 @@ public class GossipDeviceStore
1258 } 1267 }
1259 } 1268 }
1260 1269
1261 - private class InternalDeviceEventListener implements ClusterMessageHandler { 1270 + private final class InternalDeviceEventListener
1271 + implements ClusterMessageHandler {
1262 @Override 1272 @Override
1263 public void handle(ClusterMessage message) { 1273 public void handle(ClusterMessage message) {
1264 1274
1265 log.debug("Received device update event from peer: {}", message.sender()); 1275 log.debug("Received device update event from peer: {}", message.sender());
1266 - InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload()); 1276 + InternalDeviceEvent event = SERIALIZER.decode(message.payload());
1267 1277
1268 ProviderId providerId = event.providerId(); 1278 ProviderId providerId = event.providerId();
1269 DeviceId deviceId = event.deviceId(); 1279 DeviceId deviceId = event.deviceId();
1270 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription(); 1280 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
1271 1281
1282 + executor.submit(new Runnable() {
1283 +
1284 + @Override
1285 + public void run() {
1286 + try {
1272 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription)); 1287 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1288 + } catch (Exception e) {
1289 + log.warn("Exception thrown handling device update", e);
1290 + }
1291 + }
1292 + });
1273 } 1293 }
1274 } 1294 }
1275 1295
1276 - private class InternalDeviceOfflineEventListener implements ClusterMessageHandler { 1296 + private final class InternalDeviceOfflineEventListener
1297 + implements ClusterMessageHandler {
1277 @Override 1298 @Override
1278 public void handle(ClusterMessage message) { 1299 public void handle(ClusterMessage message) {
1279 1300
1280 log.debug("Received device offline event from peer: {}", message.sender()); 1301 log.debug("Received device offline event from peer: {}", message.sender());
1281 - InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload()); 1302 + InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
1282 1303
1283 DeviceId deviceId = event.deviceId(); 1304 DeviceId deviceId = event.deviceId();
1284 Timestamp timestamp = event.timestamp(); 1305 Timestamp timestamp = event.timestamp();
1285 1306
1307 + executor.submit(new Runnable() {
1308 +
1309 + @Override
1310 + public void run() {
1311 + try {
1286 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp)); 1312 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1313 + } catch (Exception e) {
1314 + log.warn("Exception thrown handling device offline", e);
1315 + }
1316 + }
1317 + });
1287 } 1318 }
1288 } 1319 }
1289 1320
...@@ -1293,30 +1324,53 @@ public class GossipDeviceStore ...@@ -1293,30 +1324,53 @@ public class GossipDeviceStore
1293 public void handle(ClusterMessage message) { 1324 public void handle(ClusterMessage message) {
1294 log.debug("Received device remove request from peer: {}", message.sender()); 1325 log.debug("Received device remove request from peer: {}", message.sender());
1295 DeviceId did = SERIALIZER.decode(message.payload()); 1326 DeviceId did = SERIALIZER.decode(message.payload());
1327 +
1328 + executor.submit(new Runnable() {
1329 +
1330 + @Override
1331 + public void run() {
1332 + try {
1296 removeDevice(did); 1333 removeDevice(did);
1334 + } catch (Exception e) {
1335 + log.warn("Exception thrown handling device remove", e);
1336 + }
1337 + }
1338 + });
1297 } 1339 }
1298 } 1340 }
1299 1341
1300 - private class InternalDeviceRemovedEventListener implements ClusterMessageHandler { 1342 + private final class InternalDeviceRemovedEventListener
1343 + implements ClusterMessageHandler {
1301 @Override 1344 @Override
1302 public void handle(ClusterMessage message) { 1345 public void handle(ClusterMessage message) {
1303 1346
1304 log.debug("Received device removed event from peer: {}", message.sender()); 1347 log.debug("Received device removed event from peer: {}", message.sender());
1305 - InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload()); 1348 + InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
1306 1349
1307 DeviceId deviceId = event.deviceId(); 1350 DeviceId deviceId = event.deviceId();
1308 Timestamp timestamp = event.timestamp(); 1351 Timestamp timestamp = event.timestamp();
1309 1352
1353 + executor.submit(new Runnable() {
1354 +
1355 + @Override
1356 + public void run() {
1357 + try {
1310 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp)); 1358 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1359 + } catch (Exception e) {
1360 + log.warn("Exception thrown handling device removed", e);
1361 + }
1362 + }
1363 + });
1311 } 1364 }
1312 } 1365 }
1313 1366
1314 - private class InternalPortEventListener implements ClusterMessageHandler { 1367 + private final class InternalPortEventListener
1368 + implements ClusterMessageHandler {
1315 @Override 1369 @Override
1316 public void handle(ClusterMessage message) { 1370 public void handle(ClusterMessage message) {
1317 1371
1318 log.debug("Received port update event from peer: {}", message.sender()); 1372 log.debug("Received port update event from peer: {}", message.sender());
1319 - InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload()); 1373 + InternalPortEvent event = SERIALIZER.decode(message.payload());
1320 1374
1321 ProviderId providerId = event.providerId(); 1375 ProviderId providerId = event.providerId();
1322 DeviceId deviceId = event.deviceId(); 1376 DeviceId deviceId = event.deviceId();
...@@ -1328,16 +1382,27 @@ public class GossipDeviceStore ...@@ -1328,16 +1382,27 @@ public class GossipDeviceStore
1328 return; 1382 return;
1329 } 1383 }
1330 1384
1385 + executor.submit(new Runnable() {
1386 +
1387 + @Override
1388 + public void run() {
1389 + try {
1331 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions)); 1390 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1391 + } catch (Exception e) {
1392 + log.warn("Exception thrown handling port update", e);
1393 + }
1394 + }
1395 + });
1332 } 1396 }
1333 } 1397 }
1334 1398
1335 - private class InternalPortStatusEventListener implements ClusterMessageHandler { 1399 + private final class InternalPortStatusEventListener
1400 + implements ClusterMessageHandler {
1336 @Override 1401 @Override
1337 public void handle(ClusterMessage message) { 1402 public void handle(ClusterMessage message) {
1338 1403
1339 log.debug("Received port status update event from peer: {}", message.sender()); 1404 log.debug("Received port status update event from peer: {}", message.sender());
1340 - InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload()); 1405 + InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
1341 1406
1342 ProviderId providerId = event.providerId(); 1407 ProviderId providerId = event.providerId();
1343 DeviceId deviceId = event.deviceId(); 1408 DeviceId deviceId = event.deviceId();
...@@ -1349,7 +1414,17 @@ public class GossipDeviceStore ...@@ -1349,7 +1414,17 @@ public class GossipDeviceStore
1349 return; 1414 return;
1350 } 1415 }
1351 1416
1417 + executor.submit(new Runnable() {
1418 +
1419 + @Override
1420 + public void run() {
1421 + try {
1352 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription)); 1422 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1423 + } catch (Exception e) {
1424 + log.warn("Exception thrown handling port update", e);
1425 + }
1426 + }
1427 + });
1353 } 1428 }
1354 } 1429 }
1355 1430
......
...@@ -34,6 +34,8 @@ import java.util.Map; ...@@ -34,6 +34,8 @@ import java.util.Map;
34 import java.util.Map.Entry; 34 import java.util.Map.Entry;
35 import java.util.Set; 35 import java.util.Set;
36 import java.util.concurrent.ConcurrentHashMap; 36 import java.util.concurrent.ConcurrentHashMap;
37 +import java.util.concurrent.ExecutorService;
38 +import java.util.concurrent.Executors;
37 import java.util.concurrent.ScheduledExecutorService; 39 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.TimeUnit; 40 import java.util.concurrent.TimeUnit;
39 41
...@@ -137,6 +139,8 @@ public class GossipHostStore ...@@ -137,6 +139,8 @@ public class GossipHostStore
137 } 139 }
138 }; 140 };
139 141
142 + private ExecutorService executor;
143 +
140 private ScheduledExecutorService backgroundExecutor; 144 private ScheduledExecutorService backgroundExecutor;
141 145
142 @Activate 146 @Activate
...@@ -151,6 +155,8 @@ public class GossipHostStore ...@@ -151,6 +155,8 @@ public class GossipHostStore
151 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT, 155 GossipHostStoreMessageSubjects.HOST_ANTI_ENTROPY_ADVERTISEMENT,
152 new InternalHostAntiEntropyAdvertisementListener()); 156 new InternalHostAntiEntropyAdvertisementListener());
153 157
158 + executor = Executors.newCachedThreadPool(namedThreads("host-fg-%d"));
159 +
154 backgroundExecutor = 160 backgroundExecutor =
155 newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d"))); 161 newSingleThreadScheduledExecutor(minPriority(namedThreads("host-bg-%d")));
156 162
...@@ -166,6 +172,7 @@ public class GossipHostStore ...@@ -166,6 +172,7 @@ public class GossipHostStore
166 172
167 @Deactivate 173 @Deactivate
168 public void deactivate() { 174 public void deactivate() {
175 + executor.shutdownNow();
169 backgroundExecutor.shutdownNow(); 176 backgroundExecutor.shutdownNow();
170 try { 177 try {
171 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) { 178 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
...@@ -459,33 +466,58 @@ public class GossipHostStore ...@@ -459,33 +466,58 @@ public class GossipHostStore
459 } 466 }
460 } 467 }
461 468
462 - private class InternalHostEventListener implements ClusterMessageHandler { 469 + private final class InternalHostEventListener
470 + implements ClusterMessageHandler {
463 @Override 471 @Override
464 public void handle(ClusterMessage message) { 472 public void handle(ClusterMessage message) {
465 473
466 log.debug("Received host update event from peer: {}", message.sender()); 474 log.debug("Received host update event from peer: {}", message.sender());
467 - InternalHostEvent event = (InternalHostEvent) SERIALIZER.decode(message.payload()); 475 + InternalHostEvent event = SERIALIZER.decode(message.payload());
468 476
469 ProviderId providerId = event.providerId(); 477 ProviderId providerId = event.providerId();
470 HostId hostId = event.hostId(); 478 HostId hostId = event.hostId();
471 HostDescription hostDescription = event.hostDescription(); 479 HostDescription hostDescription = event.hostDescription();
472 Timestamp timestamp = event.timestamp(); 480 Timestamp timestamp = event.timestamp();
473 481
474 - notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp)); 482 + executor.submit(new Runnable() {
483 +
484 + @Override
485 + public void run() {
486 + try {
487 + notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
488 + hostId,
489 + hostDescription,
490 + timestamp));
491 + } catch (Exception e) {
492 + log.warn("Exception thrown handling host removed", e);
493 + }
494 + }
495 + });
475 } 496 }
476 } 497 }
477 498
478 - private class InternalHostRemovedEventListener implements ClusterMessageHandler { 499 + private final class InternalHostRemovedEventListener
500 + implements ClusterMessageHandler {
479 @Override 501 @Override
480 public void handle(ClusterMessage message) { 502 public void handle(ClusterMessage message) {
481 503
482 log.debug("Received host removed event from peer: {}", message.sender()); 504 log.debug("Received host removed event from peer: {}", message.sender());
483 - InternalHostRemovedEvent event = (InternalHostRemovedEvent) SERIALIZER.decode(message.payload()); 505 + InternalHostRemovedEvent event = SERIALIZER.decode(message.payload());
484 506
485 HostId hostId = event.hostId(); 507 HostId hostId = event.hostId();
486 Timestamp timestamp = event.timestamp(); 508 Timestamp timestamp = event.timestamp();
487 509
510 + executor.submit(new Runnable() {
511 +
512 + @Override
513 + public void run() {
514 + try {
488 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp)); 515 notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
516 + } catch (Exception e) {
517 + log.warn("Exception thrown handling host removed", e);
518 + }
519 + }
520 + });
489 } 521 }
490 } 522 }
491 523
...@@ -636,8 +668,8 @@ public class GossipHostStore ...@@ -636,8 +668,8 @@ public class GossipHostStore
636 } 668 }
637 } 669 }
638 670
639 - private final class InternalHostAntiEntropyAdvertisementListener implements 671 + private final class InternalHostAntiEntropyAdvertisementListener
640 - ClusterMessageHandler { 672 + implements ClusterMessageHandler {
641 673
642 @Override 674 @Override
643 public void handle(ClusterMessage message) { 675 public void handle(ClusterMessage message) {
......
...@@ -71,6 +71,8 @@ import java.util.Objects; ...@@ -71,6 +71,8 @@ import java.util.Objects;
71 import java.util.Set; 71 import java.util.Set;
72 import java.util.concurrent.ConcurrentHashMap; 72 import java.util.concurrent.ConcurrentHashMap;
73 import java.util.concurrent.ConcurrentMap; 73 import java.util.concurrent.ConcurrentMap;
74 +import java.util.concurrent.ExecutorService;
75 +import java.util.concurrent.Executors;
74 import java.util.concurrent.ScheduledExecutorService; 76 import java.util.concurrent.ScheduledExecutorService;
75 import java.util.concurrent.TimeUnit; 77 import java.util.concurrent.TimeUnit;
76 78
...@@ -141,6 +143,8 @@ public class GossipLinkStore ...@@ -141,6 +143,8 @@ public class GossipLinkStore
141 } 143 }
142 }; 144 };
143 145
146 + private ExecutorService executor;
147 +
144 private ScheduledExecutorService backgroundExecutors; 148 private ScheduledExecutorService backgroundExecutors;
145 149
146 @Activate 150 @Activate
...@@ -156,6 +160,8 @@ public class GossipLinkStore ...@@ -156,6 +160,8 @@ public class GossipLinkStore
156 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, 160 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
157 new InternalLinkAntiEntropyAdvertisementListener()); 161 new InternalLinkAntiEntropyAdvertisementListener());
158 162
163 + executor = Executors.newCachedThreadPool(namedThreads("link-fg-%d"));
164 +
159 backgroundExecutors = 165 backgroundExecutors =
160 newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d"))); 166 newSingleThreadScheduledExecutor(minPriority(namedThreads("link-bg-%d")));
161 167
...@@ -172,6 +178,8 @@ public class GossipLinkStore ...@@ -172,6 +178,8 @@ public class GossipLinkStore
172 @Deactivate 178 @Deactivate
173 public void deactivate() { 179 public void deactivate() {
174 180
181 + executor.shutdownNow();
182 +
175 backgroundExecutors.shutdownNow(); 183 backgroundExecutors.shutdownNow();
176 try { 184 try {
177 if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) { 185 if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
...@@ -762,7 +770,8 @@ public class GossipLinkStore ...@@ -762,7 +770,8 @@ public class GossipLinkStore
762 } 770 }
763 } 771 }
764 772
765 - private class InternalLinkEventListener implements ClusterMessageHandler { 773 + private final class InternalLinkEventListener
774 + implements ClusterMessageHandler {
766 @Override 775 @Override
767 public void handle(ClusterMessage message) { 776 public void handle(ClusterMessage message) {
768 777
...@@ -772,11 +781,22 @@ public class GossipLinkStore ...@@ -772,11 +781,22 @@ public class GossipLinkStore
772 ProviderId providerId = event.providerId(); 781 ProviderId providerId = event.providerId();
773 Timestamped<LinkDescription> linkDescription = event.linkDescription(); 782 Timestamped<LinkDescription> linkDescription = event.linkDescription();
774 783
784 + executor.submit(new Runnable() {
785 +
786 + @Override
787 + public void run() {
788 + try {
775 notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription)); 789 notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
790 + } catch (Exception e) {
791 + log.warn("Exception thrown handling link event", e);
792 + }
793 + }
794 + });
776 } 795 }
777 } 796 }
778 797
779 - private class InternalLinkRemovedEventListener implements ClusterMessageHandler { 798 + private final class InternalLinkRemovedEventListener
799 + implements ClusterMessageHandler {
780 @Override 800 @Override
781 public void handle(ClusterMessage message) { 801 public void handle(ClusterMessage message) {
782 802
...@@ -786,11 +806,22 @@ public class GossipLinkStore ...@@ -786,11 +806,22 @@ public class GossipLinkStore
786 LinkKey linkKey = event.linkKey(); 806 LinkKey linkKey = event.linkKey();
787 Timestamp timestamp = event.timestamp(); 807 Timestamp timestamp = event.timestamp();
788 808
809 + executor.submit(new Runnable() {
810 +
811 + @Override
812 + public void run() {
813 + try {
789 notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp)); 814 notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
815 + } catch (Exception e) {
816 + log.warn("Exception thrown handling link removed", e);
817 + }
818 + }
819 + });
790 } 820 }
791 } 821 }
792 822
793 - private final class InternalLinkAntiEntropyAdvertisementListener implements ClusterMessageHandler { 823 + private final class InternalLinkAntiEntropyAdvertisementListener
824 + implements ClusterMessageHandler {
794 825
795 @Override 826 @Override
796 public void handle(ClusterMessage message) { 827 public void handle(ClusterMessage message) {
......