Madan Jampani
Committed by Gerrit Code Review

Updated ECMap remove call to return the value that was removed

Change-Id: Id7eacc04f4bb9322e4f98da5664c2fa46e0ea6fc
......@@ -99,16 +99,14 @@ public interface EventuallyConsistentMap<K, V> {
/**
* Removes the mapping associated with the specified key from the map.
* <p>
* Note: this differs from the specification of {@link java.util.Map}
* because it does not return the previous value associated with the key.
* Clients are expected to register an
* {@link EventuallyConsistentMapListener} if
* Clients are expected to register an {@link EventuallyConsistentMapListener} if
* they are interested in receiving notification of updates to the map.
* </p>
*
* @param key the key to remove the mapping for
* @return previous value associated with key, or null if there was no mapping for key.
*/
void remove(K key);
V remove(K key);
/**
* Removes the given key-value mapping from the map, if it exists.
......
......@@ -49,6 +49,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
......@@ -58,11 +59,14 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
......@@ -347,42 +351,54 @@ public class EventuallyConsistentMapImpl<K, V>
}
@Override
public void remove(K key) {
public V remove(K key) {
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
// TODO prevent calls here if value is important for timestamp
Timestamp timestamp = timestampProvider.apply(key, null);
if (removeInternal(key, timestamp)) {
notifyPeers(new RemoveEntry<>(key, timestamp),
peerUpdateFunction.apply(key, null));
notifyListeners(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, key, null));
Optional<V> removedValue = removeInternal(key, timestamp);
if (removedValue == null) {
return null;
}
notifyPeers(new RemoveEntry<>(key, timestamp),
peerUpdateFunction.apply(key, null));
notifyListeners(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, key, removedValue.orElse(null)));
return removedValue.orElse(null);
}
private boolean removeInternal(K key, Timestamp timestamp) {
/**
* Returns null if the timestamp is for a outdated request i.e.
* the value is the map is more recent or a tombstone exists with a
* more recent timestamp.
* Returns non-empty optional if a value was indeed removed from the map.
* Returns empty optional if map did not contain a value for the key but the existing
* tombstone is older than this timestamp.
* @param key key
* @param timestamp timestamp for remove request
* @return Optional value.
*/
private Optional<V> removeInternal(K key, Timestamp timestamp) {
if (timestamp == null) {
return false;
return null;
}
counter.incrementCount();
final MutableBoolean updated = new MutableBoolean(false);
final AtomicReference<Optional<V>> removedValue = new AtomicReference<>(null);
items.compute(key, (k, existing) -> {
if (existing != null && existing.isNewerThan(timestamp)) {
updated.setFalse();
return existing;
} else {
updated.setTrue();
// remove from items map
removedValue.set(existing == null ? Optional.empty() : Optional.of(existing.value()));
return null;
}
});
if (updated.isFalse()) {
return false;
if (isNull(removedValue.get())) {
return null;
}
boolean updatedTombstone = false;
......@@ -397,11 +413,14 @@ public class EventuallyConsistentMapImpl<K, V>
}
}
if (updated.booleanValue() && persistent) {
if (persistent) {
persistentStore.remove(key, timestamp);
}
return (!tombstonesDisabled && updatedTombstone) || updated.booleanValue();
if (tombstonesDisabled || updatedTombstone) {
return removedValue.get();
}
return null;
}
@Override
......@@ -412,7 +431,7 @@ public class EventuallyConsistentMapImpl<K, V>
Timestamp timestamp = timestampProvider.apply(key, value);
if (removeInternal(key, timestamp)) {
if (nonNull(removeInternal(key, timestamp))) {
notifyPeers(new RemoveEntry<>(key, timestamp),
peerUpdateFunction.apply(key, value));
notifyListeners(new EventuallyConsistentMapEvent<>(
......@@ -641,7 +660,7 @@ public class EventuallyConsistentMapImpl<K, V>
if (remoteDeadTimestamp != null &&
remoteDeadTimestamp.isNewerThan(localValue.timestamp())) {
// sender has a more recent remove
if (removeInternal(key, remoteDeadTimestamp)) {
if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
externalEvents.add(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, key, null));
}
......@@ -697,7 +716,7 @@ public class EventuallyConsistentMapImpl<K, V>
local.timestamp())) {
// If the remote has a more recent tombstone than either our local
// value, then do a remove with their timestamp
if (removeInternal(key, remoteDeadTimestamp)) {
if (nonNull(removeInternal(key, remoteDeadTimestamp))) {
externalEvents.add(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, key, null));
}
......@@ -744,7 +763,7 @@ public class EventuallyConsistentMapImpl<K, V>
// TODO clean this for loop up
for (AbstractEntry<K, V> entry : events) {
final K key = entry.key();
final V value;
V value;
final Timestamp timestamp = entry.timestamp();
final EventuallyConsistentMapEvent.Type type;
if (entry instanceof PutEntry) {
......@@ -764,7 +783,11 @@ public class EventuallyConsistentMapImpl<K, V>
success = putInternal(key, value, timestamp);
break;
case REMOVE:
success = removeInternal(key, timestamp);
Optional<V> removedValue = removeInternal(key, timestamp);
success = removedValue != null;
if (success) {
value = removedValue.orElse(null);
}
break;
default:
success = false;
......
......@@ -326,8 +326,9 @@ public class EventuallyConsistentMapImplTest {
EventuallyConsistentMapListener<String, String> listener
= getListener();
listener.event(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
listener.event(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
expectLastCall().times(2);
listener.event(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1));
listener.event(new EventuallyConsistentMapEvent<>(
......@@ -425,9 +426,9 @@ public class EventuallyConsistentMapImplTest {
EventuallyConsistentMapListener<String, String> listener
= getListener();
listener.event(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, KEY1, null));
EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1));
listener.event(new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE, KEY2, null));
EventuallyConsistentMapEvent.Type.REMOVE, KEY2, VALUE2));
replay(listener);
// clear() on an empty map is a no-op - no messages will be sent
......