Madan Jampani
Committed by Gerrit Code Review

Added DistributedPrimitive interface

Added AsyncDistributedSet that provides async set operations

Change-Id: I83494075a7973694ea6b7445ff4799b7a1a50641
Showing 30 changed files with 602 additions and 171 deletions
......@@ -27,6 +27,12 @@ import org.onosproject.store.service.EventuallyConsistentMapListener;
* Testing adapter for EventuallyConsistentMap.
*/
public class VtnEventuallyConsistentMapAdapter<K, V> implements EventuallyConsistentMap<K, V> {
@Override
public String name() {
return null;
}
@Override
public int size() {
return 0;
......
......@@ -20,7 +20,12 @@ import java.util.concurrent.CompletableFuture;
/**
* An async atomic counter dispenses monotonically increasing values.
*/
public interface AsyncAtomicCounter {
public interface AsyncAtomicCounter extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
return DistributedPrimitive.Type.COUNTER;
}
/**
* Atomically increment by one the current value.
......
......@@ -27,7 +27,12 @@ import java.util.concurrent.CompletableFuture;
*
* @param <V> value type
*/
public interface AsyncAtomicValue<V> {
public interface AsyncAtomicValue<V> extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
return DistributedPrimitive.Type.VALUE;
}
/**
* Atomically sets the value to the given updated value if the current value is equal to the expected value.
......
......@@ -49,7 +49,12 @@ import java.util.function.Predicate;
* the returned future will be {@link CompletableFuture#complete completed} when the
* operation finishes.
*/
public interface AsyncConsistentMap<K, V> {
public interface AsyncConsistentMap<K, V> extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
return DistributedPrimitive.Type.CONSISTENT_MAP;
}
/**
* Returns the number of entries in the map.
......
/*
* Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
* A distributed collection designed for holding unique elements.
* <p>
* All methods of {@code AsyncDistributedSet} immediately return a {@link CompletableFuture future}.
* The returned future will be {@link CompletableFuture#complete completed} when the operation
* completes.
*
* @param <E> set entry type
*/
public interface AsyncDistributedSet<E> extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
return DistributedPrimitive.Type.SET;
}
/**
* Registers the specified listener to be notified whenever
* the set is updated.
*
* @param listener listener to notify about set update events
*/
CompletableFuture<Void> addListener(SetEventListener<E> listener);
/**
* Unregisters the specified listener.
*
* @param listener listener to unregister.
* @return CompletableFuture that is completed when the operation completes
*/
CompletableFuture<Void> removeListener(SetEventListener<E> listener);
/**
* Adds the specified element to this set if it is not already present (optional operation).
* @param element element to add
* @return {@code true} if this set did not already contain the specified element.
*/
CompletableFuture<Boolean> add(E element);
/**
* Removes the specified element to this set if it is present (optional operation).
* @param element element to remove
* @return {@code true} if this set contained the specified element
*/
CompletableFuture<Boolean> remove(E element);
/**
* Returns the number of elements in the set.
* @return size of the set
*/
CompletableFuture<Integer> size();
/**
* Returns if the set is empty.
* @return {@code true} if this set is empty
*/
CompletableFuture<Boolean> isEmpty();
/**
* Removes all elements from the set.
*/
CompletableFuture<Void> clear();
/**
* Returns if this set contains the specified element.
* @param element element to check
* @return {@code true} if this set contains the specified element
*/
CompletableFuture<Boolean> contains(E element);
/**
* Adds all of the elements in the specified collection to this set if they're not
* already present (optional operation).
* @param c collection containing elements to be added to this set
* @return {@code true} if this set contains all elements in the collection
*/
CompletableFuture<Boolean> addAll(Collection<? extends E> c);
/**
* Returns if this set contains all the elements in specified collection.
* @param c collection
* @return {@code true} if this set contains all elements in the collection
*/
CompletableFuture<Boolean> containsAll(Collection<? extends E> c);
/**
* Retains only the elements in this set that are contained in the specified collection (optional operation).
* @param c collection containing elements to be retained in this set
* @return {@code true} if this set changed as a result of the call
*/
CompletableFuture<Boolean> retainAll(Collection<? extends E> c);
/**
* Removes from this set all of its elements that are contained in the specified collection (optional operation).
* If the specified collection is also a set, this operation effectively modifies this set so that its
* value is the asymmetric set difference of the two sets.
* @param c collection containing elements to be removed from this set
* @return {@code true} if this set changed as a result of the call
*/
CompletableFuture<Boolean> removeAll(Collection<? extends E> c);
/**
* Returns the entries as a immutable set. The returned set is a snapshot and will not reflect new changes made to
* this AsyncDistributedSet
* @return immutable set copy
*/
CompletableFuture<? extends Set<E>> getAsImmutableSet();
}
......@@ -18,7 +18,12 @@ package org.onosproject.store.service;
/**
* Distributed version of java.util.concurrent.atomic.AtomicLong.
*/
public interface AtomicCounter {
public interface AtomicCounter extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
return DistributedPrimitive.Type.COUNTER;
}
/**
* Atomically increment by one the current value.
......
......@@ -20,7 +20,7 @@ package org.onosproject.store.service;
*
* @param <V> value type
*/
public interface AtomicValue<V> {
public interface AtomicValue<V> extends DistributedPrimitive {
/**
* Atomically sets the value to the given updated value if the current value is equal to the expected value.
......
......@@ -44,7 +44,7 @@ import java.util.function.Predicate;
* (which extends RuntimeException) to indicate failures.
*
*/
public interface ConsistentMap<K, V> {
public interface ConsistentMap<K, V> extends DistributedPrimitive {
/**
* Returns the number of entries in the map.
......
/*
* Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import org.onosproject.core.ApplicationId;
/**
* Interface for all distributed primitives.
*/
public interface DistributedPrimitive {
/**
* Type of distributed primitive.
*/
public enum Type {
/**
* Map with strong consistency semantics.
*/
CONSISTENT_MAP,
/**
* Map with eventual consistency semantics.
*/
EVENTUALLY_CONSISTENT_MAP,
/**
* distributed set.
*/
SET,
/**
* atomic counter.
*/
COUNTER,
/**
* Atomic value.
*/
VALUE,
/**
* Distributed queue.
*/
QUEUE
}
/**
* Returns the name of this primitive.
* @return name
*/
String name();
/**
* Returns the type of primitive.
* @return primitive type
*/
Type type();
/**
* Returns the application owning this primitive.
*/
default ApplicationId applicationId() {
return null;
}
}
......@@ -24,7 +24,7 @@ import java.util.concurrent.CompletableFuture;
*
* @param <E> queue entry type
*/
public interface DistributedQueue<E> {
public interface DistributedQueue<E> extends DistributedPrimitive {
/**
* Returns total number of entries in the queue.
......
......@@ -22,7 +22,7 @@ import java.util.Set;
*
* @param <E> set entry type
*/
public interface DistributedSet<E> extends Set<E> {
public interface DistributedSet<E> extends Set<E>, DistributedPrimitive {
/**
* Registers the specified listener to be notified whenever
......
......@@ -129,4 +129,13 @@ public interface DistributedSetBuilder<E> {
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
DistributedSet<E> build();
/**
* Builds an {@link AsyncDistributedSet async set} based on the configuration options
* supplied to this builder.
*
* @return new AsyncDistributedSet
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
AsyncDistributedSet<E> buildAsyncSet();
}
......
......@@ -39,7 +39,12 @@ import java.util.function.BiFunction;
* Null values are not allowed in this map.
* </p>
*/
public interface EventuallyConsistentMap<K, V> {
public interface EventuallyConsistentMap<K, V> extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
return DistributedPrimitive.Type.EVENTUALLY_CONSISTENT_MAP;
}
/**
* Returns the number of key-value mappings in this map.
......
/*
* Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
/**
* DistributedPrimitive that is a synchronous (blocking) version of
* another.
*
* @param <T> type of DistributedPrimitive
*/
public abstract class Synchronous<T extends DistributedPrimitive> implements DistributedPrimitive {
private final T primitive;
public Synchronous(T primitive) {
this.primitive = primitive;
}
@Override
public String name() {
return primitive.name();
}
@Override
public Type type() {
return primitive.type();
}
}
......@@ -26,6 +26,17 @@ import java.util.function.Predicate;
* Testing adapter for the consistent map.
*/
public class ConsistentMapAdapter<K, V> implements ConsistentMap<K, V> {
@Override
public String name() {
return null;
}
@Override
public DistributedPrimitive.Type type() {
return DistributedPrimitive.Type.CONSISTENT_MAP;
}
@Override
public int size() {
return 0;
......
......@@ -20,10 +20,23 @@ import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import org.onosproject.store.service.DistributedPrimitive.Type;
/**
* Testing adapter for EventuallyConsistentMap.
*/
public class EventuallyConsistentMapAdapter<K, V> implements EventuallyConsistentMap<K, V> {
@Override
public String name() {
return null;
}
@Override
public Type type() {
return Type.EVENTUALLY_CONSISTENT_MAP;
}
@Override
public int size() {
return 0;
......
......@@ -23,6 +23,16 @@ import java.util.concurrent.atomic.AtomicLong;
public final class TestAtomicCounter implements AtomicCounter {
final AtomicLong value;
@Override
public String name() {
return null;
}
@Override
public Type type() {
return Type.COUNTER;
}
private TestAtomicCounter() {
value = new AtomicLong();
}
......
......@@ -28,7 +28,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.onosproject.core.ApplicationId;
import static org.onosproject.store.service.MapEvent.Type;
import static org.onosproject.store.service.MapEvent.Type.*;
/**
......@@ -37,7 +37,7 @@ import static org.onosproject.store.service.MapEvent.Type.*;
public final class TestConsistentMap<K, V> extends ConsistentMapAdapter<K, V> {
private final List<MapEventListener<K, V>> listeners;
private final HashMap<K, V> map;
private final Map<K, V> map;
private final String mapName;
private final AtomicLong counter = new AtomicLong(0);
......@@ -54,7 +54,7 @@ public final class TestConsistentMap<K, V> extends ConsistentMapAdapter<K, V> {
/**
* Notify all listeners of an event.
*/
private void notifyListeners(String mapName, Type type,
private void notifyListeners(String mapName, MapEvent.Type type,
K key, Versioned<V> value) {
MapEvent<K, V> event = new MapEvent<>(mapName, type, key, value);
listeners.forEach(
......
......@@ -51,6 +51,11 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
}
@Override
public String name() {
return name;
}
@Override
public CompletableFuture<Long> incrementAndGet() {
final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET);
return addAndGet(1L)
......
......@@ -55,6 +55,11 @@ public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
}
@Override
public String name() {
return name;
}
@Override
public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
CompletableFuture<Boolean> response;
......
......@@ -171,6 +171,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
* Returns this map name.
* @return map name
*/
@Override
public String name() {
return name;
}
......@@ -187,6 +188,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
* Returns the applicationId owning this map.
* @return application Id
*/
@Override
public ApplicationId applicationId() {
return applicationId;
}
......
/*
* Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.SetEventListener;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Implementation of {@link AsyncDistributedSet}.
*
* @param <E> set entry type
*/
public class DefaultAsyncDistributedSet<E> implements AsyncDistributedSet<E> {
private static final String CONTAINS = "contains";
private static final String PRIMITIVE_NAME = "distributedSet";
private static final String SIZE = "size";
private static final String IS_EMPTY = "isEmpty";
private static final String ADD = "add";
private static final String REMOVE = "remove";
private static final String CONTAINS_ALL = "containsAll";
private static final String ADD_ALL = "addAll";
private static final String RETAIN_ALL = "retainAll";
private static final String REMOVE_ALL = "removeAll";
private static final String CLEAR = "clear";
private static final String GET_AS_IMMUTABLE_SET = "getAsImmutableSet";
private final String name;
private final AsyncConsistentMap<E, Boolean> backingMap;
private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
private final MeteringAgent monitor;
public DefaultAsyncDistributedSet(AsyncConsistentMap<E, Boolean> backingMap, String name, boolean meteringEnabled) {
this.backingMap = backingMap;
this.name = name;
monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
}
@Override
public String name() {
return name;
}
@Override
public CompletableFuture<Integer> size() {
final MeteringAgent.Context timer = monitor.startTimer(SIZE);
return backingMap.size().whenComplete((r, e) -> timer.stop(null));
}
@Override
public CompletableFuture<Boolean> isEmpty() {
final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
return backingMap.isEmpty().whenComplete((r, e) -> timer.stop(null));
}
@Override
public CompletableFuture<Boolean> contains(E element) {
final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
return backingMap.containsKey(element).whenComplete((r, e) -> timer.stop(null));
}
@Override
public CompletableFuture<Boolean> add(E entry) {
final MeteringAgent.Context timer = monitor.startTimer(ADD);
return backingMap.putIfAbsent(entry, true).thenApply(Objects::isNull).whenComplete((r, e) -> timer.stop(null));
}
@Override
public CompletableFuture<Boolean> remove(E entry) {
final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
return backingMap.remove(entry, true).whenComplete((r, e) -> timer.stop(null));
}
@Override
public CompletableFuture<Boolean> containsAll(Collection<? extends E> c) {
final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
return Tools.allOf(c.stream().map(this::contains).collect(Collectors.toList())).thenApply(v ->
v.stream().reduce(Boolean::logicalAnd).orElse(true)).whenComplete((r, e) -> timer.stop(null));
}
@Override
public CompletableFuture<Boolean> addAll(Collection<? extends E> c) {
final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
return Tools.allOf(c.stream().map(this::add).collect(Collectors.toList())).thenApply(v ->
v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
}
@Override
public CompletableFuture<Boolean> retainAll(Collection<? extends E> c) {
final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
return backingMap.keySet().thenApply(set -> Sets.difference(set, Sets.newHashSet(c)))
.thenCompose(this::removeAll)
.whenComplete((r, e) -> timer.stop(null));
}
@Override
public CompletableFuture<Boolean> removeAll(Collection<? extends E> c) {
final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
return Tools.allOf(c.stream().map(this::remove).collect(Collectors.toList())).thenApply(v ->
v.stream().reduce(Boolean::logicalOr).orElse(false)).whenComplete((r, e) -> timer.stop(null));
}
@Override
public CompletableFuture<Void> clear() {
final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
return backingMap.clear().whenComplete((r, e) -> timer.stop(null));
}
@Override
public CompletableFuture<? extends Set<E>> getAsImmutableSet() {
final MeteringAgent.Context timer = monitor.startTimer(GET_AS_IMMUTABLE_SET);
return backingMap.keySet().thenApply(s -> ImmutableSet.copyOf(s)).whenComplete((r, e) -> timer.stop(null));
}
@Override
public CompletableFuture<Void> addListener(SetEventListener<E> listener) {
MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
if (mapEvent.type() == MapEvent.Type.INSERT) {
listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
} else if (mapEvent.type() == MapEvent.Type.REMOVE) {
listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
}
};
if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
return backingMap.addListener(mapEventListener);
}
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> removeListener(SetEventListener<E> listener) {
MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
if (mapEventListener != null) {
return backingMap.removeListener(mapEventListener);
}
return CompletableFuture.completedFuture(null);
}
}
......@@ -18,6 +18,7 @@ package org.onosproject.store.consistent.impl;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.Synchronous;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
......@@ -30,16 +31,15 @@ import java.util.concurrent.TimeoutException;
* <p>
* The initial value will be zero.
*/
public class DefaultAtomicCounter implements AtomicCounter {
public class DefaultAtomicCounter extends Synchronous<AsyncAtomicCounter> implements AtomicCounter {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final AsyncAtomicCounter asyncCounter;
public DefaultAtomicCounter(String name,
Database database,
boolean meteringEnabled) {
asyncCounter = new DefaultAsyncAtomicCounter(name, database, meteringEnabled);
public DefaultAtomicCounter(AsyncAtomicCounter asyncCounter) {
super(asyncCounter);
this.asyncCounter = asyncCounter;
}
@Override
......
......@@ -53,9 +53,7 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
@Override
public AtomicCounter build() {
validateInputs();
Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
return new DefaultAtomicCounter(name, database, metering);
return new DefaultAtomicCounter(buildAsyncCounter());
}
@Override
......
......@@ -17,10 +17,12 @@ package org.onosproject.store.consistent.impl;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AtomicValue;
import org.onosproject.store.service.AtomicValueEventListener;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.Synchronous;
import com.google.common.util.concurrent.Futures;
......@@ -29,12 +31,13 @@ import com.google.common.util.concurrent.Futures;
*
* @param <V> value type
*/
public class DefaultAtomicValue<V> implements AtomicValue<V> {
public class DefaultAtomicValue<V> extends Synchronous<AsyncAtomicValue<V>> implements AtomicValue<V> {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final AsyncAtomicValue<V> asyncValue;
public DefaultAtomicValue(AsyncAtomicValue<V> asyncValue) {
super(asyncValue);
this.asyncValue = asyncValue;
}
......
......@@ -28,9 +28,11 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.Set;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Synchronous;
import org.onosproject.store.service.Versioned;
/**
......@@ -40,18 +42,15 @@ import org.onosproject.store.service.Versioned;
* @param <K> type of key.
* @param <V> type of value.
*/
public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K, V>> implements ConsistentMap<K, V> {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final DefaultAsyncConsistentMap<K, V> asyncMap;
private Map<K, V> javaMap;
public String name() {
return asyncMap.name();
}
public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
super(asyncMap);
this.asyncMap = asyncMap;
}
......
......@@ -19,12 +19,14 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import org.onlab.util.SharedExecutors;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH;
......@@ -108,10 +110,16 @@ public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
.whenComplete((r, e) -> timer.stop(e)));
}
@Override
public String name() {
return name;
}
@Override
public DistributedPrimitive.Type type() {
return DistributedPrimitive.Type.QUEUE;
}
protected void tryPoll() {
Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
for (CompletableFuture<E> future : pendingFutures) {
......
......@@ -15,227 +15,138 @@
*/
package org.onosproject.store.consistent.impl;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.SetEventListener;
import org.onosproject.store.service.Synchronous;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Implementation of distributed set that is backed by a ConsistentMap.
* Implementation of {@link DistributedSet} that merely delegates to a {@link AsyncDistributedSet}
* and waits for the operation to complete.
* @param <E> set element type
*/
public class DefaultDistributedSet<E> implements DistributedSet<E> {
private static final String CONTAINS = "contains";
private static final String PRIMITIVE_NAME = "distributedSet";
private static final String SIZE = "size";
private static final String IS_EMPTY = "isEmpty";
private static final String ITERATOR = "iterator";
private static final String TO_ARRAY = "toArray";
private static final String ADD = "add";
private static final String REMOVE = "remove";
private static final String CONTAINS_ALL = "containsAll";
private static final String ADD_ALL = "addAll";
private static final String RETAIN_ALL = "retainAll";
private static final String REMOVE_ALL = "removeAll";
private static final String CLEAR = "clear";
private final String name;
private final ConsistentMap<E, Boolean> backingMap;
private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
private final MeteringAgent monitor;
public DefaultDistributedSet(String name, boolean meteringEnabled, ConsistentMap<E, Boolean> backingMap) {
this.name = name;
this.backingMap = backingMap;
monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
public class DefaultDistributedSet<E> extends Synchronous<AsyncDistributedSet<E>> implements DistributedSet<E> {
private static final long OPERATION_TIMEOUT_MILLIS = 5000;
private final AsyncDistributedSet<E> asyncSet;
public DefaultDistributedSet(AsyncDistributedSet<E> asyncSet) {
super(asyncSet);
this.asyncSet = asyncSet;
}
@Override
public int size() {
final MeteringAgent.Context timer = monitor.startTimer(SIZE);
private static <T> T complete(CompletableFuture<T> future) {
try {
return backingMap.size();
} finally {
timer.stop(null);
return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ConsistentMapException.Interrupted();
} catch (TimeoutException e) {
throw new ConsistentMapException.Timeout();
} catch (ExecutionException e) {
if (e.getCause() instanceof ConsistentMapException) {
throw (ConsistentMapException) e.getCause();
} else {
throw new ConsistentMapException(e.getCause());
}
}
}
@Override
public boolean isEmpty() {
final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
try {
return backingMap.isEmpty();
} finally {
timer.stop(null);
public int size() {
return complete(asyncSet.size());
}
@Override
public boolean isEmpty() {
return complete(asyncSet.isEmpty());
}
@SuppressWarnings("unchecked")
@Override
public boolean contains(Object o) {
final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
try {
return backingMap.containsKey((E) o);
} finally {
timer.stop(null);
}
return complete(asyncSet.contains((E) o));
}
@Override
public Iterator<E> iterator() {
final MeteringAgent.Context timer = monitor.startTimer(ITERATOR);
//Do we have to measure this guy?
try {
return backingMap.keySet().iterator();
} finally {
timer.stop(null);
}
return complete(asyncSet.getAsImmutableSet()).iterator();
}
@Override
public Object[] toArray() {
final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
try {
return backingMap.keySet().stream().toArray();
} finally {
timer.stop(null);
}
return complete(asyncSet.getAsImmutableSet()).stream().toArray();
}
@SuppressWarnings("unchecked")
@Override
public <T> T[] toArray(T[] a) {
final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
try {
// TODO: Optimize this to only allocate a new array if the set size
// is larger than the array.length. If the set size is smaller than
// the array.length then copy the data into the array and set the
// last element in the array to be null.
final T[] resizedArray =
(T[]) Array.newInstance(a.getClass().getComponentType(), backingMap.keySet().size());
return (T[]) backingMap.keySet().toArray(resizedArray);
} finally {
timer.stop(null);
}
(T[]) Array.newInstance(a.getClass().getComponentType(), complete(asyncSet.getAsImmutableSet()).size());
return complete(asyncSet.getAsImmutableSet()).toArray(resizedArray);
}
@Override
public boolean add(E e) {
final MeteringAgent.Context timer = monitor.startTimer(ADD);
try {
return backingMap.putIfAbsent(e, true) == null;
} finally {
timer.stop(null);
}
return complete(asyncSet.add(e));
}
@SuppressWarnings("unchecked")
@Override
public boolean remove(Object o) {
final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
try {
return backingMap.remove((E) o) != null;
} finally {
timer.stop(null);
}
return complete(asyncSet.remove((E) o));
}
@SuppressWarnings("unchecked")
@Override
public boolean containsAll(Collection<?> c) {
final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
try {
return c.stream()
.allMatch(this::contains);
} finally {
timer.stop(null);
}
return complete(asyncSet.containsAll((Collection<? extends E>) c));
}
@Override
public boolean addAll(Collection<? extends E> c) {
final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
try {
return c.stream()
.map(this::add)
.reduce(Boolean::logicalOr)
.orElse(false);
} finally {
timer.stop(null);
}
return complete(asyncSet.addAll(c));
}
@SuppressWarnings("unchecked")
@Override
public boolean retainAll(Collection<?> c) {
final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
try {
Set<?> retainSet = Sets.newHashSet(c);
return backingMap.keySet()
.stream()
.filter(k -> !retainSet.contains(k))
.map(this::remove)
.reduce(Boolean::logicalOr)
.orElse(false);
} finally {
timer.stop(null);
}
return complete(asyncSet.retainAll((Collection<? extends E>) c));
}
@SuppressWarnings("unchecked")
@Override
public boolean removeAll(Collection<?> c) {
final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
try {
Set<?> removeSet = Sets.newHashSet(c);
return backingMap.keySet()
.stream()
.filter(removeSet::contains)
.map(this::remove)
.reduce(Boolean::logicalOr)
.orElse(false);
} finally {
timer.stop(null);
}
return complete(asyncSet.removeAll((Collection<? extends E>) c));
}
@Override
public void clear() {
final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
try {
backingMap.clear();
} finally {
timer.stop(null);
}
complete(asyncSet.clear());
}
@Override
public void addListener(SetEventListener<E> listener) {
MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
if (mapEvent.type() == MapEvent.Type.INSERT) {
listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
} else if (mapEvent.type() == MapEvent.Type.REMOVE) {
listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
}
};
if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
backingMap.addListener(mapEventListener);
}
complete(asyncSet.addListener(listener));
}
@Override
public void removeListener(SetEventListener<E> listener) {
MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
if (mapEventListener != null) {
backingMap.removeListener(mapEventListener);
}
complete(asyncSet.removeListener(listener));
}
}
......
......@@ -16,6 +16,7 @@
package org.onosproject.store.consistent.impl;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.Serializer;
......@@ -88,6 +89,11 @@ public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E>
@Override
public DistributedSet<E> build() {
return new DefaultDistributedSet<E>(name, metering, mapBuilder.build());
return new DefaultDistributedSet<E>(buildAsyncSet());
}
@Override
public AsyncDistributedSet<E> buildAsyncSet() {
return new DefaultAsyncDistributedSet<E>(mapBuilder.buildAsyncMap(), name, metering);
}
}
......
......@@ -273,6 +273,11 @@ public class EventuallyConsistentMapImpl<K, V>
}
@Override
public String name() {
return mapName;
}
@Override
public int size() {
checkState(!destroyed, destroyedMessage);
// TODO: Maintain a separate counter for tracking live elements in map.
......