HIGUCHI Yuta
Committed by Gerrit Code Review

Concurrently update EventuallyConsistentMap

- Removed synchronized block on Map updates
  which may result in anti-entropy AD sent to the peer containing out-of-sync update/remove,
  such as update and remove for the same key, but stale information will be ignored
  on the remote peer by timestamp if timestamps are properly generated.

Change-Id: Id4f993eb44b7858d37486be0d4baaff1f9025efa
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.store.ecmap; 16 package org.onosproject.store.ecmap;
17 17
18 import org.apache.commons.lang3.RandomUtils; 18 import org.apache.commons.lang3.RandomUtils;
19 +import org.apache.commons.lang3.mutable.MutableBoolean;
19 import org.apache.commons.lang3.tuple.Pair; 20 import org.apache.commons.lang3.tuple.Pair;
20 import org.onlab.util.KryoNamespace; 21 import org.onlab.util.KryoNamespace;
21 import org.onosproject.cluster.ClusterService; 22 import org.onosproject.cluster.ClusterService;
...@@ -42,6 +43,7 @@ import java.util.List; ...@@ -42,6 +43,7 @@ import java.util.List;
42 import java.util.Map; 43 import java.util.Map;
43 import java.util.Set; 44 import java.util.Set;
44 import java.util.concurrent.ConcurrentHashMap; 45 import java.util.concurrent.ConcurrentHashMap;
46 +import java.util.concurrent.ConcurrentMap;
45 import java.util.concurrent.CopyOnWriteArraySet; 47 import java.util.concurrent.CopyOnWriteArraySet;
46 import java.util.concurrent.ExecutorService; 48 import java.util.concurrent.ExecutorService;
47 import java.util.concurrent.Executors; 49 import java.util.concurrent.Executors;
...@@ -64,8 +66,8 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -64,8 +66,8 @@ public class EventuallyConsistentMapImpl<K, V>
64 66
65 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class); 67 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
66 68
67 - private final Map<K, Timestamped<V>> items; 69 + private final ConcurrentMap<K, Timestamped<V>> items;
68 - private final Map<K, Timestamp> removedItems; 70 + private final ConcurrentMap<K, Timestamp> removedItems;
69 71
70 private final ClusterService clusterService; 72 private final ClusterService clusterService;
71 private final ClusterCommunicationService clusterCommunicator; 73 private final ClusterCommunicationService clusterCommunicator;
...@@ -267,16 +269,21 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -267,16 +269,21 @@ public class EventuallyConsistentMapImpl<K, V>
267 return false; 269 return false;
268 } 270 }
269 271
270 - boolean success; 272 + final MutableBoolean updated = new MutableBoolean(false);
271 - synchronized (this) { 273 +
272 - Timestamped<V> existing = items.get(key); 274 + items.compute(key, (k, existing) -> {
273 if (existing != null && existing.isNewerThan(timestamp)) { 275 if (existing != null && existing.isNewerThan(timestamp)) {
274 - log.debug("ecmap - existing was newer {}", value); 276 + updated.setFalse();
275 - success = false; 277 + return existing;
276 } else { 278 } else {
277 - items.put(key, new Timestamped<>(value, timestamp)); 279 + updated.setTrue();
278 - success = true; 280 + return new Timestamped<>(value, timestamp);
279 } 281 }
282 + });
283 +
284 + boolean success = updated.booleanValue();
285 + if (!success) {
286 + log.debug("ecmap - existing was newer {}", value);
280 } 287 }
281 288
282 if (success && removed != null) { 289 if (success && removed != null) {
...@@ -303,13 +310,21 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -303,13 +310,21 @@ public class EventuallyConsistentMapImpl<K, V>
303 } 310 }
304 311
305 private boolean removeInternal(K key, Timestamp timestamp) { 312 private boolean removeInternal(K key, Timestamp timestamp) {
306 - Timestamped<V> value = items.get(key); 313 + final MutableBoolean updated = new MutableBoolean(false);
307 - if (value != null) { 314 +
308 - if (value.isNewerThan(timestamp)) { 315 + items.compute(key, (k, existing) -> {
309 - return false; 316 + if (existing != null && existing.isNewerThan(timestamp)) {
317 + updated.setFalse();
318 + return existing;
310 } else { 319 } else {
311 - items.remove(key, value); 320 + updated.setTrue();
321 + // remove from items map
322 + return null;
312 } 323 }
324 + });
325 +
326 + if (updated.isFalse()) {
327 + return false;
313 } 328 }
314 329
315 Timestamp removedTimestamp = removedItems.get(key); 330 Timestamp removedTimestamp = removedItems.get(key);
...@@ -554,7 +569,6 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -554,7 +569,6 @@ public class EventuallyConsistentMapImpl<K, V>
554 List<EventuallyConsistentMapEvent<K, V>> externalEvents; 569 List<EventuallyConsistentMapEvent<K, V>> externalEvents;
555 boolean sync = false; 570 boolean sync = false;
556 571
557 - synchronized (this) {
558 externalEvents = antiEntropyCheckLocalItems(ad); 572 externalEvents = antiEntropyCheckLocalItems(ad);
559 573
560 antiEntropyCheckLocalRemoved(ad); 574 antiEntropyCheckLocalRemoved(ad);
...@@ -568,9 +582,8 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -568,9 +582,8 @@ public class EventuallyConsistentMapImpl<K, V>
568 break; 582 break;
569 } 583 }
570 } 584 }
571 - } // synchronized (this)
572 585
573 - // Send the advertisement outside the synchronized block 586 + // Send the advertisement back if this peer is out-of-sync
574 if (sync) { 587 if (sync) {
575 final NodeId sender = ad.sender(); 588 final NodeId sender = ad.sender();
576 AntiEntropyAdvertisement<K> myAd = createAdvertisement(); 589 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
...@@ -596,7 +609,6 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -596,7 +609,6 @@ public class EventuallyConsistentMapImpl<K, V>
596 * @param ad remote anti-entropy advertisement 609 * @param ad remote anti-entropy advertisement
597 * @return list of external events relating to local operations performed 610 * @return list of external events relating to local operations performed
598 */ 611 */
599 - // Guarded by synchronized (this)
600 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems( 612 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
601 AntiEntropyAdvertisement<K> ad) { 613 AntiEntropyAdvertisement<K> ad) {
602 final List<EventuallyConsistentMapEvent<K, V>> externalEvents 614 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
...@@ -652,7 +664,6 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -652,7 +664,6 @@ public class EventuallyConsistentMapImpl<K, V>
652 * 664 *
653 * @param ad remote anti-entropy advertisement 665 * @param ad remote anti-entropy advertisement
654 */ 666 */
655 - // Guarded by synchronized (this)
656 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) { 667 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
657 final NodeId sender = ad.sender(); 668 final NodeId sender = ad.sender();
658 669
...@@ -690,7 +701,6 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -690,7 +701,6 @@ public class EventuallyConsistentMapImpl<K, V>
690 * @param ad remote anti-entropy advertisement 701 * @param ad remote anti-entropy advertisement
691 * @return list of external events relating to local operations performed 702 * @return list of external events relating to local operations performed
692 */ 703 */
693 - // Guarded by synchronized (this)
694 private List<EventuallyConsistentMapEvent<K, V>> 704 private List<EventuallyConsistentMapEvent<K, V>>
695 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) { 705 antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
696 final List<EventuallyConsistentMapEvent<K, V>> externalEvents 706 final List<EventuallyConsistentMapEvent<K, V>> externalEvents
......