Madan Jampani
Committed by Gerrit Code Review

Support null timestamps during EC Map remove

Change-Id: I250cc08d6b2570fd9febe5fc50ab0556bedfa410
......@@ -330,10 +330,13 @@ public class EventuallyConsistentMapImpl<K, V>
}
private V removeAndNotify(K key, V value) {
MapValue<V> tombstone = MapValue.tombstone(timestampProvider.apply(key, value));
Timestamp timestamp = timestampProvider.apply(key, value);
Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
if (previousValue != null) {
notifyPeers(new UpdateEntry<>(key, tombstone), peerUpdateFunction.apply(key, previousValue.get()));
notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
peerUpdateFunction.apply(key, previousValue.get()));
if (previousValue.isAlive()) {
notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
}
......@@ -341,11 +344,11 @@ public class EventuallyConsistentMapImpl<K, V>
return previousValue != null ? previousValue.get() : null;
}
private MapValue<V> removeInternal(K key, Optional<V> value, MapValue<V> tombstone) {
private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
checkState(tombstone.isTombstone());
tombstone.ifPresent(v -> checkState(v.isTombstone()));
counter.incrementCount();
AtomicBoolean updated = new AtomicBoolean(false);
......@@ -358,22 +361,26 @@ public class EventuallyConsistentMapImpl<K, V>
if (existing == null) {
log.debug("ECMap Remove: Existing value for key {} is already null", k);
}
updated.set(valueMatches && (existing == null || tombstone.isNewerThan(existing)));
if (updated.get()) {
previousValue.set(existing);
if (valueMatches) {
if (existing == null) {
updated.set(tombstone.isPresent());
} else {
updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
}
}
if (updated.get()) {
return tombstonesDisabled ? null : tombstone;
previousValue.set(existing);
return tombstone.orElse(null);
} else {
return existing;
}
});
if (updated.get()) {
if (persistent) {
if (tombstonesDisabled) {
persistentStore.remove(key);
if (tombstone.isPresent()) {
persistentStore.update(key, tombstone.get());
} else {
persistentStore.update(key, tombstone);
persistentStore.remove(key);
}
}
}
......@@ -605,9 +612,10 @@ public class EventuallyConsistentMapImpl<K, V>
if (remoteValueDigest != null
&& remoteValueDigest.isNewerThan(localValue.digest())
&& remoteValueDigest.isTombstone()) {
MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
MapValue<V> previousValue = removeInternal(key,
Optional.empty(),
MapValue.tombstone(remoteValueDigest.timestamp()));
Optional.of(tombstone));
if (previousValue != null && previousValue.isAlive()) {
externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
}
......@@ -623,8 +631,8 @@ public class EventuallyConsistentMapImpl<K, V>
updates.forEach(update -> {
final K key = update.key();
final MapValue<V> value = update.value();
if (value.isTombstone()) {
MapValue<V> previousValue = removeInternal(key, Optional.empty(), value);
if (value == null || value.isTombstone()) {
MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
if (previousValue != null && previousValue.isAlive()) {
notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
}
......
......@@ -34,7 +34,7 @@ final class UpdateEntry<K, V> {
*/
public UpdateEntry(K key, MapValue<V> value) {
this.key = checkNotNull(key);
this.value = checkNotNull(value);
this.value = value;
}
/**
......