Madan Jampani

Simplified ECMap implmentation by merging items and tombstones maps

Change-Id: If4253722d91c35a7e57dec3c2fceb216d14a7314
1 -/*
2 - * Copyright 2015 Open Networking Laboratory
3 - *
4 - * Licensed under the Apache License, Version 2.0 (the "License");
5 - * you may not use this file except in compliance with the License.
6 - * You may obtain a copy of the License at
7 - *
8 - * http://www.apache.org/licenses/LICENSE-2.0
9 - *
10 - * Unless required by applicable law or agreed to in writing, software
11 - * distributed under the License is distributed on an "AS IS" BASIS,
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 - * See the License for the specific language governing permissions and
14 - * limitations under the License.
15 - */
16 -package org.onosproject.store.ecmap;
17 -
18 -import java.util.Objects;
19 -
20 -import org.onosproject.store.Timestamp;
21 -
22 -import static com.google.common.base.Preconditions.checkNotNull;
23 -
24 -/**
25 - * Base class for events in an EventuallyConsistentMap.
26 - */
27 -public abstract class AbstractEntry<K, V> implements Comparable<AbstractEntry<K, V>> {
28 - private final K key;
29 - private final Timestamp timestamp;
30 -
31 - /**
32 - * Creates a new put entry.
33 - *
34 - * @param key key of the entry
35 - * @param timestamp timestamp of the put event
36 - */
37 - public AbstractEntry(K key, Timestamp timestamp) {
38 - this.key = checkNotNull(key);
39 - this.timestamp = checkNotNull(timestamp);
40 - }
41 -
42 - // Needed for serialization.
43 - @SuppressWarnings("unused")
44 - protected AbstractEntry() {
45 - this.key = null;
46 - this.timestamp = null;
47 - }
48 -
49 - /**
50 - * Returns the key of the entry.
51 - *
52 - * @return the key
53 - */
54 - public K key() {
55 - return key;
56 - }
57 -
58 - /**
59 - * Returns the timestamp of the event.
60 - *
61 - * @return the timestamp
62 - */
63 - public Timestamp timestamp() {
64 - return timestamp;
65 - }
66 -
67 - @Override
68 - public int compareTo(AbstractEntry<K, V> o) {
69 - return this.timestamp.compareTo(o.timestamp);
70 - }
71 -
72 - @Override
73 - public int hashCode() {
74 - return Objects.hash(timestamp);
75 - }
76 -
77 - @Override
78 - public boolean equals(Object o) {
79 - if (this == o) {
80 - return true;
81 - }
82 - if (o instanceof AbstractEntry) {
83 - final AbstractEntry that = (AbstractEntry) o;
84 - return this.timestamp.equals(that.timestamp);
85 - }
86 - return false;
87 - }
88 -}
...@@ -16,11 +16,11 @@ ...@@ -16,11 +16,11 @@
16 package org.onosproject.store.ecmap; 16 package org.onosproject.store.ecmap;
17 17
18 import com.google.common.base.MoreObjects; 18 import com.google.common.base.MoreObjects;
19 +import com.google.common.collect.ImmutableMap;
20 +
19 import org.onosproject.cluster.NodeId; 21 import org.onosproject.cluster.NodeId;
20 -import org.onosproject.store.Timestamp;
21 22
22 import java.util.Map; 23 import java.util.Map;
23 -
24 import static com.google.common.base.Preconditions.checkNotNull; 24 import static com.google.common.base.Preconditions.checkNotNull;
25 25
26 /** 26 /**
...@@ -29,22 +29,18 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -29,22 +29,18 @@ import static com.google.common.base.Preconditions.checkNotNull;
29 public class AntiEntropyAdvertisement<K> { 29 public class AntiEntropyAdvertisement<K> {
30 30
31 private final NodeId sender; 31 private final NodeId sender;
32 - private final Map<K, Timestamp> timestamps; 32 + private final Map<K, MapValue.Digest> digest;
33 - private final Map<K, Timestamp> tombstones;
34 33
35 /** 34 /**
36 * Creates a new anti entropy advertisement message. 35 * Creates a new anti entropy advertisement message.
37 * 36 *
38 * @param sender the sender's node ID 37 * @param sender the sender's node ID
39 - * @param timestamps map of item key to timestamp for current items 38 + * @param digest for map entries
40 - * @param tombstones map of item key to timestamp for removed items
41 */ 39 */
42 public AntiEntropyAdvertisement(NodeId sender, 40 public AntiEntropyAdvertisement(NodeId sender,
43 - Map<K, Timestamp> timestamps, 41 + Map<K, MapValue.Digest> digest) {
44 - Map<K, Timestamp> tombstones) {
45 this.sender = checkNotNull(sender); 42 this.sender = checkNotNull(sender);
46 - this.timestamps = checkNotNull(timestamps); 43 + this.digest = ImmutableMap.copyOf(checkNotNull(digest));
47 - this.tombstones = checkNotNull(tombstones);
48 } 44 }
49 45
50 /** 46 /**
...@@ -57,36 +53,19 @@ public class AntiEntropyAdvertisement<K> { ...@@ -57,36 +53,19 @@ public class AntiEntropyAdvertisement<K> {
57 } 53 }
58 54
59 /** 55 /**
60 - * Returns the map of current item timestamps. 56 + * Returns the digest for map entries.
61 * 57 *
62 - * @return current item timestamps 58 + * @return mapping from key to associated digest
63 */ 59 */
64 - public Map<K, Timestamp> timestamps() { 60 + public Map<K, MapValue.Digest> digest() {
65 - return timestamps; 61 + return digest;
66 - }
67 -
68 - /**
69 - * Returns the map of removed item timestamps.
70 - *
71 - * @return removed item timestamps
72 - */
73 - public Map<K, Timestamp> tombstones() {
74 - return tombstones;
75 - }
76 -
77 - // For serializer
78 - @SuppressWarnings("unused")
79 - private AntiEntropyAdvertisement() {
80 - this.sender = null;
81 - this.timestamps = null;
82 - this.tombstones = null;
83 } 62 }
84 63
85 @Override 64 @Override
86 public String toString() { 65 public String toString() {
87 return MoreObjects.toStringHelper(getClass()) 66 return MoreObjects.toStringHelper(getClass())
88 - .add("timestampsSize", timestamps.size()) 67 + .add("sender", sender)
89 - .add("tombstonesSize", tombstones.size()) 68 + .add("totalEntries", digest.size())
90 .toString(); 69 .toString();
91 } 70 }
92 } 71 }
......
...@@ -18,9 +18,8 @@ package org.onosproject.store.ecmap; ...@@ -18,9 +18,8 @@ package org.onosproject.store.ecmap;
18 import com.google.common.collect.ImmutableList; 18 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.Lists; 19 import com.google.common.collect.Lists;
20 import com.google.common.collect.Maps; 20 import com.google.common.collect.Maps;
21 +import com.google.common.collect.Sets;
21 22
22 -import org.apache.commons.lang3.RandomUtils;
23 -import org.apache.commons.lang3.mutable.MutableBoolean;
24 import org.apache.commons.lang3.tuple.Pair; 23 import org.apache.commons.lang3.tuple.Pair;
25 import org.onlab.util.AbstractAccumulator; 24 import org.onlab.util.AbstractAccumulator;
26 import org.onlab.util.KryoNamespace; 25 import org.onlab.util.KryoNamespace;
...@@ -30,12 +29,10 @@ import org.onosproject.cluster.ControllerNode; ...@@ -30,12 +29,10 @@ import org.onosproject.cluster.ControllerNode;
30 import org.onosproject.cluster.NodeId; 29 import org.onosproject.cluster.NodeId;
31 import org.onosproject.store.Timestamp; 30 import org.onosproject.store.Timestamp;
32 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 31 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33 -import org.onosproject.store.cluster.messaging.ClusterMessage;
34 -import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
35 import org.onosproject.store.cluster.messaging.MessageSubject; 32 import org.onosproject.store.cluster.messaging.MessageSubject;
36 import org.onosproject.store.impl.LogicalTimestamp; 33 import org.onosproject.store.impl.LogicalTimestamp;
37 -import org.onosproject.store.impl.Timestamped;
38 import org.onosproject.store.service.WallClockTimestamp; 34 import org.onosproject.store.service.WallClockTimestamp;
35 +import org.onosproject.store.serializers.KryoNamespaces;
39 import org.onosproject.store.serializers.KryoSerializer; 36 import org.onosproject.store.serializers.KryoSerializer;
40 import org.onosproject.store.service.EventuallyConsistentMap; 37 import org.onosproject.store.service.EventuallyConsistentMap;
41 import org.onosproject.store.service.EventuallyConsistentMapEvent; 38 import org.onosproject.store.service.EventuallyConsistentMapEvent;
...@@ -43,30 +40,27 @@ import org.onosproject.store.service.EventuallyConsistentMapListener; ...@@ -43,30 +40,27 @@ import org.onosproject.store.service.EventuallyConsistentMapListener;
43 import org.slf4j.Logger; 40 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory; 41 import org.slf4j.LoggerFactory;
45 42
46 -import java.util.ArrayList; 43 +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
44 +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
45 +
47 import java.util.Collection; 46 import java.util.Collection;
48 -import java.util.HashMap; 47 +import java.util.Collections;
49 -import java.util.LinkedList;
50 import java.util.List; 48 import java.util.List;
51 import java.util.Map; 49 import java.util.Map;
52 import java.util.Optional; 50 import java.util.Optional;
53 import java.util.Set; 51 import java.util.Set;
54 import java.util.Timer; 52 import java.util.Timer;
55 -import java.util.concurrent.ConcurrentHashMap;
56 -import java.util.concurrent.ConcurrentMap;
57 -import java.util.concurrent.CopyOnWriteArraySet;
58 import java.util.concurrent.ExecutorService; 53 import java.util.concurrent.ExecutorService;
59 import java.util.concurrent.Executors; 54 import java.util.concurrent.Executors;
60 import java.util.concurrent.ScheduledExecutorService; 55 import java.util.concurrent.ScheduledExecutorService;
61 import java.util.concurrent.TimeUnit; 56 import java.util.concurrent.TimeUnit;
57 +import java.util.concurrent.atomic.AtomicBoolean;
62 import java.util.concurrent.atomic.AtomicReference; 58 import java.util.concurrent.atomic.AtomicReference;
63 import java.util.function.BiFunction; 59 import java.util.function.BiFunction;
64 import java.util.stream.Collectors; 60 import java.util.stream.Collectors;
65 61
66 import static com.google.common.base.Preconditions.checkNotNull; 62 import static com.google.common.base.Preconditions.checkNotNull;
67 import static com.google.common.base.Preconditions.checkState; 63 import static com.google.common.base.Preconditions.checkState;
68 -import static java.util.Objects.isNull;
69 -import static java.util.Objects.nonNull;
70 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; 64 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
71 import static org.onlab.util.BoundedThreadPool.newFixedThreadPool; 65 import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
72 import static org.onlab.util.Tools.groupedThreads; 66 import static org.onlab.util.Tools.groupedThreads;
...@@ -80,12 +74,12 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -80,12 +74,12 @@ public class EventuallyConsistentMapImpl<K, V>
80 74
81 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class); 75 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
82 76
83 - private final ConcurrentMap<K, Timestamped<V>> items; 77 + private final Map<K, MapValue<V>> items;
84 - private final ConcurrentMap<K, Timestamp> removedItems;
85 78
86 private final ClusterService clusterService; 79 private final ClusterService clusterService;
87 private final ClusterCommunicationService clusterCommunicator; 80 private final ClusterCommunicationService clusterCommunicator;
88 private final KryoSerializer serializer; 81 private final KryoSerializer serializer;
82 + private final NodeId localNodeId;
89 83
90 private final BiFunction<K, V, Timestamp> timestampProvider; 84 private final BiFunction<K, V, Timestamp> timestampProvider;
91 85
...@@ -93,7 +87,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -93,7 +87,7 @@ public class EventuallyConsistentMapImpl<K, V>
93 private final MessageSubject antiEntropyAdvertisementSubject; 87 private final MessageSubject antiEntropyAdvertisementSubject;
94 88
95 private final Set<EventuallyConsistentMapListener<K, V>> listeners 89 private final Set<EventuallyConsistentMapListener<K, V>> listeners
96 - = new CopyOnWriteArraySet<>(); 90 + = Sets.newCopyOnWriteArraySet();
97 91
98 private final ExecutorService executor; 92 private final ExecutorService executor;
99 private final ScheduledExecutorService backgroundExecutor; 93 private final ScheduledExecutorService backgroundExecutor;
...@@ -162,13 +156,13 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -162,13 +156,13 @@ public class EventuallyConsistentMapImpl<K, V>
162 TimeUnit antiEntropyTimeUnit, 156 TimeUnit antiEntropyTimeUnit,
163 boolean convergeFaster, 157 boolean convergeFaster,
164 boolean persistent) { 158 boolean persistent) {
165 - items = new ConcurrentHashMap<>(); 159 + items = Maps.newConcurrentMap();
166 - removedItems = new ConcurrentHashMap<>();
167 senderPending = Maps.newConcurrentMap(); 160 senderPending = Maps.newConcurrentMap();
168 destroyedMessage = mapName + ERROR_DESTROYED; 161 destroyedMessage = mapName + ERROR_DESTROYED;
169 162
170 this.clusterService = clusterService; 163 this.clusterService = clusterService;
171 this.clusterCommunicator = clusterCommunicator; 164 this.clusterCommunicator = clusterCommunicator;
165 + this.localNodeId = clusterService.getLocalNode().id();
172 166
173 this.serializer = createSerializer(serializerBuilder); 167 this.serializer = createSerializer(serializerBuilder);
174 168
...@@ -179,7 +173,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -179,7 +173,7 @@ public class EventuallyConsistentMapImpl<K, V>
179 } else { 173 } else {
180 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream() 174 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
181 .map(ControllerNode::id) 175 .map(ControllerNode::id)
182 - .filter(nodeId -> !nodeId.equals(clusterService.getLocalNode().id())) 176 + .filter(nodeId -> !nodeId.equals(localNodeId))
183 .collect(Collectors.toList()); 177 .collect(Collectors.toList());
184 } 178 }
185 179
...@@ -210,7 +204,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -210,7 +204,7 @@ public class EventuallyConsistentMapImpl<K, V>
210 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter")); 204 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
211 205
212 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer); 206 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
213 - persistentStore.readInto(items, removedItems); 207 + persistentStore.readInto(items);
214 } else { 208 } else {
215 this.persistentStore = null; 209 this.persistentStore = null;
216 } 210 }
...@@ -223,17 +217,21 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -223,17 +217,21 @@ public class EventuallyConsistentMapImpl<K, V>
223 } 217 }
224 218
225 // start anti-entropy thread 219 // start anti-entropy thread
226 - this.backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), 220 + this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
227 initialDelaySec, antiEntropyPeriod, 221 initialDelaySec, antiEntropyPeriod,
228 antiEntropyTimeUnit); 222 antiEntropyTimeUnit);
229 223
230 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update"); 224 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
231 clusterCommunicator.addSubscriber(updateMessageSubject, 225 clusterCommunicator.addSubscriber(updateMessageSubject,
232 - new InternalEventListener(), this.executor); 226 + serializer::decode,
227 + this::processUpdates,
228 + this.executor);
233 229
234 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy"); 230 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
235 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject, 231 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
236 - new InternalAntiEntropyListener(), this.backgroundExecutor); 232 + serializer::decode,
233 + this::handleAntiEntropyAdvertisement,
234 + this.backgroundExecutor);
237 235
238 this.tombstonesDisabled = tombstonesDisabled; 236 this.tombstonesDisabled = tombstonesDisabled;
239 this.lightweightAntiEntropy = !convergeFaster; 237 this.lightweightAntiEntropy = !convergeFaster;
...@@ -245,14 +243,13 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -245,14 +243,13 @@ public class EventuallyConsistentMapImpl<K, V>
245 protected void setupKryoPool() { 243 protected void setupKryoPool() {
246 // Add the map's internal helper classes to the user-supplied serializer 244 // Add the map's internal helper classes to the user-supplied serializer
247 serializerPool = builder 245 serializerPool = builder
246 + .register(KryoNamespaces.BASIC)
248 .register(LogicalTimestamp.class) 247 .register(LogicalTimestamp.class)
249 .register(WallClockTimestamp.class) 248 .register(WallClockTimestamp.class)
250 - .register(PutEntry.class)
251 - .register(RemoveEntry.class)
252 - .register(ArrayList.class)
253 .register(AntiEntropyAdvertisement.class) 249 .register(AntiEntropyAdvertisement.class)
254 - .register(HashMap.class) 250 + .register(UpdateEntry.class)
255 - .register(Timestamped.class) 251 + .register(MapValue.class)
252 + .register(MapValue.Digest.class)
256 .build(); 253 .build();
257 } 254 }
258 }; 255 };
...@@ -261,29 +258,31 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -261,29 +258,31 @@ public class EventuallyConsistentMapImpl<K, V>
261 @Override 258 @Override
262 public int size() { 259 public int size() {
263 checkState(!destroyed, destroyedMessage); 260 checkState(!destroyed, destroyedMessage);
264 - return items.size(); 261 + // TODO: Maintain a separate counter for tracking live elements in map.
262 + return Maps.filterValues(items, MapValue::isAlive).size();
265 } 263 }
266 264
267 @Override 265 @Override
268 public boolean isEmpty() { 266 public boolean isEmpty() {
269 checkState(!destroyed, destroyedMessage); 267 checkState(!destroyed, destroyedMessage);
270 - return items.isEmpty(); 268 + return size() == 0;
271 } 269 }
272 270
273 @Override 271 @Override
274 public boolean containsKey(K key) { 272 public boolean containsKey(K key) {
275 checkState(!destroyed, destroyedMessage); 273 checkState(!destroyed, destroyedMessage);
276 checkNotNull(key, ERROR_NULL_KEY); 274 checkNotNull(key, ERROR_NULL_KEY);
277 - return items.containsKey(key); 275 + return get(key) != null;
278 } 276 }
279 277
280 @Override 278 @Override
281 public boolean containsValue(V value) { 279 public boolean containsValue(V value) {
282 checkState(!destroyed, destroyedMessage); 280 checkState(!destroyed, destroyedMessage);
283 checkNotNull(value, ERROR_NULL_VALUE); 281 checkNotNull(value, ERROR_NULL_VALUE);
284 - 282 + return items.values()
285 - return items.values().stream() 283 + .stream()
286 - .anyMatch(timestamped -> timestamped.value().equals(value)); 284 + .filter(MapValue::isAlive)
285 + .anyMatch(v -> v.get().equals(value));
287 } 286 }
288 287
289 @Override 288 @Override
...@@ -291,11 +290,8 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -291,11 +290,8 @@ public class EventuallyConsistentMapImpl<K, V>
291 checkState(!destroyed, destroyedMessage); 290 checkState(!destroyed, destroyedMessage);
292 checkNotNull(key, ERROR_NULL_KEY); 291 checkNotNull(key, ERROR_NULL_KEY);
293 292
294 - Timestamped<V> value = items.get(key); 293 + MapValue<V> value = items.get(key);
295 - if (value != null) { 294 + return (value == null || value.isTombstone()) ? null : value.get();
296 - return value.value();
297 - }
298 - return null;
299 } 295 }
300 296
301 @Override 297 @Override
...@@ -304,123 +300,18 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -304,123 +300,18 @@ public class EventuallyConsistentMapImpl<K, V>
304 checkNotNull(key, ERROR_NULL_KEY); 300 checkNotNull(key, ERROR_NULL_KEY);
305 checkNotNull(value, ERROR_NULL_VALUE); 301 checkNotNull(value, ERROR_NULL_VALUE);
306 302
307 - Timestamp timestamp = timestampProvider.apply(key, value); 303 + MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
308 - 304 + if (updateInternal(key, newValue)) {
309 - if (putInternal(key, value, timestamp)) { 305 + notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
310 - notifyPeers(new PutEntry<>(key, value, timestamp), 306 + notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
311 - peerUpdateFunction.apply(key, value));
312 - notifyListeners(new EventuallyConsistentMapEvent<>(
313 - EventuallyConsistentMapEvent.Type.PUT, key, value));
314 } 307 }
315 } 308 }
316 309
317 - private boolean putInternal(K key, V value, Timestamp timestamp) {
318 - counter.incrementCount();
319 - Timestamp removed = removedItems.get(key);
320 - if (removed != null && removed.isNewerThan(timestamp)) {
321 - log.debug("ecmap - removed was newer {}", value);
322 - return false;
323 - }
324 -
325 - final MutableBoolean updated = new MutableBoolean(false);
326 -
327 - items.compute(key, (k, existing) -> {
328 - if (existing != null && existing.isNewerThan(timestamp)) {
329 - updated.setFalse();
330 - return existing;
331 - } else {
332 - updated.setTrue();
333 - return new Timestamped<>(value, timestamp);
334 - }
335 - });
336 -
337 - boolean success = updated.booleanValue();
338 - if (!success) {
339 - log.debug("ecmap - existing was newer {}", value);
340 - }
341 -
342 - if (success && removed != null) {
343 - removedItems.remove(key, removed);
344 - }
345 -
346 - if (success && persistent) {
347 - persistentStore.put(key, value, timestamp);
348 - }
349 -
350 - return success;
351 - }
352 -
353 @Override 310 @Override
354 public V remove(K key) { 311 public V remove(K key) {
355 checkState(!destroyed, destroyedMessage); 312 checkState(!destroyed, destroyedMessage);
356 checkNotNull(key, ERROR_NULL_KEY); 313 checkNotNull(key, ERROR_NULL_KEY);
357 - 314 + return removeInternal(key, Optional.empty());
358 - // TODO prevent calls here if value is important for timestamp
359 - Timestamp timestamp = timestampProvider.apply(key, null);
360 -
361 - Optional<V> removedValue = removeInternal(key, timestamp);
362 - if (removedValue == null) {
363 - return null;
364 - }
365 - notifyPeers(new RemoveEntry<>(key, timestamp),
366 - peerUpdateFunction.apply(key, null));
367 - notifyListeners(new EventuallyConsistentMapEvent<>(
368 - EventuallyConsistentMapEvent.Type.REMOVE, key, removedValue.orElse(null)));
369 -
370 - return removedValue.orElse(null);
371 - }
372 -
373 - /**
374 - * Returns null if the timestamp is for a outdated request i.e.
375 - * the value is the map is more recent or a tombstone exists with a
376 - * more recent timestamp.
377 - * Returns non-empty optional if a value was indeed removed from the map.
378 - * Returns empty optional if map did not contain a value for the key but the existing
379 - * tombstone is older than this timestamp.
380 - * @param key key
381 - * @param timestamp timestamp for remove request
382 - * @return Optional value.
383 - */
384 - private Optional<V> removeInternal(K key, Timestamp timestamp) {
385 - if (timestamp == null) {
386 - return null;
387 - }
388 -
389 - counter.incrementCount();
390 - final AtomicReference<Optional<V>> removedValue = new AtomicReference<>(null);
391 - items.compute(key, (k, existing) -> {
392 - if (existing != null && existing.isNewerThan(timestamp)) {
393 - return existing;
394 - } else {
395 - removedValue.set(existing == null ? Optional.empty() : Optional.of(existing.value()));
396 - return null;
397 - }
398 - });
399 -
400 - if (isNull(removedValue.get())) {
401 - return null;
402 - }
403 -
404 - boolean updatedTombstone = false;
405 -
406 - if (!tombstonesDisabled) {
407 - Timestamp removedTimestamp = removedItems.get(key);
408 - if (removedTimestamp == null) {
409 - //Timestamp removed = removedItems.putIfAbsent(key, timestamp);
410 - updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
411 - } else if (timestamp.isNewerThan(removedTimestamp)) {
412 - updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
413 - }
414 - }
415 -
416 - if (persistent) {
417 - persistentStore.remove(key, timestamp);
418 - }
419 -
420 - if (tombstonesDisabled || updatedTombstone) {
421 - return removedValue.get();
422 - }
423 - return null;
424 } 315 }
425 316
426 @Override 317 @Override
...@@ -428,15 +319,34 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -428,15 +319,34 @@ public class EventuallyConsistentMapImpl<K, V>
428 checkState(!destroyed, destroyedMessage); 319 checkState(!destroyed, destroyedMessage);
429 checkNotNull(key, ERROR_NULL_KEY); 320 checkNotNull(key, ERROR_NULL_KEY);
430 checkNotNull(value, ERROR_NULL_VALUE); 321 checkNotNull(value, ERROR_NULL_VALUE);
322 + removeInternal(key, Optional.of(value));
323 + }
431 324
432 - Timestamp timestamp = timestampProvider.apply(key, value); 325 + private V removeInternal(K key, Optional<V> value) {
326 + checkState(!destroyed, destroyedMessage);
327 + checkNotNull(key, ERROR_NULL_KEY);
328 + checkNotNull(value, ERROR_NULL_VALUE);
433 329
434 - if (nonNull(removeInternal(key, timestamp))) { 330 + MapValue<V> newValue = new MapValue<>(null, timestampProvider.apply(key, value.orElse(null)));
435 - notifyPeers(new RemoveEntry<>(key, timestamp), 331 + AtomicBoolean updated = new AtomicBoolean(false);
436 - peerUpdateFunction.apply(key, value)); 332 + AtomicReference<V> previousValue = new AtomicReference<>();
437 - notifyListeners(new EventuallyConsistentMapEvent<>( 333 + items.compute(key, (k, existing) -> {
438 - EventuallyConsistentMapEvent.Type.REMOVE, key, value)); 334 + if (existing != null && existing.isAlive()) {
335 + updated.set(!value.isPresent() || value.get().equals(existing.get()));
336 + previousValue.set(existing.get());
337 + }
338 + updated.set(existing == null || newValue.isNewerThan(existing));
339 + return updated.get() ? newValue : existing;
340 + });
341 + if (updated.get()) {
342 + notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, previousValue.get()));
343 + notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
344 + if (persistent) {
345 + persistentStore.update(key, newValue);
346 + }
347 + return previousValue.get();
439 } 348 }
349 + return null;
440 } 350 }
441 351
442 @Override 352 @Override
...@@ -448,30 +358,59 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -448,30 +358,59 @@ public class EventuallyConsistentMapImpl<K, V>
448 @Override 358 @Override
449 public void clear() { 359 public void clear() {
450 checkState(!destroyed, destroyedMessage); 360 checkState(!destroyed, destroyedMessage);
451 - items.forEach((key, value) -> remove(key)); 361 + Maps.filterValues(items, MapValue::isAlive)
362 + .forEach((k, v) -> remove(k));
452 } 363 }
453 364
454 @Override 365 @Override
455 public Set<K> keySet() { 366 public Set<K> keySet() {
456 checkState(!destroyed, destroyedMessage); 367 checkState(!destroyed, destroyedMessage);
457 - return items.keySet(); 368 + return Maps.filterValues(items, MapValue::isAlive)
369 + .keySet();
458 } 370 }
459 371
460 @Override 372 @Override
461 public Collection<V> values() { 373 public Collection<V> values() {
462 checkState(!destroyed, destroyedMessage); 374 checkState(!destroyed, destroyedMessage);
463 - return items.values().stream() 375 + return Maps.filterValues(items, MapValue::isAlive)
464 - .map(Timestamped::value) 376 + .values()
465 - .collect(Collectors.toList()); 377 + .stream()
378 + .map(MapValue::get)
379 + .collect(Collectors.toList());
466 } 380 }
467 381
468 @Override 382 @Override
469 public Set<Map.Entry<K, V>> entrySet() { 383 public Set<Map.Entry<K, V>> entrySet() {
470 checkState(!destroyed, destroyedMessage); 384 checkState(!destroyed, destroyedMessage);
385 + return Maps.filterValues(items, MapValue::isAlive)
386 + .entrySet()
387 + .stream()
388 + .map(e -> Pair.of(e.getKey(), e.getValue().get()))
389 + .collect(Collectors.toSet());
390 + }
471 391
472 - return items.entrySet().stream() 392 + /**
473 - .map(e -> Pair.of(e.getKey(), e.getValue().value())) 393 + * Returns true if newValue was accepted i.e. map is updated.
474 - .collect(Collectors.toSet()); 394 + * @param key key
395 + * @param newValue proposed new value
396 + * @return true if update happened; false if map already contains a more recent value for the key
397 + */
398 + private boolean updateInternal(K key, MapValue<V> newValue) {
399 + AtomicBoolean updated = new AtomicBoolean(false);
400 + items.compute(key, (k, existing) -> {
401 + if (existing == null || newValue.isNewerThan(existing)) {
402 + updated.set(true);
403 + if (newValue.isTombstone()) {
404 + return tombstonesDisabled ? null : newValue;
405 + }
406 + return newValue;
407 + }
408 + return existing;
409 + });
410 + if (updated.get() && persistent) {
411 + persistentStore.update(key, newValue);
412 + }
413 + return updated.get();
475 } 414 }
476 415
477 @Override 416 @Override
...@@ -503,26 +442,20 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -503,26 +442,20 @@ public class EventuallyConsistentMapImpl<K, V>
503 } 442 }
504 443
505 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) { 444 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
506 - for (EventuallyConsistentMapListener<K, V> listener : listeners) { 445 + listeners.forEach(listener -> listener.event(event));
507 - listener.event(event);
508 - }
509 - }
510 -
511 - private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
512 - queueUpdate(event, peers);
513 } 446 }
514 447
515 - private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) { 448 + private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
516 queueUpdate(event, peers); 449 queueUpdate(event, peers);
517 } 450 }
518 451
519 - private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) { 452 + private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
520 if (peers == null) { 453 if (peers == null) {
521 // we have no friends :( 454 // we have no friends :(
522 return; 455 return;
523 } 456 }
524 peers.forEach(node -> 457 peers.forEach(node ->
525 - senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event) 458 + senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
526 ); 459 );
527 } 460 }
528 461
...@@ -530,276 +463,107 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -530,276 +463,107 @@ public class EventuallyConsistentMapImpl<K, V>
530 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD; 463 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
531 } 464 }
532 465
533 - private final class SendAdvertisementTask implements Runnable { 466 + private void sendAdvertisement() {
534 - @Override 467 + try {
535 - public void run() {
536 - if (Thread.currentThread().isInterrupted()) {
537 - log.info("Interrupted, quitting");
538 - return;
539 - }
540 -
541 if (underHighLoad() || destroyed) { 468 if (underHighLoad() || destroyed) {
542 return; 469 return;
543 } 470 }
544 - 471 + pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
545 - try { 472 + } catch (Exception e) {
546 - final NodeId self = clusterService.getLocalNode().id(); 473 + // Catch all exceptions to avoid scheduled task being suppressed.
547 - Set<ControllerNode> nodes = clusterService.getNodes(); 474 + log.error("Exception thrown while sending advertisement", e);
548 -
549 - List<NodeId> nodeIds = nodes.stream()
550 - .map(ControllerNode::id)
551 - .collect(Collectors.toList());
552 -
553 - if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
554 - log.trace("No other peers in the cluster.");
555 - return;
556 - }
557 -
558 - NodeId peer;
559 - do {
560 - int idx = RandomUtils.nextInt(0, nodeIds.size());
561 - peer = nodeIds.get(idx);
562 - } while (peer.equals(self));
563 -
564 - if (Thread.currentThread().isInterrupted()) {
565 - log.info("Interrupted, quitting");
566 - return;
567 - }
568 -
569 - AntiEntropyAdvertisement<K> ad = createAdvertisement();
570 - NodeId destination = peer;
571 - clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
572 - .whenComplete((result, error) -> {
573 - if (error != null) {
574 - log.debug("Failed to send anti-entropy advertisement to {}", destination);
575 - }
576 - });
577 -
578 - } catch (Exception e) {
579 - // Catch all exceptions to avoid scheduled task being suppressed.
580 - log.error("Exception thrown while sending advertisement", e);
581 - }
582 } 475 }
583 } 476 }
584 477
585 - private AntiEntropyAdvertisement<K> createAdvertisement() { 478 + private Optional<NodeId> pickRandomActivePeer() {
586 - final NodeId self = clusterService.getLocalNode().id(); 479 + List<NodeId> activePeers = clusterService.getNodes()
587 - 480 + .stream()
588 - Map<K, Timestamp> timestamps = new HashMap<>(items.size()); 481 + .filter(node -> !localNodeId.equals(node))
589 - 482 + .map(ControllerNode::id)
590 - items.forEach((key, value) -> timestamps.put(key, value.timestamp())); 483 + .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
484 + .collect(Collectors.toList());
485 + Collections.shuffle(activePeers);
486 + return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
487 + }
488 +
489 + private void sendAdvertisementToPeer(NodeId peer) {
490 + clusterCommunicator.unicast(createAdvertisement(),
491 + antiEntropyAdvertisementSubject,
492 + serializer::encode,
493 + peer)
494 + .whenComplete((result, error) -> {
495 + if (error != null) {
496 + log.warn("Failed to send anti-entropy advertisement to {}", peer);
497 + }
498 + });
499 + }
591 500
592 - Map<K, Timestamp> tombstones = new HashMap<>(removedItems);
593 501
594 - return new AntiEntropyAdvertisement<>(self, timestamps, tombstones); 502 + private AntiEntropyAdvertisement<K> createAdvertisement() {
503 + return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
595 } 504 }
596 505
597 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) { 506 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
598 - List<EventuallyConsistentMapEvent<K, V>> externalEvents; 507 + if (destroyed || underHighLoad()) {
599 - 508 + return;
600 - externalEvents = antiEntropyCheckLocalItems(ad); 509 + }
601 - 510 + try {
602 - antiEntropyCheckLocalRemoved(ad); 511 + antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
603 -
604 - if (!lightweightAntiEntropy) {
605 - externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
606 512
607 - // if remote ad has something unknown, actively sync 513 + if (!lightweightAntiEntropy) {
608 - for (K key : ad.timestamps().keySet()) { 514 + Set<K> missingKeys = Sets.difference(items.keySet(), ad.digest().keySet());
609 - if (!items.containsKey(key)) { 515 + // if remote ad has something unknown, actively sync
516 + if (missingKeys.size() > 0) {
610 // Send the advertisement back if this peer is out-of-sync 517 // Send the advertisement back if this peer is out-of-sync
611 - final NodeId sender = ad.sender(); 518 + // TODO: Send ad for missing keys and for entries that are stale
612 - AntiEntropyAdvertisement<K> myAd = createAdvertisement(); 519 + sendAdvertisementToPeer(ad.sender());
613 -
614 - clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
615 - .whenComplete((result, error) -> {
616 - if (error != null) {
617 - log.debug("Failed to send reactive "
618 - + "anti-entropy advertisement to {}", sender);
619 - }
620 - });
621 - break;
622 } 520 }
623 } 521 }
522 + } catch (Exception e) {
523 + log.warn("Error handling anti-entropy advertisement", e);
624 } 524 }
625 - externalEvents.forEach(this::notifyListeners);
626 } 525 }
627 526
628 /** 527 /**
629 - * Checks if any of the remote's live items or tombstones are out of date 528 + * Processes anti-entropy ad from peer by taking following actions:
630 - * according to our local live item list, or if our live items are out of 529 + * 1. If peer has an old entry, updates peer.
631 - * date according to the remote's tombstone list. 530 + * 2. If peer indicates an entry is removed and has a more recent
632 - * If the local copy is more recent, it will be pushed to the remote. If the 531 + * timestamp than the local entry, update local state.
633 - * remote has a more recent remove, we apply that to the local state.
634 - *
635 - * @param ad remote anti-entropy advertisement
636 - * @return list of external events relating to local operations performed
637 */ 532 */
638 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems( 533 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
639 AntiEntropyAdvertisement<K> ad) { 534 AntiEntropyAdvertisement<K> ad) {
640 - final List<EventuallyConsistentMapEvent<K, V>> externalEvents 535 + final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
641 - = new LinkedList<>();
642 final NodeId sender = ad.sender(); 536 final NodeId sender = ad.sender();
643 - 537 + items.forEach((key, localValue) -> {
644 - for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) { 538 + MapValue.Digest remoteValueDigest = ad.digest().get(key);
645 - K key = item.getKey(); 539 + if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
646 - Timestamped<V> localValue = item.getValue();
647 -
648 - Timestamp remoteTimestamp = ad.timestamps().get(key);
649 - if (remoteTimestamp == null) {
650 - remoteTimestamp = ad.tombstones().get(key);
651 - }
652 - if (remoteTimestamp == null || localValue
653 - .isNewerThan(remoteTimestamp)) {
654 // local value is more recent, push to sender 540 // local value is more recent, push to sender
655 - queueUpdate(new PutEntry<>(key, localValue.value(), 541 + queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
656 - localValue.timestamp()), ImmutableList.of(sender)); 542 + } else {
657 - } 543 + if (remoteValueDigest.isTombstone()
658 - 544 + && remoteValueDigest.timestamp().isNewerThan(localValue.timestamp())) {
659 - Timestamp remoteDeadTimestamp = ad.tombstones().get(key); 545 + if (updateInternal(key, new MapValue<>(null, remoteValueDigest.timestamp()))) {
660 - if (remoteDeadTimestamp != null && 546 + externalEvents.add(new EventuallyConsistentMapEvent<>(REMOVE, key, null));
661 - remoteDeadTimestamp.isNewerThan(localValue.timestamp())) { 547 + }
662 - // sender has a more recent remove
663 - if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
664 - externalEvents.add(new EventuallyConsistentMapEvent<>(
665 - EventuallyConsistentMapEvent.Type.REMOVE, key, null));
666 - }
667 - }
668 - }
669 -
670 - return externalEvents;
671 - }
672 -
673 - /**
674 - * Checks if any items in the remote live list are out of date according
675 - * to our tombstone list. If we find we have a more up to date tombstone,
676 - * we'll send it to the remote.
677 - *
678 - * @param ad remote anti-entropy advertisement
679 - */
680 - private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
681 - final NodeId sender = ad.sender();
682 -
683 - for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
684 - K key = dead.getKey();
685 - Timestamp localDeadTimestamp = dead.getValue();
686 -
687 - Timestamp remoteLiveTimestamp = ad.timestamps().get(key);
688 - if (remoteLiveTimestamp != null
689 - && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
690 - // sender has zombie, push remove
691 - queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
692 - }
693 - }
694 - }
695 -
696 - /**
697 - * Checks if any of the local live items are out of date according to the
698 - * remote's tombstone advertisements. If we find a local item is out of date,
699 - * we'll apply the remove operation to the local state.
700 - *
701 - * @param ad remote anti-entropy advertisement
702 - * @return list of external events relating to local operations performed
703 - */
704 - private List<EventuallyConsistentMapEvent<K, V>>
705 - antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
706 - final List<EventuallyConsistentMapEvent<K, V>> externalEvents
707 - = new LinkedList<>();
708 -
709 - for (Map.Entry<K, Timestamp> remoteDead : ad.tombstones().entrySet()) {
710 - K key = remoteDead.getKey();
711 - Timestamp remoteDeadTimestamp = remoteDead.getValue();
712 -
713 - Timestamped<V> local = items.get(key);
714 - Timestamp localDead = removedItems.get(key);
715 - if (local != null && remoteDeadTimestamp.isNewerThan(
716 - local.timestamp())) {
717 - // If the remote has a more recent tombstone than either our local
718 - // value, then do a remove with their timestamp
719 - if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
720 - externalEvents.add(new EventuallyConsistentMapEvent<>(
721 - EventuallyConsistentMapEvent.Type.REMOVE, key, null));
722 } 548 }
723 - } else if (localDead != null && remoteDeadTimestamp.isNewerThan(
724 - localDead)) {
725 - // If the remote has a more recent tombstone than us, update ours
726 - // to their timestamp
727 - removeInternal(key, remoteDeadTimestamp);
728 } 549 }
729 - } 550 + });
730 -
731 return externalEvents; 551 return externalEvents;
732 } 552 }
733 553
734 - private final class InternalAntiEntropyListener 554 + private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
735 - implements ClusterMessageHandler { 555 + if (destroyed) {
736 - 556 + return;
737 - @Override
738 - public void handle(ClusterMessage message) {
739 - log.trace("Received anti-entropy advertisement from peer: {}",
740 - message.sender());
741 - AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
742 - try {
743 - if (!underHighLoad()) {
744 - handleAntiEntropyAdvertisement(advertisement);
745 - }
746 - } catch (Exception e) {
747 - log.warn("Exception thrown handling advertisements", e);
748 - }
749 } 557 }
750 - } 558 + updates.forEach(update -> {
751 - 559 + final K key = update.key();
752 - private final class InternalEventListener implements ClusterMessageHandler { 560 + final MapValue<V> value = update.value();
753 - @Override
754 - public void handle(ClusterMessage message) {
755 - if (destroyed) {
756 - return;
757 - }
758 561
759 - log.debug("Received update event from peer: {}", message.sender()); 562 + if (updateInternal(key, value)) {
760 - Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload()); 563 + final EventuallyConsistentMapEvent.Type type = value.isTombstone() ? REMOVE : PUT;
761 - 564 + notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value.get()));
762 - try {
763 - // TODO clean this for loop up
764 - for (AbstractEntry<K, V> entry : events) {
765 - final K key = entry.key();
766 - V value;
767 - final Timestamp timestamp = entry.timestamp();
768 - final EventuallyConsistentMapEvent.Type type;
769 - if (entry instanceof PutEntry) {
770 - PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
771 - value = putEntry.value();
772 - type = EventuallyConsistentMapEvent.Type.PUT;
773 - } else if (entry instanceof RemoveEntry) {
774 - type = EventuallyConsistentMapEvent.Type.REMOVE;
775 - value = null;
776 - } else {
777 - throw new IllegalStateException("Unknown entry type " + entry.getClass());
778 - }
779 -
780 - boolean success;
781 - switch (type) {
782 - case PUT:
783 - success = putInternal(key, value, timestamp);
784 - break;
785 - case REMOVE:
786 - Optional<V> removedValue = removeInternal(key, timestamp);
787 - success = removedValue != null;
788 - if (success) {
789 - value = removedValue.orElse(null);
790 - }
791 - break;
792 - default:
793 - success = false;
794 - }
795 - if (success) {
796 - notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
797 - }
798 - }
799 - } catch (Exception e) {
800 - log.warn("Exception thrown handling put", e);
801 } 565 }
802 - } 566 + });
803 } 567 }
804 568
805 // TODO pull this into the class if this gets pulled out... 569 // TODO pull this into the class if this gets pulled out...
...@@ -808,7 +572,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -808,7 +572,7 @@ public class EventuallyConsistentMapImpl<K, V>
808 private static final int DEFAULT_MAX_BATCH_MS = 50; 572 private static final int DEFAULT_MAX_BATCH_MS = 50;
809 private static final Timer TIMER = new Timer("onos-ecm-sender-events"); 573 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
810 574
811 - private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> { 575 + private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
812 576
813 private final NodeId peer; 577 private final NodeId peer;
814 578
...@@ -818,23 +582,21 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -818,23 +582,21 @@ public class EventuallyConsistentMapImpl<K, V>
818 } 582 }
819 583
820 @Override 584 @Override
821 - public void processItems(List<AbstractEntry<K, V>> items) { 585 + public void processItems(List<UpdateEntry<K, V>> items) {
822 - Map<K, AbstractEntry<K, V>> map = Maps.newHashMap(); 586 + Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
823 - items.forEach(item -> map.compute(item.key(), (key, oldValue) -> 587 + items.forEach(item -> map.compute(item.key(), (key, existing) ->
824 - oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue 588 + existing == null || item.compareTo(existing) > 0 ? item : existing));
825 - )
826 - );
827 communicationExecutor.submit(() -> { 589 communicationExecutor.submit(() -> {
828 - clusterCommunicator.unicast(Lists.newArrayList(map.values()), 590 + clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
829 updateMessageSubject, 591 updateMessageSubject,
830 serializer::encode, 592 serializer::encode,
831 peer) 593 peer)
832 .whenComplete((result, error) -> { 594 .whenComplete((result, error) -> {
833 if (error != null) { 595 if (error != null) {
834 - log.debug("Failed to send to {}", peer); 596 + log.debug("Failed to send to {}", peer, error);
835 } 597 }
836 }); 598 });
837 }); 599 });
838 } 600 }
839 } 601 }
840 -} 602 +}
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -16,13 +16,10 @@ ...@@ -16,13 +16,10 @@
16 16
17 package org.onosproject.store.ecmap; 17 package org.onosproject.store.ecmap;
18 18
19 -import org.apache.commons.lang3.mutable.MutableBoolean;
20 import org.mapdb.DB; 19 import org.mapdb.DB;
21 import org.mapdb.DBMaker; 20 import org.mapdb.DBMaker;
22 import org.mapdb.Hasher; 21 import org.mapdb.Hasher;
23 import org.mapdb.Serializer; 22 import org.mapdb.Serializer;
24 -import org.onosproject.store.Timestamp;
25 -import org.onosproject.store.impl.Timestamped;
26 import org.onosproject.store.serializers.KryoSerializer; 23 import org.onosproject.store.serializers.KryoSerializer;
27 24
28 import java.io.File; 25 import java.io.File;
...@@ -42,7 +39,6 @@ class MapDbPersistentStore<K, V> implements PersistentStore<K, V> { ...@@ -42,7 +39,6 @@ class MapDbPersistentStore<K, V> implements PersistentStore<K, V> {
42 private final DB database; 39 private final DB database;
43 40
44 private final Map<byte[], byte[]> items; 41 private final Map<byte[], byte[]> items;
45 - private final Map<byte[], byte[]> tombstones;
46 42
47 /** 43 /**
48 * Creates a new MapDB based persistent store. 44 * Creates a new MapDB based persistent store.
...@@ -65,102 +61,32 @@ class MapDbPersistentStore<K, V> implements PersistentStore<K, V> { ...@@ -65,102 +61,32 @@ class MapDbPersistentStore<K, V> implements PersistentStore<K, V> {
65 .valueSerializer(Serializer.BYTE_ARRAY) 61 .valueSerializer(Serializer.BYTE_ARRAY)
66 .hasher(Hasher.BYTE_ARRAY) 62 .hasher(Hasher.BYTE_ARRAY)
67 .makeOrGet(); 63 .makeOrGet();
68 -
69 - tombstones = database.createHashMap("tombstones")
70 - .keySerializer(Serializer.BYTE_ARRAY)
71 - .valueSerializer(Serializer.BYTE_ARRAY)
72 - .hasher(Hasher.BYTE_ARRAY)
73 - .makeOrGet();
74 } 64 }
75 65
76 @Override 66 @Override
77 - public void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones) { 67 + public void readInto(Map<K, MapValue<V>> items) {
78 this.items.forEach((keyBytes, valueBytes) -> 68 this.items.forEach((keyBytes, valueBytes) ->
79 items.put(serializer.decode(keyBytes), 69 items.put(serializer.decode(keyBytes),
80 - serializer.decode(valueBytes))); 70 + serializer.decode(valueBytes)));
81 -
82 - this.tombstones.forEach((keyBytes, valueBytes) ->
83 - tombstones.put(serializer.decode(keyBytes),
84 - serializer.decode(valueBytes)));
85 } 71 }
86 72
87 @Override 73 @Override
88 - public void put(K key, V value, Timestamp timestamp) { 74 + public void update(K key, MapValue<V> value) {
89 - executor.submit(() -> putInternal(key, value, timestamp)); 75 + executor.submit(() -> updateInternal(key, value));
90 } 76 }
91 77
92 - private void putInternal(K key, V value, Timestamp timestamp) { 78 + private void updateInternal(K key, MapValue<V> newValue) {
93 byte[] keyBytes = serializer.encode(key); 79 byte[] keyBytes = serializer.encode(key);
94 - byte[] removedBytes = tombstones.get(keyBytes);
95 -
96 - Timestamp removed = removedBytes == null ? null :
97 - serializer.decode(removedBytes);
98 - if (removed != null && removed.isNewerThan(timestamp)) {
99 - return;
100 - }
101 -
102 - final MutableBoolean updated = new MutableBoolean(false);
103 80
104 items.compute(keyBytes, (k, existingBytes) -> { 81 items.compute(keyBytes, (k, existingBytes) -> {
105 - Timestamped<V> existing = existingBytes == null ? null : 82 + MapValue<V> existing = existingBytes == null ? null :
106 serializer.decode(existingBytes); 83 serializer.decode(existingBytes);
107 - if (existing != null && existing.isNewerThan(timestamp)) { 84 + if (existing == null || newValue.isNewerThan(existing)) {
108 - updated.setFalse(); 85 + return serializer.encode(newValue);
109 - return existingBytes;
110 } else { 86 } else {
111 - updated.setTrue();
112 - return serializer.encode(new Timestamped<>(value, timestamp));
113 - }
114 - });
115 -
116 - boolean success = updated.booleanValue();
117 -
118 - if (success && removed != null) {
119 - tombstones.remove(keyBytes, removedBytes);
120 - }
121 -
122 - database.commit();
123 - }
124 -
125 - @Override
126 - public void remove(K key, Timestamp timestamp) {
127 - executor.submit(() -> removeInternal(key, timestamp));
128 - }
129 -
130 - private void removeInternal(K key, Timestamp timestamp) {
131 - byte[] keyBytes = serializer.encode(key);
132 -
133 - final MutableBoolean updated = new MutableBoolean(false);
134 -
135 - items.compute(keyBytes, (k, existingBytes) -> {
136 - Timestamp existing = existingBytes == null ? null :
137 - serializer.decode(existingBytes);
138 - if (existing != null && existing.isNewerThan(timestamp)) {
139 - updated.setFalse();
140 return existingBytes; 87 return existingBytes;
141 - } else {
142 - updated.setTrue();
143 - // remove from items map
144 - return null;
145 } 88 }
146 }); 89 });
147 -
148 - if (!updated.booleanValue()) {
149 - return;
150 - }
151 -
152 - byte[] timestampBytes = serializer.encode(timestamp);
153 - byte[] removedBytes = tombstones.get(keyBytes);
154 -
155 - Timestamp removedTimestamp = removedBytes == null ? null :
156 - serializer.decode(removedBytes);
157 - if (removedTimestamp == null) {
158 - tombstones.putIfAbsent(keyBytes, timestampBytes);
159 - } else if (timestamp.isNewerThan(removedTimestamp)) {
160 - tombstones.replace(keyBytes, removedBytes, timestampBytes);
161 - }
162 -
163 database.commit(); 90 database.commit();
164 } 91 }
165 -
166 } 92 }
......
1 +package org.onosproject.store.ecmap;
2 +
3 +import org.onosproject.store.Timestamp;
4 +import com.google.common.base.MoreObjects;
5 +
6 +/**
7 + * Representation of a value in EventuallyConsistentMap.
8 + *
9 + * @param <V> value type
10 + */
11 +public class MapValue<V> implements Comparable<MapValue<V>> {
12 + private final Timestamp timestamp;
13 + private final V value;
14 +
15 + public MapValue(V value, Timestamp timestamp) {
16 + this.value = value;
17 + this.timestamp = timestamp;
18 + }
19 +
20 + public boolean isTombstone() {
21 + return value == null;
22 + }
23 +
24 + public boolean isAlive() {
25 + return value != null;
26 + }
27 +
28 + public Timestamp timestamp() {
29 + return timestamp;
30 + }
31 +
32 + public V get() {
33 + return value;
34 + }
35 +
36 + @Override
37 + public int compareTo(MapValue<V> o) {
38 + return this.timestamp.compareTo(o.timestamp);
39 + }
40 +
41 + public boolean isNewerThan(MapValue<V> other) {
42 + return timestamp.isNewerThan(other.timestamp);
43 + }
44 +
45 + public boolean isNewerThan(Timestamp timestamp) {
46 + return timestamp.isNewerThan(timestamp);
47 + }
48 +
49 + public Digest digest() {
50 + return new Digest(timestamp, isTombstone());
51 + }
52 +
53 + @Override
54 + public String toString() {
55 + return MoreObjects.toStringHelper(getClass())
56 + .add("timestamp", timestamp)
57 + .add("value", value)
58 + .toString();
59 + }
60 +
61 + @SuppressWarnings("unused")
62 + private MapValue() {
63 + this.timestamp = null;
64 + this.value = null;
65 + }
66 +
67 + /**
68 + * Digest or summary of a MapValue for use during Anti-Entropy exchanges.
69 + */
70 + public static class Digest {
71 + private final Timestamp timestamp;
72 + private final boolean isTombstone;
73 +
74 + public Digest(Timestamp timestamp, boolean isTombstone) {
75 + this.timestamp = timestamp;
76 + this.isTombstone = isTombstone;
77 + }
78 +
79 + public Timestamp timestamp() {
80 + return timestamp;
81 + }
82 +
83 + public boolean isTombstone() {
84 + return isTombstone;
85 + }
86 +
87 + public boolean isNewerThan(Digest other) {
88 + return timestamp.isNewerThan(other.timestamp);
89 + }
90 +
91 + @Override
92 + public String toString() {
93 + return MoreObjects.toStringHelper(getClass())
94 + .add("timestamp", timestamp)
95 + .add("isTombstone", isTombstone)
96 + .toString();
97 + }
98 + }
99 +}
...@@ -16,9 +16,6 @@ ...@@ -16,9 +16,6 @@
16 16
17 package org.onosproject.store.ecmap; 17 package org.onosproject.store.ecmap;
18 18
19 -import org.onosproject.store.Timestamp;
20 -import org.onosproject.store.impl.Timestamped;
21 -
22 import java.util.Map; 19 import java.util.Map;
23 20
24 /** 21 /**
...@@ -30,24 +27,14 @@ interface PersistentStore<K, V> { ...@@ -30,24 +27,14 @@ interface PersistentStore<K, V> {
30 * Read the contents of the disk into the given maps. 27 * Read the contents of the disk into the given maps.
31 * 28 *
32 * @param items items map 29 * @param items items map
33 - * @param tombstones tombstones map
34 */ 30 */
35 - void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones); 31 + void readInto(Map<K, MapValue<V>> items);
36 32
37 /** 33 /**
38 - * Puts a new key,value pair into the map on disk. 34 + * Updates a key,value pair in the persistent store.
39 * 35 *
40 * @param key the key 36 * @param key the key
41 * @param value the value 37 * @param value the value
42 - * @param timestamp the timestamp of the update
43 - */
44 - void put(K key, V value, Timestamp timestamp);
45 -
46 - /**
47 - * Removes a key from the map on disk.
48 - *
49 - * @param key the key
50 - * @param timestamp the timestamp of the update
51 */ 38 */
52 - void remove(K key, Timestamp timestamp); 39 + void update(K key, MapValue<V> value);
53 } 40 }
......
1 -/*
2 - * Copyright 2015 Open Networking Laboratory
3 - *
4 - * Licensed under the Apache License, Version 2.0 (the "License");
5 - * you may not use this file except in compliance with the License.
6 - * You may obtain a copy of the License at
7 - *
8 - * http://www.apache.org/licenses/LICENSE-2.0
9 - *
10 - * Unless required by applicable law or agreed to in writing, software
11 - * distributed under the License is distributed on an "AS IS" BASIS,
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 - * See the License for the specific language governing permissions and
14 - * limitations under the License.
15 - */
16 -package org.onosproject.store.ecmap;
17 -
18 -import com.google.common.base.MoreObjects;
19 -import org.onosproject.store.Timestamp;
20 -
21 -/**
22 - * Describes a single remove event in an EventuallyConsistentMap.
23 - */
24 -final class RemoveEntry<K, V> extends AbstractEntry<K, V> {
25 - /**
26 - * Creates a new remove entry.
27 - *
28 - * @param key key of the entry
29 - * @param timestamp timestamp of the remove event
30 - */
31 - public RemoveEntry(K key, Timestamp timestamp) {
32 - super(key, timestamp);
33 - }
34 -
35 - // Needed for serialization.
36 - @SuppressWarnings("unused")
37 - private RemoveEntry() {
38 - super();
39 - }
40 -
41 - @Override
42 - public String toString() {
43 - return MoreObjects.toStringHelper(getClass())
44 - .add("key", key())
45 - .add("timestamp", timestamp())
46 - .toString();
47 - }
48 -}
...@@ -15,34 +15,35 @@ ...@@ -15,34 +15,35 @@
15 */ 15 */
16 package org.onosproject.store.ecmap; 16 package org.onosproject.store.ecmap;
17 17
18 -import com.google.common.base.MoreObjects;
19 -import org.onosproject.store.Timestamp;
20 -
21 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.base.Preconditions.checkNotNull;
22 19
20 +import com.google.common.base.MoreObjects;
21 +
23 /** 22 /**
24 - * Describes a single put event in an EventuallyConsistentMap. 23 + * Describes a single update event in an EventuallyConsistentMap.
25 */ 24 */
26 -final class PutEntry<K, V> extends AbstractEntry<K, V> { 25 +final class UpdateEntry<K, V> implements Comparable<UpdateEntry<K, V>> {
27 - private final V value; 26 + private final K key;
27 + private final MapValue<V> value;
28 28
29 /** 29 /**
30 - * Creates a new put entry. 30 + * Creates a new update entry.
31 * 31 *
32 * @param key key of the entry 32 * @param key key of the entry
33 * @param value value of the entry 33 * @param value value of the entry
34 - * @param timestamp timestamp of the put event
35 */ 34 */
36 - public PutEntry(K key, V value, Timestamp timestamp) { 35 + public UpdateEntry(K key, MapValue<V> value) {
37 - super(key, timestamp); 36 + this.key = checkNotNull(key);
38 this.value = checkNotNull(value); 37 this.value = checkNotNull(value);
39 } 38 }
40 39
41 - // Needed for serialization. 40 + /**
42 - @SuppressWarnings("unused") 41 + * Returns the key.
43 - private PutEntry() { 42 + *
44 - super(); 43 + * @return the key
45 - this.value = null; 44 + */
45 + public K key() {
46 + return key;
46 } 47 }
47 48
48 /** 49 /**
...@@ -50,16 +51,26 @@ final class PutEntry<K, V> extends AbstractEntry<K, V> { ...@@ -50,16 +51,26 @@ final class PutEntry<K, V> extends AbstractEntry<K, V> {
50 * 51 *
51 * @return the value 52 * @return the value
52 */ 53 */
53 - public V value() { 54 + public MapValue<V> value() {
54 return value; 55 return value;
55 } 56 }
56 57
57 @Override 58 @Override
59 + public int compareTo(UpdateEntry<K, V> o) {
60 + return this.value.timestamp().compareTo(o.value.timestamp());
61 + }
62 +
63 + @Override
58 public String toString() { 64 public String toString() {
59 return MoreObjects.toStringHelper(getClass()) 65 return MoreObjects.toStringHelper(getClass())
60 .add("key", key()) 66 .add("key", key())
61 .add("value", value) 67 .add("value", value)
62 - .add("timestamp", timestamp())
63 .toString(); 68 .toString();
64 } 69 }
70 +
71 + @SuppressWarnings("unused")
72 + private UpdateEntry() {
73 + this.key = null;
74 + this.value = null;
75 + }
65 } 76 }
......
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
16 package org.onosproject.store.ecmap; 16 package org.onosproject.store.ecmap;
17 17
18 import com.google.common.collect.ComparisonChain; 18 import com.google.common.collect.ComparisonChain;
19 +import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.ImmutableSet; 20 import com.google.common.collect.ImmutableSet;
20 -import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.MoreExecutors; 21 import com.google.common.util.concurrent.MoreExecutors;
22 22
23 import org.junit.After; 23 import org.junit.After;
...@@ -32,7 +32,6 @@ import org.onosproject.cluster.NodeId; ...@@ -32,7 +32,6 @@ import org.onosproject.cluster.NodeId;
32 import org.onosproject.event.AbstractEvent; 32 import org.onosproject.event.AbstractEvent;
33 import org.onosproject.store.Timestamp; 33 import org.onosproject.store.Timestamp;
34 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 34 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35 -import org.onosproject.store.cluster.messaging.ClusterMessage;
36 import org.onosproject.store.cluster.messaging.ClusterMessageHandler; 35 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
37 import org.onosproject.store.cluster.messaging.MessageSubject; 36 import org.onosproject.store.cluster.messaging.MessageSubject;
38 import org.onosproject.store.impl.LogicalTimestamp; 37 import org.onosproject.store.impl.LogicalTimestamp;
...@@ -44,11 +43,13 @@ import org.onosproject.store.service.EventuallyConsistentMapEvent; ...@@ -44,11 +43,13 @@ import org.onosproject.store.service.EventuallyConsistentMapEvent;
44 import org.onosproject.store.service.EventuallyConsistentMapListener; 43 import org.onosproject.store.service.EventuallyConsistentMapListener;
45 44
46 import java.util.ArrayList; 45 import java.util.ArrayList;
46 +import java.util.Collection;
47 import java.util.HashMap; 47 import java.util.HashMap;
48 import java.util.HashSet; 48 import java.util.HashSet;
49 import java.util.List; 49 import java.util.List;
50 import java.util.Map; 50 import java.util.Map;
51 import java.util.Objects; 51 import java.util.Objects;
52 +import java.util.Optional;
52 import java.util.Set; 53 import java.util.Set;
53 import java.util.concurrent.CompletableFuture; 54 import java.util.concurrent.CompletableFuture;
54 import java.util.concurrent.CountDownLatch; 55 import java.util.concurrent.CountDownLatch;
...@@ -89,8 +90,8 @@ public class EventuallyConsistentMapImplTest { ...@@ -89,8 +90,8 @@ public class EventuallyConsistentMapImplTest {
89 private final ControllerNode self = 90 private final ControllerNode self =
90 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1)); 91 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
91 92
92 - private ClusterMessageHandler updateHandler; 93 + private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
93 - private ClusterMessageHandler antiEntropyHandler; 94 + private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler;
94 95
95 /* 96 /*
96 * Serialization is a bit tricky here. We need to serialize in the tests 97 * Serialization is a bit tricky here. We need to serialize in the tests
...@@ -109,11 +110,10 @@ public class EventuallyConsistentMapImplTest { ...@@ -109,11 +110,10 @@ public class EventuallyConsistentMapImplTest {
109 // Below is the classes that the map internally registers 110 // Below is the classes that the map internally registers
110 .register(LogicalTimestamp.class) 111 .register(LogicalTimestamp.class)
111 .register(WallClockTimestamp.class) 112 .register(WallClockTimestamp.class)
112 - .register(PutEntry.class)
113 - .register(RemoveEntry.class)
114 .register(ArrayList.class) 113 .register(ArrayList.class)
115 .register(AntiEntropyAdvertisement.class) 114 .register(AntiEntropyAdvertisement.class)
116 .register(HashMap.class) 115 .register(HashMap.class)
116 + .register(Optional.class)
117 .build(); 117 .build();
118 } 118 }
119 }; 119 };
...@@ -131,9 +131,9 @@ public class EventuallyConsistentMapImplTest { ...@@ -131,9 +131,9 @@ public class EventuallyConsistentMapImplTest {
131 // delegate to our ClusterCommunicationService implementation. This 131 // delegate to our ClusterCommunicationService implementation. This
132 // allows us to get a reference to the map's internal cluster message 132 // allows us to get a reference to the map's internal cluster message
133 // handlers so we can induce events coming in from a peer. 133 // handlers so we can induce events coming in from a peer.
134 - clusterCommunicator.addSubscriber(anyObject(MessageSubject.class), 134 + clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class),
135 - anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class)); 135 + anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
136 - expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3); 136 + expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2);
137 137
138 replay(clusterCommunicator); 138 replay(clusterCommunicator);
139 139
...@@ -237,15 +237,15 @@ public class EventuallyConsistentMapImplTest { ...@@ -237,15 +237,15 @@ public class EventuallyConsistentMapImplTest {
237 assertEquals(VALUE1, ecMap.get(KEY1)); 237 assertEquals(VALUE1, ecMap.get(KEY1));
238 238
239 // Remote put 239 // Remote put
240 - ClusterMessage message 240 + List<UpdateEntry<String, String>> message
241 - = generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)); 241 + = ImmutableList.of(generatePutMessage(KEY2, VALUE2, clockService.getTimestamp(KEY2, VALUE2)));
242 242
243 // Create a latch so we know when the put operation has finished 243 // Create a latch so we know when the put operation has finished
244 latch = new CountDownLatch(1); 244 latch = new CountDownLatch(1);
245 ecMap.addListener(new TestListener(latch)); 245 ecMap.addListener(new TestListener(latch));
246 246
247 assertNull(ecMap.get(KEY2)); 247 assertNull(ecMap.get(KEY2));
248 - updateHandler.handle(message); 248 + updateHandler.accept(message);
249 assertTrue("External listener never got notified of internal event", 249 assertTrue("External listener never got notified of internal event",
250 latch.await(100, TimeUnit.MILLISECONDS)); 250 latch.await(100, TimeUnit.MILLISECONDS));
251 assertEquals(VALUE2, ecMap.get(KEY2)); 251 assertEquals(VALUE2, ecMap.get(KEY2));
...@@ -255,14 +255,13 @@ public class EventuallyConsistentMapImplTest { ...@@ -255,14 +255,13 @@ public class EventuallyConsistentMapImplTest {
255 assertNull(ecMap.get(KEY2)); 255 assertNull(ecMap.get(KEY2));
256 256
257 // Remote remove 257 // Remote remove
258 - ClusterMessage removeMessage 258 + message = ImmutableList.of(generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1)));
259 - = generateRemoveMessage(KEY1, clockService.getTimestamp(KEY1, VALUE1));
260 259
261 // Create a latch so we know when the remove operation has finished 260 // Create a latch so we know when the remove operation has finished
262 latch = new CountDownLatch(1); 261 latch = new CountDownLatch(1);
263 ecMap.addListener(new TestListener(latch)); 262 ecMap.addListener(new TestListener(latch));
264 263
265 - updateHandler.handle(removeMessage); 264 + updateHandler.accept(message);
266 assertTrue("External listener never got notified of internal event", 265 assertTrue("External listener never got notified of internal event",
267 latch.await(100, TimeUnit.MILLISECONDS)); 266 latch.await(100, TimeUnit.MILLISECONDS));
268 assertNull(ecMap.get(KEY1)); 267 assertNull(ecMap.get(KEY1));
...@@ -601,49 +600,35 @@ public class EventuallyConsistentMapImplTest { ...@@ -601,49 +600,35 @@ public class EventuallyConsistentMapImplTest {
601 } 600 }
602 } 601 }
603 602
604 - private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) { 603 + private UpdateEntry<String, String> generatePutMessage(String key, String value, Timestamp timestamp) {
605 - PutEntry<String, String> event = new PutEntry<>(key, value, timestamp); 604 + return new UpdateEntry<>(key, new MapValue<>(value, timestamp));
606 -
607 - return new ClusterMessage(
608 - clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
609 - SERIALIZER.encode(Lists.newArrayList(event)));
610 } 605 }
611 606
612 - private List<PutEntry<String, String>> generatePutMessage( 607 + private List<UpdateEntry<String, String>> generatePutMessage(
613 String key1, String value1, String key2, String value2) { 608 String key1, String value1, String key2, String value2) {
614 - ArrayList<PutEntry<String, String>> list = new ArrayList<>(); 609 + List<UpdateEntry<String, String>> list = new ArrayList<>();
615 610
616 Timestamp timestamp1 = clockService.peek(1); 611 Timestamp timestamp1 = clockService.peek(1);
617 Timestamp timestamp2 = clockService.peek(2); 612 Timestamp timestamp2 = clockService.peek(2);
618 613
619 - PutEntry<String, String> pe1 = new PutEntry<>(key1, value1, timestamp1); 614 + list.add(generatePutMessage(key1, value1, timestamp1));
620 - PutEntry<String, String> pe2 = new PutEntry<>(key2, value2, timestamp2); 615 + list.add(generatePutMessage(key2, value2, timestamp2));
621 -
622 - list.add(pe1);
623 - list.add(pe2);
624 616
625 return list; 617 return list;
626 } 618 }
627 619
628 - private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) { 620 + private UpdateEntry<String, String> generateRemoveMessage(String key, Timestamp timestamp) {
629 - RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp); 621 + return new UpdateEntry<>(key, new MapValue<>(null, timestamp));
630 -
631 - return new ClusterMessage(
632 - clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
633 - SERIALIZER.encode(Lists.newArrayList(event)));
634 } 622 }
635 623
636 - private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) { 624 + private List<UpdateEntry<String, String>> generateRemoveMessage(String key1, String key2) {
637 - ArrayList<RemoveEntry<String, String>> list = new ArrayList<>(); 625 + List<UpdateEntry<String, String>> list = new ArrayList<>();
638 626
639 Timestamp timestamp1 = clockService.peek(1); 627 Timestamp timestamp1 = clockService.peek(1);
640 Timestamp timestamp2 = clockService.peek(2); 628 Timestamp timestamp2 = clockService.peek(2);
641 629
642 - RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1); 630 + list.add(generateRemoveMessage(key1, timestamp1));
643 - RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2); 631 + list.add(generateRemoveMessage(key2, timestamp2));
644 -
645 - list.add(re1);
646 - list.add(re2);
647 632
648 return list; 633 return list;
649 } 634 }
...@@ -737,13 +722,6 @@ public class EventuallyConsistentMapImplTest { ...@@ -737,13 +722,6 @@ public class EventuallyConsistentMapImplTest {
737 public void addSubscriber(MessageSubject subject, 722 public void addSubscriber(MessageSubject subject,
738 ClusterMessageHandler subscriber, 723 ClusterMessageHandler subscriber,
739 ExecutorService executor) { 724 ExecutorService executor) {
740 - if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
741 - updateHandler = subscriber;
742 - } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
743 - antiEntropyHandler = subscriber;
744 - } else {
745 - throw new RuntimeException("Unexpected message subject " + subject.toString());
746 - }
747 } 725 }
748 726
749 @Override 727 @Override
...@@ -793,6 +771,13 @@ public class EventuallyConsistentMapImplTest { ...@@ -793,6 +771,13 @@ public class EventuallyConsistentMapImplTest {
793 public <M> void addSubscriber(MessageSubject subject, 771 public <M> void addSubscriber(MessageSubject subject,
794 Function<byte[], M> decoder, Consumer<M> handler, 772 Function<byte[], M> decoder, Consumer<M> handler,
795 Executor executor) { 773 Executor executor) {
774 + if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
775 + updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
776 + } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
777 + antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler;
778 + } else {
779 + throw new RuntimeException("Unexpected message subject " + subject.toString());
780 + }
796 } 781 }
797 } 782 }
798 783
......