Madan Jampani
Committed by Gerrit Code Review

Misc fixes/improvments to ECMapImpl. Most notably:

- Fixed logic in determining random peer to do AE
- Fixed for logic for when to do active sync if lightWeightAE is disabled
- Fixed tracking of ECMap activity

Change-Id: I35da91d6ef684e16630be7bd5e518c8400debe14
......@@ -15,7 +15,9 @@
*/
package org.onosproject.store.ecmap;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
......@@ -97,6 +99,8 @@ public class EventuallyConsistentMapImpl<K, V>
private final ExecutorService communicationExecutor;
private final Map<NodeId, EventAccumulator> senderPending;
private final String mapName;
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " map is already destroyed";
private final String destroyedMessage;
......@@ -157,6 +161,7 @@ public class EventuallyConsistentMapImpl<K, V>
TimeUnit antiEntropyTimeUnit,
boolean convergeFaster,
boolean persistent) {
this.mapName = mapName;
items = Maps.newConcurrentMap();
senderPending = Maps.newConcurrentMap();
destroyedMessage = mapName + ERROR_DESTROYED;
......@@ -284,7 +289,7 @@ public class EventuallyConsistentMapImpl<K, V>
return items.values()
.stream()
.filter(MapValue::isAlive)
.anyMatch(v -> v.get().equals(value));
.anyMatch(v -> value.equals(v.get()));
}
@Override
......@@ -303,7 +308,7 @@ public class EventuallyConsistentMapImpl<K, V>
checkNotNull(value, ERROR_NULL_VALUE);
MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
if (updateInternal(key, newValue)) {
if (putInternal(key, newValue)) {
notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value));
}
......@@ -313,16 +318,7 @@ public class EventuallyConsistentMapImpl<K, V>
public V remove(K key) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
// TODO prevent calls here if value is important for timestamp
MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, null));
MapValue<V> previousValue = removeInternal(key, Optional.empty(), tombstone);
if (previousValue != null) {
notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, previousValue.get()));
if (previousValue.isAlive()) {
notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
}
}
return previousValue != null ? previousValue.get() : null;
return removeAndNotify(key, null);
}
@Override
......@@ -330,22 +326,28 @@ public class EventuallyConsistentMapImpl<K, V>
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
removeAndNotify(key, value);
}
private V removeAndNotify(K key, V value) {
MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, value));
MapValue<V> previousValue = removeInternal(key, Optional.of(value), tombstone);
MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
if (previousValue != null) {
notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, previousValue.get()));
if (previousValue.isAlive()) {
notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
}
}
return previousValue != null ? previousValue.get() : null;
}
private MapValue<V> removeInternal(K key, Optional<V> value, MapValue<V> tombstone) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
checkState(tombstone.isTombstone());
counter.incrementCount();
AtomicBoolean updated = new AtomicBoolean(false);
AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
items.compute(key, (k, existing) -> {
......@@ -360,13 +362,21 @@ public class EventuallyConsistentMapImpl<K, V>
if (updated.get()) {
previousValue.set(existing);
}
return updated.get() ? tombstone : existing;
if (updated.get()) {
return tombstonesDisabled ? null : tombstone;
} else {
return existing;
}
});
if (updated.get()) {
if (persistent) {
if (tombstonesDisabled) {
persistentStore.remove(key);
} else {
persistentStore.update(key, tombstone);
}
}
}
return previousValue.get();
}
......@@ -393,11 +403,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public Collection<V> values() {
checkState(!destroyed, destroyedMessage);
return Maps.filterValues(items, MapValue::isAlive)
.values()
.stream()
.map(MapValue::get)
.collect(Collectors.toList());
return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
}
@Override
......@@ -416,14 +422,16 @@ public class EventuallyConsistentMapImpl<K, V>
* @param newValue proposed new value
* @return true if update happened; false if map already contains a more recent value for the key
*/
private boolean updateInternal(K key, MapValue<V> newValue) {
private boolean putInternal(K key, MapValue<V> newValue) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(newValue, ERROR_NULL_VALUE);
checkState(newValue.isAlive());
counter.incrementCount();
AtomicBoolean updated = new AtomicBoolean(false);
items.compute(key, (k, existing) -> {
if (existing == null || newValue.isNewerThan(existing)) {
updated.set(true);
if (newValue.isTombstone()) {
return tombstonesDisabled ? null : newValue;
}
return newValue;
}
return existing;
......@@ -499,8 +507,8 @@ public class EventuallyConsistentMapImpl<K, V>
private Optional<NodeId> pickRandomActivePeer() {
List<NodeId> activePeers = clusterService.getNodes()
.stream()
.filter(node -> !localNodeId.equals(node))
.map(ControllerNode::id)
.filter(id -> !localNodeId.equals(id))
.filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
.collect(Collectors.toList());
Collections.shuffle(activePeers);
......@@ -519,9 +527,9 @@ public class EventuallyConsistentMapImpl<K, V>
});
}
private AntiEntropyAdvertisement<K> createAdvertisement() {
return new AntiEntropyAdvertisement<K>(localNodeId, Maps.transformValues(items, MapValue::digest));
return new AntiEntropyAdvertisement<K>(localNodeId,
ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
}
private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
......@@ -529,13 +537,14 @@ public class EventuallyConsistentMapImpl<K, V>
return;
}
try {
log.debug("Received anti-entropy advertisement from {} for {} with {} entries in it",
mapName, ad.sender(), ad.digest().size());
antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
if (!lightweightAntiEntropy) {
Set<K> missingKeys = Sets.difference(items.keySet(), ad.digest().keySet());
// if remote ad has something unknown, actively sync
if (missingKeys.size() > 0) {
// Send the advertisement back if this peer is out-of-sync
// if remote ad has any entries that the local copy is missing, actively sync
// TODO: Missing keys is not the way local copy can be behind.
if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
// TODO: Send ad for missing keys and for entries that are stale
sendAdvertisementToPeer(ad.sender());
}
......@@ -561,7 +570,9 @@ public class EventuallyConsistentMapImpl<K, V>
// local value is more recent, push to sender
queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
}
if (remoteValueDigest != null && remoteValueDigest.isTombstone()) {
if (remoteValueDigest != null
&& remoteValueDigest.isNewerThan(localValue.digest())
&& remoteValueDigest.isTombstone()) {
MapValue<V> previousValue = removeInternal(key,
Optional.empty(),
MapValue.tombstone(remoteValueDigest.timestamp()));
......@@ -582,10 +593,10 @@ public class EventuallyConsistentMapImpl<K, V>
final MapValue<V> value = update.value();
if (value.isTombstone()) {
MapValue<V> previousValue = removeInternal(key, Optional.empty(), value);
if (previousValue != null && previousValue.get() != null) {
if (previousValue != null && previousValue.isAlive()) {
notifyListeners(new EventuallyConsistentMapEvent<>(REMOVE, key, previousValue.get()));
}
} else if (updateInternal(key, value)) {
} else if (putInternal(key, value)) {
notifyListeners(new EventuallyConsistentMapEvent<>(PUT, key, value.get()));
}
});
......
......@@ -75,6 +75,11 @@ class MapDbPersistentStore<K, V> implements PersistentStore<K, V> {
executor.submit(() -> updateInternal(key, value));
}
@Override
public void remove(K key) {
executor.submit(() -> removeInternal(key));
}
private void updateInternal(K key, MapValue<V> newValue) {
byte[] keyBytes = serializer.encode(key);
......@@ -89,4 +94,10 @@ class MapDbPersistentStore<K, V> implements PersistentStore<K, V> {
});
database.commit();
}
private void removeInternal(K key) {
byte[] keyBytes = serializer.encode(key);
items.remove(keyBytes);
database.commit();
}
}
......
......@@ -37,4 +37,11 @@ interface PersistentStore<K, V> {
* @param value the value
*/
void update(K key, MapValue<V> value);
/**
* Removes a key from persistent store.
*
* @param key the key to remove
*/
void remove(K key);
}
......