Brian O'Connor
Committed by Ray Milkey

Updates to ECM interface

Change-Id: Ie0cae42ac2b361cf3b94e5047c157cb0945f4209

Adding origin to IntentData and use it to pick GossipIntentStore peer

Change-Id: I50e9621a69a35ec02b8c8dd79cc926591e5a73e9
...@@ -84,14 +84,13 @@ public class IntentRemoveCommand extends AbstractShellCommand { ...@@ -84,14 +84,13 @@ public class IntentRemoveCommand extends AbstractShellCommand {
84 Key key = Key.of(new BigInteger(id, 16).longValue(), appId); 84 Key key = Key.of(new BigInteger(id, 16).longValue(), appId);
85 Intent intent = intentService.getIntent(key); 85 Intent intent = intentService.getIntent(key);
86 86
87 -
88 if (intent != null) { 87 if (intent != null) {
89 // set up latch and listener to track uninstall progress 88 // set up latch and listener to track uninstall progress
90 CountDownLatch latch = new CountDownLatch(1); 89 CountDownLatch latch = new CountDownLatch(1);
91 IntentListener listener = (IntentEvent event) -> { 90 IntentListener listener = (IntentEvent event) -> {
92 if (Objects.equals(event.subject().key(), key) && 91 if (Objects.equals(event.subject().key(), key) &&
93 - (event.type() == IntentEvent.Type.WITHDRAWN 92 + (event.type() == IntentEvent.Type.WITHDRAWN ||
94 - || event.type() == IntentEvent.Type.WITHDRAWN)) { 93 + event.type() == IntentEvent.Type.FAILED)) {
95 latch.countDown(); 94 latch.countDown();
96 } 95 }
97 }; 96 };
......
...@@ -17,6 +17,7 @@ package org.onosproject.net.intent; ...@@ -17,6 +17,7 @@ package org.onosproject.net.intent;
17 17
18 import com.google.common.base.MoreObjects; 18 import com.google.common.base.MoreObjects;
19 import com.google.common.collect.ImmutableList; 19 import com.google.common.collect.ImmutableList;
20 +import org.onosproject.cluster.NodeId;
20 import org.onosproject.store.Timestamp; 21 import org.onosproject.store.Timestamp;
21 22
22 import java.util.List; 23 import java.util.List;
...@@ -32,6 +33,7 @@ public class IntentData { //FIXME need to make this "immutable" ...@@ -32,6 +33,7 @@ public class IntentData { //FIXME need to make this "immutable"
32 33
33 private IntentState state; 34 private IntentState state;
34 private Timestamp version; 35 private Timestamp version;
36 + private NodeId origin;
35 37
36 private List<Intent> installables; 38 private List<Intent> installables;
37 39
...@@ -61,6 +63,19 @@ public class IntentData { //FIXME need to make this "immutable" ...@@ -61,6 +63,19 @@ public class IntentData { //FIXME need to make this "immutable"
61 return version; 63 return version;
62 } 64 }
63 65
66 + /**
67 + * Sets the origin, which is the node that created the instance.
68 + *
69 + * @param origin origin instance
70 + */
71 + public void setOrigin(NodeId origin) {
72 + this.origin = origin;
73 + }
74 +
75 + public NodeId origin() {
76 + return origin;
77 + }
78 +
64 public void setState(IntentState newState) { 79 public void setState(IntentState newState) {
65 this.state = newState; 80 this.state = newState;
66 } 81 }
......
...@@ -16,11 +16,9 @@ ...@@ -16,11 +16,9 @@
16 package org.onosproject.store.cluster.messaging; 16 package org.onosproject.store.cluster.messaging;
17 17
18 import com.google.common.util.concurrent.ListenableFuture; 18 import com.google.common.util.concurrent.ListenableFuture;
19 -
20 import org.onosproject.cluster.NodeId; 19 import org.onosproject.cluster.NodeId;
21 20
22 import java.io.IOException; 21 import java.io.IOException;
23 -import java.util.Set;
24 import java.util.concurrent.ExecutorService; 22 import java.util.concurrent.ExecutorService;
25 23
26 // TODO: remove IOExceptions? 24 // TODO: remove IOExceptions?
...@@ -51,9 +49,8 @@ public interface ClusterCommunicationService { ...@@ -51,9 +49,8 @@ public interface ClusterCommunicationService {
51 * @param message message to send 49 * @param message message to send
52 * @param toNodeId node identifier 50 * @param toNodeId node identifier
53 * @return true if the message was sent successfully; false otherwise. 51 * @return true if the message was sent successfully; false otherwise.
54 - * @throws IOException when I/O exception of some sort has occurred
55 */ 52 */
56 - boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException; 53 + boolean unicast(ClusterMessage message, NodeId toNodeId);
57 54
58 /** 55 /**
59 * Multicast a message to a set of controller nodes. 56 * Multicast a message to a set of controller nodes.
...@@ -62,7 +59,7 @@ public interface ClusterCommunicationService { ...@@ -62,7 +59,7 @@ public interface ClusterCommunicationService {
62 * @param nodeIds recipient node identifiers 59 * @param nodeIds recipient node identifiers
63 * @return true if the message was sent successfully to all nodes in the group; false otherwise. 60 * @return true if the message was sent successfully to all nodes in the group; false otherwise.
64 */ 61 */
65 - boolean multicast(ClusterMessage message, Set<NodeId> nodeIds); 62 + boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds);
66 63
67 /** 64 /**
68 * Sends a message synchronously. 65 * Sends a message synchronously.
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
16 package org.onosproject.store.cluster.messaging.impl; 16 package org.onosproject.store.cluster.messaging.impl;
17 17
18 import com.google.common.util.concurrent.ListenableFuture; 18 import com.google.common.util.concurrent.ListenableFuture;
19 -
20 import org.apache.felix.scr.annotations.Activate; 19 import org.apache.felix.scr.annotations.Activate;
21 import org.apache.felix.scr.annotations.Component; 20 import org.apache.felix.scr.annotations.Component;
22 import org.apache.felix.scr.annotations.Deactivate; 21 import org.apache.felix.scr.annotations.Deactivate;
...@@ -39,7 +38,6 @@ import org.slf4j.Logger; ...@@ -39,7 +38,6 @@ import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory; 38 import org.slf4j.LoggerFactory;
40 39
41 import java.io.IOException; 40 import java.io.IOException;
42 -import java.util.Set;
43 import java.util.concurrent.ExecutorService; 41 import java.util.concurrent.ExecutorService;
44 42
45 import static com.google.common.base.Preconditions.checkArgument; 43 import static com.google.common.base.Preconditions.checkArgument;
...@@ -107,7 +105,7 @@ public class ClusterCommunicationManager ...@@ -107,7 +105,7 @@ public class ClusterCommunicationManager
107 } 105 }
108 106
109 @Override 107 @Override
110 - public boolean multicast(ClusterMessage message, Set<NodeId> nodes) { 108 + public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
111 boolean ok = true; 109 boolean ok = true;
112 final ControllerNode localNode = clusterService.getLocalNode(); 110 final ControllerNode localNode = clusterService.getLocalNode();
113 byte[] payload = message.getBytes(); 111 byte[] payload = message.getBytes();
...@@ -120,8 +118,8 @@ public class ClusterCommunicationManager ...@@ -120,8 +118,8 @@ public class ClusterCommunicationManager
120 } 118 }
121 119
122 @Override 120 @Override
123 - public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException { 121 + public boolean unicast(ClusterMessage message, NodeId toNodeId) {
124 - return unicast(message.subject(), message.getBytes(), toNodeId); 122 + return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
125 } 123 }
126 124
127 private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException { 125 private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
...@@ -137,7 +135,6 @@ public class ClusterCommunicationManager ...@@ -137,7 +135,6 @@ public class ClusterCommunicationManager
137 } 135 }
138 } 136 }
139 137
140 -
141 private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) { 138 private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
142 try { 139 try {
143 return unicast(subject, payload, toNodeId); 140 return unicast(subject, payload, toNodeId);
......
...@@ -21,7 +21,6 @@ import com.google.common.collect.FluentIterable; ...@@ -21,7 +21,6 @@ import com.google.common.collect.FluentIterable;
21 import com.google.common.collect.ImmutableList; 21 import com.google.common.collect.ImmutableList;
22 import com.google.common.collect.Maps; 22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets; 23 import com.google.common.collect.Sets;
24 -
25 import org.apache.commons.lang3.RandomUtils; 24 import org.apache.commons.lang3.RandomUtils;
26 import org.apache.felix.scr.annotations.Activate; 25 import org.apache.felix.scr.annotations.Activate;
27 import org.apache.felix.scr.annotations.Component; 26 import org.apache.felix.scr.annotations.Component;
...@@ -305,14 +304,13 @@ public class GossipDeviceStore ...@@ -305,14 +304,13 @@ public class GossipDeviceStore
305 ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED, 304 ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
306 SERIALIZER.encode(deviceInjectedEvent)); 305 SERIALIZER.encode(deviceInjectedEvent));
307 306
308 - try { 307 + // TODO check unicast return value
309 - clusterCommunicator.unicast(clusterMessage, deviceNode); 308 + clusterCommunicator.unicast(clusterMessage, deviceNode);
310 - } catch (IOException e) { 309 + /* error log:
311 - log.warn("Failed to process injected device id: {} desc: {} " + 310 + log.warn("Failed to process injected device id: {} desc: {} " +
312 - "(cluster messaging failed: {})", 311 + "(cluster messaging failed: {})",
313 - deviceId, deviceDescription, e); 312 + deviceId, deviceDescription, e);
314 - } 313 + */
315 -
316 } 314 }
317 315
318 return deviceEvent; 316 return deviceEvent;
...@@ -556,13 +554,14 @@ public class GossipDeviceStore ...@@ -556,13 +554,14 @@ public class GossipDeviceStore
556 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions); 554 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
557 ClusterMessage clusterMessage = new ClusterMessage( 555 ClusterMessage clusterMessage = new ClusterMessage(
558 localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent)); 556 localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
559 - try { 557 +
560 - clusterCommunicator.unicast(clusterMessage, deviceNode); 558 + //TODO check unicast return value
561 - } catch (IOException e) { 559 + clusterCommunicator.unicast(clusterMessage, deviceNode);
562 - log.warn("Failed to process injected ports of device id: {} " + 560 + /* error log:
563 - "(cluster messaging failed: {})", 561 + log.warn("Failed to process injected ports of device id: {} " +
564 - deviceId, e); 562 + "(cluster messaging failed: {})",
565 - } 563 + deviceId, e);
564 + */
566 } 565 }
567 566
568 return deviceEvents == null ? Collections.emptyList() : deviceEvents; 567 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
...@@ -842,13 +841,13 @@ public class GossipDeviceStore ...@@ -842,13 +841,13 @@ public class GossipDeviceStore
842 DEVICE_REMOVE_REQ, 841 DEVICE_REMOVE_REQ,
843 SERIALIZER.encode(deviceId)); 842 SERIALIZER.encode(deviceId));
844 843
845 - try { 844 + // TODO check unicast return value
846 - clusterCommunicator.unicast(message, master); 845 + clusterCommunicator.unicast(message, master);
847 - } catch (IOException e) { 846 + /* error log:
848 - log.error("Failed to forward {} remove request to {}", deviceId, master, e); 847 + log.error("Failed to forward {} remove request to {}", deviceId, master, e);
849 - } 848 + */
850 849
851 - // event will be triggered after master processes it. 850 + // event will be triggered after master processes it.
852 return null; 851 return null;
853 } 852 }
854 853
......
...@@ -35,7 +35,6 @@ import org.onosproject.store.serializers.KryoSerializer; ...@@ -35,7 +35,6 @@ import org.onosproject.store.serializers.KryoSerializer;
35 import org.slf4j.Logger; 35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory; 36 import org.slf4j.LoggerFactory;
37 37
38 -import java.io.IOException;
39 import java.util.ArrayList; 38 import java.util.ArrayList;
40 import java.util.Collection; 39 import java.util.Collection;
41 import java.util.HashMap; 40 import java.util.HashMap;
...@@ -51,6 +50,7 @@ import java.util.concurrent.Executors; ...@@ -51,6 +50,7 @@ import java.util.concurrent.Executors;
51 import java.util.concurrent.ScheduledExecutorService; 50 import java.util.concurrent.ScheduledExecutorService;
52 import java.util.concurrent.TimeUnit; 51 import java.util.concurrent.TimeUnit;
53 import java.util.concurrent.atomic.AtomicLong; 52 import java.util.concurrent.atomic.AtomicLong;
53 +import java.util.function.BiFunction;
54 import java.util.stream.Collectors; 54 import java.util.stream.Collectors;
55 55
56 import static com.google.common.base.Preconditions.checkNotNull; 56 import static com.google.common.base.Preconditions.checkNotNull;
...@@ -87,6 +87,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -87,6 +87,7 @@ public class EventuallyConsistentMapImpl<K, V>
87 private final ExecutorService executor; 87 private final ExecutorService executor;
88 88
89 private final ScheduledExecutorService backgroundExecutor; 89 private final ScheduledExecutorService backgroundExecutor;
90 + private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction;
90 91
91 private ExecutorService broadcastMessageExecutor; 92 private ExecutorService broadcastMessageExecutor;
92 93
...@@ -140,14 +141,18 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -140,14 +141,18 @@ public class EventuallyConsistentMapImpl<K, V>
140 * both K and V 141 * both K and V
141 * @param clockService a clock service able to generate timestamps 142 * @param clockService a clock service able to generate timestamps
142 * for K 143 * for K
144 + * @param peerUpdateFunction function that provides a set of nodes to immediately
145 + * update to when there writes to the map
143 */ 146 */
144 public EventuallyConsistentMapImpl(String mapName, 147 public EventuallyConsistentMapImpl(String mapName,
145 ClusterService clusterService, 148 ClusterService clusterService,
146 ClusterCommunicationService clusterCommunicator, 149 ClusterCommunicationService clusterCommunicator,
147 KryoNamespace.Builder serializerBuilder, 150 KryoNamespace.Builder serializerBuilder,
148 - ClockService<K, V> clockService) { 151 + ClockService<K, V> clockService,
152 + BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) {
149 this.clusterService = checkNotNull(clusterService); 153 this.clusterService = checkNotNull(clusterService);
150 this.clusterCommunicator = checkNotNull(clusterCommunicator); 154 this.clusterCommunicator = checkNotNull(clusterCommunicator);
155 + this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
151 156
152 serializer = createSerializer(checkNotNull(serializerBuilder)); 157 serializer = createSerializer(checkNotNull(serializerBuilder));
153 destroyedMessage = mapName + ERROR_DESTROYED; 158 destroyedMessage = mapName + ERROR_DESTROYED;
...@@ -189,6 +194,34 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -189,6 +194,34 @@ public class EventuallyConsistentMapImpl<K, V>
189 new InternalAntiEntropyListener(), backgroundExecutor); 194 new InternalAntiEntropyListener(), backgroundExecutor);
190 } 195 }
191 196
197 + /**
198 + * Creates a new eventually consistent map shared amongst multiple instances.
199 + * <p>
200 + * Take a look at the other constructor for usage information. The only difference
201 + * is that a BiFunction is provided that returns all nodes in the cluster, so
202 + * all nodes will be sent write updates immediately.
203 + * </p>
204 + *
205 + * @param mapName a String identifier for the map.
206 + * @param clusterService the cluster service
207 + * @param clusterCommunicator the cluster communications service
208 + * @param serializerBuilder a Kryo namespace builder that can serialize
209 + * both K and V
210 + * @param clockService a clock service able to generate timestamps
211 + * for K
212 + */
213 + public EventuallyConsistentMapImpl(String mapName,
214 + ClusterService clusterService,
215 + ClusterCommunicationService clusterCommunicator,
216 + KryoNamespace.Builder serializerBuilder,
217 + ClockService<K, V> clockService) {
218 + this(mapName, clusterService, clusterCommunicator, serializerBuilder, clockService,
219 + (key, value) -> clusterService.getNodes().stream()
220 + .map(ControllerNode::id)
221 + .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id()))
222 + .collect(Collectors.toList()));
223 + }
224 +
192 private KryoSerializer createSerializer(KryoNamespace.Builder builder) { 225 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
193 return new KryoSerializer() { 226 return new KryoSerializer() {
194 @Override 227 @Override
...@@ -270,11 +303,10 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -270,11 +303,10 @@ public class EventuallyConsistentMapImpl<K, V>
270 Timestamp timestamp = clockService.getTimestamp(key, value); 303 Timestamp timestamp = clockService.getTimestamp(key, value);
271 304
272 if (putInternal(key, value, timestamp)) { 305 if (putInternal(key, value, timestamp)) {
273 - notifyPeers(new InternalPutEvent<>(key, value, timestamp)); 306 + notifyPeers(new InternalPutEvent<>(key, value, timestamp),
274 - EventuallyConsistentMapEvent<K, V> externalEvent 307 + peerUpdateFunction.apply(key, value));
275 - = new EventuallyConsistentMapEvent<>( 308 + notifyListeners(new EventuallyConsistentMapEvent<>(
276 - EventuallyConsistentMapEvent.Type.PUT, key, value); 309 + EventuallyConsistentMapEvent.Type.PUT, key, value));
277 - notifyListeners(externalEvent);
278 } 310 }
279 } 311 }
280 312
...@@ -318,11 +350,10 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -318,11 +350,10 @@ public class EventuallyConsistentMapImpl<K, V>
318 Timestamp timestamp = clockService.getTimestamp(key, null); 350 Timestamp timestamp = clockService.getTimestamp(key, null);
319 351
320 if (removeInternal(key, timestamp)) { 352 if (removeInternal(key, timestamp)) {
321 - notifyPeers(new InternalRemoveEvent<>(key, timestamp)); 353 + notifyPeers(new InternalRemoveEvent<>(key, timestamp),
322 - EventuallyConsistentMapEvent<K, V> externalEvent 354 + peerUpdateFunction.apply(key, null));
323 - = new EventuallyConsistentMapEvent<>( 355 + notifyListeners(new EventuallyConsistentMapEvent<>(
324 - EventuallyConsistentMapEvent.Type.REMOVE, key, null); 356 + EventuallyConsistentMapEvent.Type.REMOVE, key, null));
325 - notifyListeners(externalEvent);
326 } 357 }
327 } 358 }
328 359
...@@ -364,11 +395,10 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -364,11 +395,10 @@ public class EventuallyConsistentMapImpl<K, V>
364 Timestamp timestamp = clockService.getTimestamp(key, value); 395 Timestamp timestamp = clockService.getTimestamp(key, value);
365 396
366 if (removeInternal(key, timestamp)) { 397 if (removeInternal(key, timestamp)) {
367 - notifyPeers(new InternalRemoveEvent<>(key, timestamp)); 398 + notifyPeers(new InternalRemoveEvent<>(key, timestamp),
368 - EventuallyConsistentMapEvent<K, V> externalEvent 399 + peerUpdateFunction.apply(key, value));
369 - = new EventuallyConsistentMapEvent<>( 400 + notifyListeners(new EventuallyConsistentMapEvent<>(
370 - EventuallyConsistentMapEvent.Type.REMOVE, key, value); 401 + EventuallyConsistentMapEvent.Type.REMOVE, key, value));
371 - notifyListeners(externalEvent);
372 } 402 }
373 } 403 }
374 404
...@@ -393,7 +423,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -393,7 +423,7 @@ public class EventuallyConsistentMapImpl<K, V>
393 } 423 }
394 424
395 if (!updates.isEmpty()) { 425 if (!updates.isEmpty()) {
396 - notifyPeers(new InternalPutEvent<>(updates)); 426 + broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
397 427
398 for (PutEntry<K, V> entry : updates) { 428 for (PutEntry<K, V> entry : updates) {
399 EventuallyConsistentMapEvent<K, V> externalEvent = 429 EventuallyConsistentMapEvent<K, V> externalEvent =
...@@ -421,7 +451,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -421,7 +451,7 @@ public class EventuallyConsistentMapImpl<K, V>
421 } 451 }
422 452
423 if (!removed.isEmpty()) { 453 if (!removed.isEmpty()) {
424 - notifyPeers(new InternalRemoveEvent<>(removed)); 454 + broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
425 455
426 for (RemoveEntry<K> entry : removed) { 456 for (RemoveEntry<K> entry : removed) {
427 EventuallyConsistentMapEvent<K, V> externalEvent 457 EventuallyConsistentMapEvent<K, V> externalEvent
...@@ -493,16 +523,26 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -493,16 +523,26 @@ public class EventuallyConsistentMapImpl<K, V>
493 } 523 }
494 } 524 }
495 525
496 - private void notifyPeers(InternalPutEvent event) { 526 + private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) {
497 // FIXME extremely memory expensive when we are overrun 527 // FIXME extremely memory expensive when we are overrun
498 // broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event)); 528 // broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
499 - broadcastMessage(updateMessageSubject, event); 529 + multicastMessage(updateMessageSubject, event, peers);
500 } 530 }
501 531
502 - private void notifyPeers(InternalRemoveEvent event) { 532 + private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) {
503 // FIXME extremely memory expensive when we are overrun 533 // FIXME extremely memory expensive when we are overrun
504 // broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event)); 534 // broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
505 - broadcastMessage(removeMessageSubject, event); 535 + multicastMessage(removeMessageSubject, event, peers);
536 + }
537 +
538 + private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) {
539 + // FIXME can we parallelize the serialization... use the caller???
540 + ClusterMessage message = new ClusterMessage(
541 + clusterService.getLocalNode().id(),
542 + subject,
543 + serializer.encode(event));
544 + broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
545 +// clusterCommunicator.broadcast(message);
506 } 546 }
507 547
508 private void broadcastMessage(MessageSubject subject, Object event) { 548 private void broadcastMessage(MessageSubject subject, Object event) {
...@@ -515,14 +555,13 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -515,14 +555,13 @@ public class EventuallyConsistentMapImpl<K, V>
515 // clusterCommunicator.broadcast(message); 555 // clusterCommunicator.broadcast(message);
516 } 556 }
517 557
518 - private void unicastMessage(NodeId peer, 558 + private void unicastMessage(NodeId peer, MessageSubject subject, Object event) {
519 - MessageSubject subject,
520 - Object event) throws IOException {
521 ClusterMessage message = new ClusterMessage( 559 ClusterMessage message = new ClusterMessage(
522 clusterService.getLocalNode().id(), 560 clusterService.getLocalNode().id(),
523 subject, 561 subject,
524 serializer.encode(event)); 562 serializer.encode(event));
525 - clusterCommunicator.unicast(message, peer); 563 +// clusterCommunicator.unicast(message, peer);
564 + broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
526 } 565 }
527 566
528 private boolean underHighLoad() { 567 private boolean underHighLoad() {
...@@ -567,11 +606,9 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -567,11 +606,9 @@ public class EventuallyConsistentMapImpl<K, V>
567 606
568 AntiEntropyAdvertisement<K> ad = createAdvertisement(); 607 AntiEntropyAdvertisement<K> ad = createAdvertisement();
569 608
570 - try { 609 + // TODO check the return value?
571 - unicastMessage(peer, antiEntropyAdvertisementSubject, ad); 610 + unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
572 - } catch (IOException e) { 611 + // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer);
573 - log.debug("Failed to send anti-entropy advertisement to {}", peer);
574 - }
575 } catch (Exception e) { 612 } catch (Exception e) {
576 // Catch all exceptions to avoid scheduled task being suppressed. 613 // Catch all exceptions to avoid scheduled task being suppressed.
577 log.error("Exception thrown while sending advertisement", e); 614 log.error("Exception thrown while sending advertisement", e);
...@@ -607,14 +644,9 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -607,14 +644,9 @@ public class EventuallyConsistentMapImpl<K, V>
607 // Send the advertisement back if this peer is out-of-sync 644 // Send the advertisement back if this peer is out-of-sync
608 final NodeId sender = ad.sender(); 645 final NodeId sender = ad.sender();
609 AntiEntropyAdvertisement<K> myAd = createAdvertisement(); 646 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
610 - try { 647 + // TODO check the return value?
611 - unicastMessage(sender, antiEntropyAdvertisementSubject, myAd); 648 + unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
612 - } catch (IOException e) { 649 + // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
613 - log.debug(
614 - "Failed to send reactive anti-entropy advertisement to {}",
615 - sender);
616 - }
617 -
618 break; 650 break;
619 } 651 }
620 } 652 }
...@@ -669,12 +701,10 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -669,12 +701,10 @@ public class EventuallyConsistentMapImpl<K, V>
669 701
670 // Send all updates to the peer at once 702 // Send all updates to the peer at once
671 if (!updatesToSend.isEmpty()) { 703 if (!updatesToSend.isEmpty()) {
672 - try { 704 + // TODO check the return value?
673 - unicastMessage(sender, updateMessageSubject, 705 + unicastMessage(sender, updateMessageSubject,
674 - new InternalPutEvent<>(updatesToSend)); 706 + new InternalPutEvent<>(updatesToSend));
675 - } catch (IOException e) { 707 + //error log: log.warn("Failed to send advertisement response", e);
676 - log.warn("Failed to send advertisement response", e);
677 - }
678 } 708 }
679 709
680 return externalEvents; 710 return externalEvents;
...@@ -707,12 +737,10 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -707,12 +737,10 @@ public class EventuallyConsistentMapImpl<K, V>
707 737
708 // Send all removes to the peer at once 738 // Send all removes to the peer at once
709 if (!removesToSend.isEmpty()) { 739 if (!removesToSend.isEmpty()) {
710 - try { 740 + // TODO check the return value
711 - unicastMessage(sender, removeMessageSubject, 741 + unicastMessage(sender, removeMessageSubject,
712 - new InternalRemoveEvent<>(removesToSend)); 742 + new InternalRemoveEvent<>(removesToSend));
713 - } catch (IOException e) { 743 + // error log: log.warn("Failed to send advertisement response", e);
714 - log.warn("Failed to send advertisement response", e);
715 - }
716 } 744 }
717 } 745 }
718 746
......
...@@ -29,6 +29,7 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -29,6 +29,7 @@ import org.apache.felix.scr.annotations.Deactivate;
29 import org.apache.felix.scr.annotations.Reference; 29 import org.apache.felix.scr.annotations.Reference;
30 import org.apache.felix.scr.annotations.ReferenceCardinality; 30 import org.apache.felix.scr.annotations.ReferenceCardinality;
31 import org.apache.felix.scr.annotations.Service; 31 import org.apache.felix.scr.annotations.Service;
32 +import org.onlab.util.BoundedThreadPool;
32 import org.onlab.util.KryoNamespace; 33 import org.onlab.util.KryoNamespace;
33 import org.onlab.util.NewConcurrentHashMap; 34 import org.onlab.util.NewConcurrentHashMap;
34 import org.onosproject.cluster.ClusterService; 35 import org.onosproject.cluster.ClusterService;
...@@ -138,7 +139,8 @@ public class DistributedFlowRuleStore ...@@ -138,7 +139,8 @@ public class DistributedFlowRuleStore
138 private ExecutorService messageHandlingExecutor; 139 private ExecutorService messageHandlingExecutor;
139 140
140 private final ExecutorService backupExecutors = 141 private final ExecutorService backupExecutors =
141 - Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups")); 142 + BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
143 + //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
142 144
143 private boolean syncBackup = false; 145 private boolean syncBackup = false;
144 146
...@@ -385,12 +387,8 @@ public class DistributedFlowRuleStore ...@@ -385,12 +387,8 @@ public class DistributedFlowRuleStore
385 SERIALIZER.encode(operation)); 387 SERIALIZER.encode(operation));
386 388
387 389
388 - try { 390 + if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
389 - 391 + log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
390 - clusterCommunicator.unicast(message, replicaInfo.master().get());
391 -
392 - } catch (IOException e) {
393 - log.warn("Failed to storeBatch: {}", e.getMessage());
394 392
395 Set<FlowRule> allFailures = operation.getOperations().stream() 393 Set<FlowRule> allFailures = operation.getOperations().stream()
396 .map(op -> op.target()) 394 .map(op -> op.target())
...@@ -401,7 +399,6 @@ public class DistributedFlowRuleStore ...@@ -401,7 +399,6 @@ public class DistributedFlowRuleStore
401 new CompletedBatchOperation(false, allFailures, deviceId))); 399 new CompletedBatchOperation(false, allFailures, deviceId)));
402 return; 400 return;
403 } 401 }
404 -
405 } 402 }
406 403
407 private void storeBatchInternal(FlowRuleBatchOperation operation) { 404 private void storeBatchInternal(FlowRuleBatchOperation operation) {
...@@ -576,15 +573,13 @@ public class DistributedFlowRuleStore ...@@ -576,15 +573,13 @@ public class DistributedFlowRuleStore
576 if (nodeId == null) { 573 if (nodeId == null) {
577 notifyDelegate(event); 574 notifyDelegate(event);
578 } else { 575 } else {
579 - try { 576 + ClusterMessage message = new ClusterMessage(
580 - ClusterMessage message = new ClusterMessage( 577 + clusterService.getLocalNode().id(),
581 - clusterService.getLocalNode().id(), 578 + REMOTE_APPLY_COMPLETED,
582 - REMOTE_APPLY_COMPLETED, 579 + SERIALIZER.encode(event));
583 - SERIALIZER.encode(event)); 580 + // TODO check unicast return value
584 - clusterCommunicator.unicast(message, nodeId); 581 + clusterCommunicator.unicast(message, nodeId);
585 - } catch (IOException e) { 582 + //error log: log.warn("Failed to respond to peer for batch operation result");
586 - log.warn("Failed to respond to peer for batch operation result");
587 - }
588 } 583 }
589 } 584 }
590 585
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
15 */ 15 */
16 package org.onosproject.store.intent.impl; 16 package org.onosproject.store.intent.impl;
17 17
18 +import com.google.common.collect.ImmutableList;
18 import org.apache.felix.scr.annotations.Activate; 19 import org.apache.felix.scr.annotations.Activate;
19 import org.apache.felix.scr.annotations.Component; 20 import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate; 21 import org.apache.felix.scr.annotations.Deactivate;
...@@ -23,6 +24,8 @@ import org.apache.felix.scr.annotations.ReferenceCardinality; ...@@ -23,6 +24,8 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
23 import org.apache.felix.scr.annotations.Service; 24 import org.apache.felix.scr.annotations.Service;
24 import org.onlab.util.KryoNamespace; 25 import org.onlab.util.KryoNamespace;
25 import org.onosproject.cluster.ClusterService; 26 import org.onosproject.cluster.ClusterService;
27 +import org.onosproject.cluster.ControllerNode;
28 +import org.onosproject.cluster.NodeId;
26 import org.onosproject.net.intent.Intent; 29 import org.onosproject.net.intent.Intent;
27 import org.onosproject.net.intent.IntentData; 30 import org.onosproject.net.intent.IntentData;
28 import org.onosproject.net.intent.IntentEvent; 31 import org.onosproject.net.intent.IntentEvent;
...@@ -41,7 +44,10 @@ import org.onosproject.store.impl.WallClockTimestamp; ...@@ -41,7 +44,10 @@ import org.onosproject.store.impl.WallClockTimestamp;
41 import org.onosproject.store.serializers.KryoNamespaces; 44 import org.onosproject.store.serializers.KryoNamespaces;
42 import org.slf4j.Logger; 45 import org.slf4j.Logger;
43 46
47 +import java.util.ArrayList;
48 +import java.util.Collections;
44 import java.util.List; 49 import java.util.List;
50 +import java.util.Objects;
45 import java.util.stream.Collectors; 51 import java.util.stream.Collectors;
46 52
47 import static org.onosproject.net.intent.IntentState.*; 53 import static org.onosproject.net.intent.IntentState.*;
...@@ -51,6 +57,8 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -51,6 +57,8 @@ import static org.slf4j.LoggerFactory.getLogger;
51 * Manages inventory of Intents in a distributed data store that uses optimistic 57 * Manages inventory of Intents in a distributed data store that uses optimistic
52 * replication and gossip based techniques. 58 * replication and gossip based techniques.
53 */ 59 */
60 +//FIXME we should listen for leadership changes. if the local instance has just
61 +// ... become a leader, scan the pending map and process those
54 @Component(immediate = false, enabled = true) 62 @Component(immediate = false, enabled = true)
55 @Service 63 @Service
56 public class GossipIntentStore 64 public class GossipIntentStore
...@@ -86,15 +94,17 @@ public class GossipIntentStore ...@@ -86,15 +94,17 @@ public class GossipIntentStore
86 clusterService, 94 clusterService,
87 clusterCommunicator, 95 clusterCommunicator,
88 intentSerializer, 96 intentSerializer,
89 - new IntentDataLogicalClockManager<>()); 97 + new IntentDataLogicalClockManager<>(),
98 + (key, intentData) -> getPeerNodes(key, intentData));
90 99
91 pendingMap = new EventuallyConsistentMapImpl<>("intent-pending", 100 pendingMap = new EventuallyConsistentMapImpl<>("intent-pending",
92 clusterService, 101 clusterService,
93 clusterCommunicator, 102 clusterCommunicator,
94 intentSerializer, // TODO 103 intentSerializer, // TODO
95 - new IntentDataClockManager<>()); 104 + new IntentDataClockManager<>(),
105 + (key, intentData) -> getPeerNodes(key, intentData));
96 106
97 - currentMap.addListener(new InternalIntentStatesListener()); 107 + currentMap.addListener(new InternalCurrentListener());
98 pendingMap.addListener(new InternalPendingListener()); 108 pendingMap.addListener(new InternalPendingListener());
99 109
100 log.info("Started"); 110 log.info("Started");
...@@ -226,7 +236,6 @@ public class GossipIntentStore ...@@ -226,7 +236,6 @@ public class GossipIntentStore
226 @Override 236 @Override
227 public void write(IntentData newData) { 237 public void write(IntentData newData) {
228 IntentData currentData = currentMap.get(newData.key()); 238 IntentData currentData = currentMap.get(newData.key());
229 -
230 if (isUpdateAcceptable(currentData, newData)) { 239 if (isUpdateAcceptable(currentData, newData)) {
231 // Only the master is modifying the current state. Therefore assume 240 // Only the master is modifying the current state. Therefore assume
232 // this always succeeds 241 // this always succeeds
...@@ -239,6 +248,34 @@ public class GossipIntentStore ...@@ -239,6 +248,34 @@ public class GossipIntentStore
239 } 248 }
240 } 249 }
241 250
251 + private Iterable<NodeId> getPeerNodes(Key key, IntentData data) {
252 + NodeId master = partitionService.getLeader(key);
253 + NodeId origin = (data != null) ? data.origin() : null;
254 + NodeId me = clusterService.getLocalNode().id();
255 + boolean isMaster = Objects.equals(master, me);
256 + boolean isOrigin = Objects.equals(origin, me);
257 + if (isMaster && isOrigin) {
258 + return ImmutableList.of(getRandomNode());
259 + } else if (isMaster) {
260 + return ImmutableList.of(origin);
261 + } else if (isOrigin) {
262 + return ImmutableList.of(master);
263 + } else {
264 + // FIXME: why are we here? log error?
265 + return ImmutableList.of(master);
266 + }
267 + }
268 +
269 + private NodeId getRandomNode() {
270 + List<NodeId> nodes = clusterService.getNodes().stream()
271 + .map(ControllerNode::id)
272 + .collect(Collectors.toCollection(ArrayList::new));
273 + Collections.shuffle(nodes);
274 + // FIXME check if self
275 + // FIXME verify nodes.size() > 0
276 + return nodes.get(0);
277 + }
278 +
242 @Override 279 @Override
243 public void batchWrite(Iterable<IntentData> updates) { 280 public void batchWrite(Iterable<IntentData> updates) {
244 updates.forEach(this::write); 281 updates.forEach(this::write);
...@@ -263,6 +300,7 @@ public class GossipIntentStore ...@@ -263,6 +300,7 @@ public class GossipIntentStore
263 if (data.version() == null) { 300 if (data.version() == null) {
264 data.setVersion(new WallClockTimestamp()); 301 data.setVersion(new WallClockTimestamp());
265 } 302 }
303 + data.setOrigin(clusterService.getLocalNode().id());
266 pendingMap.put(data.key(), copyData(data)); 304 pendingMap.put(data.key(), copyData(data));
267 } 305 }
268 306
...@@ -292,7 +330,7 @@ public class GossipIntentStore ...@@ -292,7 +330,7 @@ public class GossipIntentStore
292 } 330 }
293 } 331 }
294 332
295 - private final class InternalIntentStatesListener implements 333 + private final class InternalCurrentListener implements
296 EventuallyConsistentMapListener<Key, IntentData> { 334 EventuallyConsistentMapListener<Key, IntentData> {
297 @Override 335 @Override
298 public void event( 336 public void event(
......
...@@ -29,13 +29,13 @@ import org.onosproject.cluster.Leadership; ...@@ -29,13 +29,13 @@ import org.onosproject.cluster.Leadership;
29 import org.onosproject.cluster.LeadershipEvent; 29 import org.onosproject.cluster.LeadershipEvent;
30 import org.onosproject.cluster.LeadershipEventListener; 30 import org.onosproject.cluster.LeadershipEventListener;
31 import org.onosproject.cluster.LeadershipService; 31 import org.onosproject.cluster.LeadershipService;
32 +import org.onosproject.cluster.NodeId;
32 import org.onosproject.net.intent.Key; 33 import org.onosproject.net.intent.Key;
33 import org.slf4j.Logger; 34 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory; 35 import org.slf4j.LoggerFactory;
35 36
36 -import com.google.common.base.Objects;
37 -
38 import java.util.List; 37 import java.util.List;
38 +import java.util.Objects;
39 import java.util.concurrent.Executors; 39 import java.util.concurrent.Executors;
40 import java.util.concurrent.ScheduledExecutorService; 40 import java.util.concurrent.ScheduledExecutorService;
41 import java.util.concurrent.TimeUnit; 41 import java.util.concurrent.TimeUnit;
...@@ -109,8 +109,13 @@ public class PartitionManager implements PartitionService { ...@@ -109,8 +109,13 @@ public class PartitionManager implements PartitionService {
109 109
110 @Override 110 @Override
111 public boolean isMine(Key intentKey) { 111 public boolean isMine(Key intentKey) {
112 - return Objects.equal(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))), 112 + return Objects.equals(leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))),
113 - clusterService.getLocalNode().id()); 113 + clusterService.getLocalNode().id());
114 + }
115 +
116 + @Override
117 + public NodeId getLeader(Key intentKey) {
118 + return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)));
114 } 119 }
115 120
116 private void doRelinquish() { 121 private void doRelinquish() {
...@@ -171,7 +176,7 @@ public class PartitionManager implements PartitionService { ...@@ -171,7 +176,7 @@ public class PartitionManager implements PartitionService {
171 public void event(LeadershipEvent event) { 176 public void event(LeadershipEvent event) {
172 Leadership leadership = event.subject(); 177 Leadership leadership = event.subject();
173 178
174 - if (Objects.equal(leadership.leader(), clusterService.getLocalNode().id()) && 179 + if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
175 leadership.topic().startsWith(ELECTION_PREFIX)) { 180 leadership.topic().startsWith(ELECTION_PREFIX)) {
176 181
177 // See if we need to let some partitions go 182 // See if we need to let some partitions go
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
15 */ 15 */
16 package org.onosproject.store.intent.impl; 16 package org.onosproject.store.intent.impl;
17 17
18 +import org.onosproject.cluster.NodeId;
18 import org.onosproject.net.intent.Key; 19 import org.onosproject.net.intent.Key;
19 20
20 /** 21 /**
...@@ -31,5 +32,13 @@ public interface PartitionService { ...@@ -31,5 +32,13 @@ public interface PartitionService {
31 */ 32 */
32 boolean isMine(Key intentKey); 33 boolean isMine(Key intentKey);
33 34
35 + /**
36 + * Returns the leader for a particular key.
37 + *
38 + * @param intentKey intent key to query
39 + * @return the leader node
40 + */
41 + NodeId getLeader(Key intentKey);
42 +
34 // TODO add API for rebalancing partitions 43 // TODO add API for rebalancing partitions
35 } 44 }
......
...@@ -337,13 +337,13 @@ public class GossipLinkStore ...@@ -337,13 +337,13 @@ public class GossipLinkStore
337 ClusterMessage linkInjectedMessage = new ClusterMessage(localNode, 337 ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
338 GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent)); 338 GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
339 339
340 - try { 340 + // TODO check unicast return value
341 - clusterCommunicator.unicast(linkInjectedMessage, dstNode); 341 + clusterCommunicator.unicast(linkInjectedMessage, dstNode);
342 - } catch (IOException e) { 342 + /* error log:
343 - log.warn("Failed to process link update between src: {} and dst: {} " + 343 + log.warn("Failed to process link update between src: {} and dst: {} " +
344 - "(cluster messaging failed: {})", 344 + "(cluster messaging failed: {})",
345 - linkDescription.src(), linkDescription.dst(), e); 345 + linkDescription.src(), linkDescription.dst(), e);
346 - } 346 + */
347 347
348 } 348 }
349 349
......
...@@ -15,19 +15,13 @@ ...@@ -15,19 +15,13 @@
15 */ 15 */
16 package org.onosproject.store.packet.impl; 16 package org.onosproject.store.packet.impl;
17 17
18 -import static org.onlab.util.Tools.groupedThreads;
19 -import static org.slf4j.LoggerFactory.getLogger;
20 -
21 -import java.io.IOException;
22 -import java.util.concurrent.ExecutorService;
23 -import java.util.concurrent.Executors;
24 -
25 import org.apache.felix.scr.annotations.Activate; 18 import org.apache.felix.scr.annotations.Activate;
26 import org.apache.felix.scr.annotations.Component; 19 import org.apache.felix.scr.annotations.Component;
27 import org.apache.felix.scr.annotations.Deactivate; 20 import org.apache.felix.scr.annotations.Deactivate;
28 import org.apache.felix.scr.annotations.Reference; 21 import org.apache.felix.scr.annotations.Reference;
29 import org.apache.felix.scr.annotations.ReferenceCardinality; 22 import org.apache.felix.scr.annotations.ReferenceCardinality;
30 import org.apache.felix.scr.annotations.Service; 23 import org.apache.felix.scr.annotations.Service;
24 +import org.onlab.util.KryoNamespace;
31 import org.onosproject.cluster.ClusterService; 25 import org.onosproject.cluster.ClusterService;
32 import org.onosproject.cluster.NodeId; 26 import org.onosproject.cluster.NodeId;
33 import org.onosproject.mastership.MastershipService; 27 import org.onosproject.mastership.MastershipService;
...@@ -43,9 +37,14 @@ import org.onosproject.store.cluster.messaging.ClusterMessageHandler; ...@@ -43,9 +37,14 @@ import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
43 import org.onosproject.store.cluster.messaging.MessageSubject; 37 import org.onosproject.store.cluster.messaging.MessageSubject;
44 import org.onosproject.store.serializers.KryoNamespaces; 38 import org.onosproject.store.serializers.KryoNamespaces;
45 import org.onosproject.store.serializers.KryoSerializer; 39 import org.onosproject.store.serializers.KryoSerializer;
46 -import org.onlab.util.KryoNamespace;
47 import org.slf4j.Logger; 40 import org.slf4j.Logger;
48 41
42 +import java.util.concurrent.ExecutorService;
43 +import java.util.concurrent.Executors;
44 +
45 +import static org.onlab.util.Tools.groupedThreads;
46 +import static org.slf4j.LoggerFactory.getLogger;
47 +
49 /** 48 /**
50 * Distributed packet store implementation allowing packets to be sent to 49 * Distributed packet store implementation allowing packets to be sent to
51 * remote instances. 50 * remote instances.
...@@ -118,12 +117,10 @@ public class DistributedPacketStore ...@@ -118,12 +117,10 @@ public class DistributedPacketStore
118 return; 117 return;
119 } 118 }
120 119
121 - try { 120 + // TODO check unicast return value
122 - communicationService.unicast(new ClusterMessage( 121 + communicationService.unicast(new ClusterMessage(
123 - myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master); 122 + myId, PACKET_OUT_SUBJECT, SERIALIZER.encode(packet)), master);
124 - } catch (IOException e) { 123 + // error log: log.warn("Failed to send packet-out to {}", master);
125 - log.warn("Failed to send packet-out to {}", master);
126 - }
127 } 124 }
128 125
129 /** 126 /**
......
...@@ -16,9 +16,9 @@ ...@@ -16,9 +16,9 @@
16 package org.onosproject.store.ecmap; 16 package org.onosproject.store.ecmap;
17 17
18 import com.google.common.collect.ComparisonChain; 18 import com.google.common.collect.ComparisonChain;
19 +import com.google.common.collect.ImmutableSet;
19 import com.google.common.util.concurrent.ListenableFuture; 20 import com.google.common.util.concurrent.ListenableFuture;
20 import com.google.common.util.concurrent.MoreExecutors; 21 import com.google.common.util.concurrent.MoreExecutors;
21 -
22 import org.junit.After; 22 import org.junit.After;
23 import org.junit.Before; 23 import org.junit.Before;
24 import org.junit.Test; 24 import org.junit.Test;
...@@ -53,10 +53,7 @@ import java.util.concurrent.atomic.AtomicLong; ...@@ -53,10 +53,7 @@ import java.util.concurrent.atomic.AtomicLong;
53 import static com.google.common.base.Preconditions.checkArgument; 53 import static com.google.common.base.Preconditions.checkArgument;
54 import static junit.framework.TestCase.assertFalse; 54 import static junit.framework.TestCase.assertFalse;
55 import static org.easymock.EasyMock.*; 55 import static org.easymock.EasyMock.*;
56 -import static org.junit.Assert.assertEquals; 56 +import static org.junit.Assert.*;
57 -import static org.junit.Assert.assertNull;
58 -import static org.junit.Assert.assertTrue;
59 -import static org.junit.Assert.fail;
60 57
61 /** 58 /**
62 * Unit tests for EventuallyConsistentMapImpl. 59 * Unit tests for EventuallyConsistentMapImpl.
...@@ -119,8 +116,8 @@ public class EventuallyConsistentMapImplTest { ...@@ -119,8 +116,8 @@ public class EventuallyConsistentMapImplTest {
119 @Before 116 @Before
120 public void setUp() throws Exception { 117 public void setUp() throws Exception {
121 clusterService = createMock(ClusterService.class); 118 clusterService = createMock(ClusterService.class);
122 - expect(clusterService.getLocalNode()).andReturn(self) 119 + expect(clusterService.getLocalNode()).andReturn(self).anyTimes();
123 - .anyTimes(); 120 + expect(clusterService.getNodes()).andReturn(ImmutableSet.of(self)).anyTimes();
124 replay(clusterService); 121 replay(clusterService);
125 122
126 clusterCommunicator = createMock(ClusterCommunicationService.class); 123 clusterCommunicator = createMock(ClusterCommunicationService.class);
...@@ -163,7 +160,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -163,7 +160,7 @@ public class EventuallyConsistentMapImplTest {
163 160
164 @Test 161 @Test
165 public void testSize() throws Exception { 162 public void testSize() throws Exception {
166 - expectAnyMessage(clusterCommunicator); 163 + expectPeerMessage(clusterCommunicator);
167 164
168 assertEquals(0, ecMap.size()); 165 assertEquals(0, ecMap.size());
169 ecMap.put(KEY1, VALUE1); 166 ecMap.put(KEY1, VALUE1);
...@@ -184,7 +181,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -184,7 +181,7 @@ public class EventuallyConsistentMapImplTest {
184 181
185 @Test 182 @Test
186 public void testIsEmpty() throws Exception { 183 public void testIsEmpty() throws Exception {
187 - expectAnyMessage(clusterCommunicator); 184 + expectPeerMessage(clusterCommunicator);
188 185
189 assertTrue(ecMap.isEmpty()); 186 assertTrue(ecMap.isEmpty());
190 ecMap.put(KEY1, VALUE1); 187 ecMap.put(KEY1, VALUE1);
...@@ -195,7 +192,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -195,7 +192,7 @@ public class EventuallyConsistentMapImplTest {
195 192
196 @Test 193 @Test
197 public void testContainsKey() throws Exception { 194 public void testContainsKey() throws Exception {
198 - expectAnyMessage(clusterCommunicator); 195 + expectPeerMessage(clusterCommunicator);
199 196
200 assertFalse(ecMap.containsKey(KEY1)); 197 assertFalse(ecMap.containsKey(KEY1));
201 ecMap.put(KEY1, VALUE1); 198 ecMap.put(KEY1, VALUE1);
...@@ -207,7 +204,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -207,7 +204,7 @@ public class EventuallyConsistentMapImplTest {
207 204
208 @Test 205 @Test
209 public void testContainsValue() throws Exception { 206 public void testContainsValue() throws Exception {
210 - expectAnyMessage(clusterCommunicator); 207 + expectPeerMessage(clusterCommunicator);
211 208
212 assertFalse(ecMap.containsValue(VALUE1)); 209 assertFalse(ecMap.containsValue(VALUE1));
213 ecMap.put(KEY1, VALUE1); 210 ecMap.put(KEY1, VALUE1);
...@@ -222,7 +219,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -222,7 +219,7 @@ public class EventuallyConsistentMapImplTest {
222 219
223 @Test 220 @Test
224 public void testGet() throws Exception { 221 public void testGet() throws Exception {
225 - expectAnyMessage(clusterCommunicator); 222 + expectPeerMessage(clusterCommunicator);
226 223
227 CountDownLatch latch; 224 CountDownLatch latch;
228 225
...@@ -278,7 +275,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -278,7 +275,7 @@ public class EventuallyConsistentMapImplTest {
278 ecMap.addListener(listener); 275 ecMap.addListener(listener);
279 276
280 // Set up expected internal message to be broadcast to peers on first put 277 // Set up expected internal message to be broadcast to peers on first put
281 - expectSpecificMessage(generatePutMessage(KEY1, VALUE1, clockService 278 + expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
282 .peekAtNextTimestamp()), clusterCommunicator); 279 .peekAtNextTimestamp()), clusterCommunicator);
283 280
284 // Put first value 281 // Put first value
...@@ -289,7 +286,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -289,7 +286,7 @@ public class EventuallyConsistentMapImplTest {
289 verify(clusterCommunicator); 286 verify(clusterCommunicator);
290 287
291 // Set up expected internal message to be broadcast to peers on second put 288 // Set up expected internal message to be broadcast to peers on second put
292 - expectSpecificMessage(generatePutMessage( 289 + expectSpecificMulticastMessage(generatePutMessage(
293 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator); 290 KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator);
294 291
295 // Update same key to a new value 292 // Update same key to a new value
...@@ -332,14 +329,14 @@ public class EventuallyConsistentMapImplTest { ...@@ -332,14 +329,14 @@ public class EventuallyConsistentMapImplTest {
332 ecMap.addListener(listener); 329 ecMap.addListener(listener);
333 330
334 // Put in an initial value 331 // Put in an initial value
335 - expectAnyMessage(clusterCommunicator); 332 + expectPeerMessage(clusterCommunicator);
336 ecMap.put(KEY1, VALUE1); 333 ecMap.put(KEY1, VALUE1);
337 assertEquals(VALUE1, ecMap.get(KEY1)); 334 assertEquals(VALUE1, ecMap.get(KEY1));
338 335
339 // Remove the value and check the correct internal cluster messages 336 // Remove the value and check the correct internal cluster messages
340 // are sent 337 // are sent
341 - expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()), 338 + expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
342 - clusterCommunicator); 339 + clusterCommunicator);
343 340
344 ecMap.remove(KEY1); 341 ecMap.remove(KEY1);
345 assertNull(ecMap.get(KEY1)); 342 assertNull(ecMap.get(KEY1));
...@@ -349,8 +346,8 @@ public class EventuallyConsistentMapImplTest { ...@@ -349,8 +346,8 @@ public class EventuallyConsistentMapImplTest {
349 // Remove the same value again. Even though the value is no longer in 346 // Remove the same value again. Even though the value is no longer in
350 // the map, we expect that the tombstone is updated and another remove 347 // the map, we expect that the tombstone is updated and another remove
351 // event is sent to the cluster and external listeners. 348 // event is sent to the cluster and external listeners.
352 - expectSpecificMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()), 349 + expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
353 - clusterCommunicator); 350 + clusterCommunicator);
354 351
355 ecMap.remove(KEY1); 352 ecMap.remove(KEY1);
356 assertNull(ecMap.get(KEY1)); 353 assertNull(ecMap.get(KEY1));
...@@ -359,7 +356,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -359,7 +356,7 @@ public class EventuallyConsistentMapImplTest {
359 356
360 357
361 // Put in a new value for us to try and remove 358 // Put in a new value for us to try and remove
362 - expectAnyMessage(clusterCommunicator); 359 + expectPeerMessage(clusterCommunicator);
363 360
364 ecMap.put(KEY2, VALUE2); 361 ecMap.put(KEY2, VALUE2);
365 362
...@@ -400,8 +397,8 @@ public class EventuallyConsistentMapImplTest { ...@@ -400,8 +397,8 @@ public class EventuallyConsistentMapImplTest {
400 ecMap.addListener(listener); 397 ecMap.addListener(listener);
401 398
402 // Expect a multi-update inter-instance message 399 // Expect a multi-update inter-instance message
403 - expectSpecificMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), 400 + expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2),
404 - clusterCommunicator); 401 + clusterCommunicator);
405 402
406 Map<String, String> putAllValues = new HashMap<>(); 403 Map<String, String> putAllValues = new HashMap<>();
407 putAllValues.put(KEY1, VALUE1); 404 putAllValues.put(KEY1, VALUE1);
...@@ -434,12 +431,12 @@ public class EventuallyConsistentMapImplTest { ...@@ -434,12 +431,12 @@ public class EventuallyConsistentMapImplTest {
434 verify(clusterCommunicator); 431 verify(clusterCommunicator);
435 432
436 // Put some items in the map 433 // Put some items in the map
437 - expectAnyMessage(clusterCommunicator); 434 + expectPeerMessage(clusterCommunicator);
438 ecMap.put(KEY1, VALUE1); 435 ecMap.put(KEY1, VALUE1);
439 ecMap.put(KEY2, VALUE2); 436 ecMap.put(KEY2, VALUE2);
440 437
441 ecMap.addListener(listener); 438 ecMap.addListener(listener);
442 - expectSpecificMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator); 439 + expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator);
443 440
444 ecMap.clear(); 441 ecMap.clear();
445 442
...@@ -449,7 +446,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -449,7 +446,7 @@ public class EventuallyConsistentMapImplTest {
449 446
450 @Test 447 @Test
451 public void testKeySet() throws Exception { 448 public void testKeySet() throws Exception {
452 - expectAnyMessage(clusterCommunicator); 449 + expectPeerMessage(clusterCommunicator);
453 450
454 assertTrue(ecMap.keySet().isEmpty()); 451 assertTrue(ecMap.keySet().isEmpty());
455 452
...@@ -482,7 +479,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -482,7 +479,7 @@ public class EventuallyConsistentMapImplTest {
482 479
483 @Test 480 @Test
484 public void testValues() throws Exception { 481 public void testValues() throws Exception {
485 - expectAnyMessage(clusterCommunicator); 482 + expectPeerMessage(clusterCommunicator);
486 483
487 assertTrue(ecMap.values().isEmpty()); 484 assertTrue(ecMap.values().isEmpty());
488 485
...@@ -520,7 +517,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -520,7 +517,7 @@ public class EventuallyConsistentMapImplTest {
520 517
521 @Test 518 @Test
522 public void testEntrySet() throws Exception { 519 public void testEntrySet() throws Exception {
523 - expectAnyMessage(clusterCommunicator); 520 + expectPeerMessage(clusterCommunicator);
524 521
525 assertTrue(ecMap.entrySet().isEmpty()); 522 assertTrue(ecMap.entrySet().isEmpty());
526 523
...@@ -658,21 +655,52 @@ public class EventuallyConsistentMapImplTest { ...@@ -658,21 +655,52 @@ public class EventuallyConsistentMapImplTest {
658 * @param m message we expect to be sent 655 * @param m message we expect to be sent
659 * @param clusterCommunicator a mock ClusterCommunicationService to set up 656 * @param clusterCommunicator a mock ClusterCommunicationService to set up
660 */ 657 */
661 - private static void expectSpecificMessage(ClusterMessage m, 658 + private static void expectSpecificBroadcastMessage(ClusterMessage m,
662 - ClusterCommunicationService clusterCommunicator) { 659 + ClusterCommunicationService clusterCommunicator) {
663 reset(clusterCommunicator); 660 reset(clusterCommunicator);
664 expect(clusterCommunicator.broadcast(m)).andReturn(true); 661 expect(clusterCommunicator.broadcast(m)).andReturn(true);
665 replay(clusterCommunicator); 662 replay(clusterCommunicator);
666 } 663 }
667 664
668 /** 665 /**
669 - * Sets up a mock ClusterCommunicationService to expect any cluster message 666 + * Sets up a mock ClusterCommunicationService to expect a specific cluster
667 + * message to be multicast to the cluster.
668 + *
669 + * @param m message we expect to be sent
670 + * @param clusterCommunicator a mock ClusterCommunicationService to set up
671 + */
672 + private static void expectSpecificMulticastMessage(ClusterMessage m,
673 + ClusterCommunicationService clusterCommunicator) {
674 + reset(clusterCommunicator);
675 + expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
676 + replay(clusterCommunicator);
677 + }
678 +
679 +
680 + /**
681 + * Sets up a mock ClusterCommunicationService to expect a multicast cluster message
682 + * that is sent to it. This is useful for unit tests where we aren't
683 + * interested in testing the messaging component.
684 + *
685 + * @param clusterCommunicator a mock ClusterCommunicationService to set up
686 + */
687 + private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
688 + reset(clusterCommunicator);
689 + expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
690 + anyObject(Iterable.class)))
691 + .andReturn(true)
692 + .anyTimes();
693 + replay(clusterCommunicator);
694 + }
695 +
696 + /**
697 + * Sets up a mock ClusterCommunicationService to expect a broadcast cluster message
670 * that is sent to it. This is useful for unit tests where we aren't 698 * that is sent to it. This is useful for unit tests where we aren't
671 * interested in testing the messaging component. 699 * interested in testing the messaging component.
672 * 700 *
673 * @param clusterCommunicator a mock ClusterCommunicationService to set up 701 * @param clusterCommunicator a mock ClusterCommunicationService to set up
674 */ 702 */
675 - private void expectAnyMessage(ClusterCommunicationService clusterCommunicator) { 703 + private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
676 reset(clusterCommunicator); 704 reset(clusterCommunicator);
677 expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class))) 705 expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
678 .andReturn(true) 706 .andReturn(true)
...@@ -700,13 +728,12 @@ public class EventuallyConsistentMapImplTest { ...@@ -700,13 +728,12 @@ public class EventuallyConsistentMapImplTest {
700 } 728 }
701 729
702 @Override 730 @Override
703 - public boolean unicast(ClusterMessage message, NodeId toNodeId) 731 + public boolean unicast(ClusterMessage message, NodeId toNodeId) {
704 - throws IOException {
705 return false; 732 return false;
706 } 733 }
707 734
708 @Override 735 @Override
709 - public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) { 736 + public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) {
710 return false; 737 return false;
711 } 738 }
712 739
......
...@@ -31,6 +31,8 @@ import static org.junit.Assert.*; ...@@ -31,6 +31,8 @@ import static org.junit.Assert.*;
31 */ 31 */
32 public class BlockingBooleanTest { 32 public class BlockingBooleanTest {
33 33
34 + private static final int TIMEOUT = 100; //ms
35 +
34 @Test 36 @Test
35 public void basics() { 37 public void basics() {
36 BlockingBoolean b = new BlockingBoolean(false); 38 BlockingBoolean b = new BlockingBoolean(false);
...@@ -60,7 +62,7 @@ public class BlockingBooleanTest { ...@@ -60,7 +62,7 @@ public class BlockingBooleanTest {
60 } 62 }
61 b.set(value); 63 b.set(value);
62 try { 64 try {
63 - assertTrue(latch.await(10, TimeUnit.MILLISECONDS)); 65 + assertTrue(latch.await(TIMEOUT, TimeUnit.MILLISECONDS));
64 } catch (InterruptedException e) { 66 } catch (InterruptedException e) {
65 fail(); 67 fail();
66 } 68 }
...@@ -92,7 +94,7 @@ public class BlockingBooleanTest { ...@@ -92,7 +94,7 @@ public class BlockingBooleanTest {
92 } 94 }
93 }); 95 });
94 try { 96 try {
95 - assertTrue(latch.await(10, TimeUnit.MILLISECONDS)); 97 + assertTrue(latch.await(TIMEOUT, TimeUnit.MILLISECONDS));
96 } catch (InterruptedException e) { 98 } catch (InterruptedException e) {
97 fail(); 99 fail();
98 } 100 }
...@@ -124,14 +126,14 @@ public class BlockingBooleanTest { ...@@ -124,14 +126,14 @@ public class BlockingBooleanTest {
124 }); 126 });
125 } 127 }
126 try { 128 try {
127 - assertTrue(sameLatch.await(10, TimeUnit.MILLISECONDS)); 129 + assertTrue(sameLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
128 assertEquals(waitLatch.getCount(), numThreads / 2); 130 assertEquals(waitLatch.getCount(), numThreads / 2);
129 } catch (InterruptedException e) { 131 } catch (InterruptedException e) {
130 fail(); 132 fail();
131 } 133 }
132 b.set(true); 134 b.set(true);
133 try { 135 try {
134 - assertTrue(waitLatch.await(10, TimeUnit.MILLISECONDS)); 136 + assertTrue(waitLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
135 } catch (InterruptedException e) { 137 } catch (InterruptedException e) {
136 fail(); 138 fail();
137 } 139 }
...@@ -156,7 +158,7 @@ public class BlockingBooleanTest { ...@@ -156,7 +158,7 @@ public class BlockingBooleanTest {
156 } 158 }
157 }); 159 });
158 try { 160 try {
159 - assertTrue(latch.await(10, TimeUnit.MILLISECONDS)); 161 + assertTrue(latch.await(TIMEOUT, TimeUnit.MILLISECONDS));
160 } catch (InterruptedException e) { 162 } catch (InterruptedException e) {
161 fail(); 163 fail();
162 } 164 }
......