Madan Jampani
Committed by Gerrit Code Review

Dropping DatabaseManager and related code. Goodbye!

Change-Id: I5d90d62678402234462dad8be455903de481da21
Showing 34 changed files with 3 additions and 2787 deletions
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.concurrent.CompletableFuture;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
/**
* Extension of {@link DefaultAsyncConsistentMap} that provides a weaker read consistency
* guarantee in return for better read performance.
* <p>
* For read/write operations that are local to a node this map implementation provides
* guarantees similar to a ConsistentMap. However for read/write operations executed
* across multiple nodes this implementation only provides eventual consistency.
*
* @param <K> key type
* @param <V> value type
*/
public class AsyncCachingConsistentMap<K, V> extends DefaultAsyncConsistentMap<K, V> {
private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache =
CacheBuilder.newBuilder()
.maximumSize(10000) // TODO: make configurable
.build(new CacheLoader<K, CompletableFuture<Versioned<V>>>() {
@Override
public CompletableFuture<Versioned<V>> load(K key)
throws Exception {
return AsyncCachingConsistentMap.super.get(key);
}
});
public AsyncCachingConsistentMap(String name,
ApplicationId applicationId,
Database database,
Serializer serializer,
boolean readOnly,
boolean purgeOnUninstall,
boolean meteringEnabled) {
super(name, applicationId, database, serializer, readOnly, purgeOnUninstall, meteringEnabled);
addListener(event -> cache.invalidate(event.key()));
}
@Override
public CompletableFuture<Versioned<V>> get(K key) {
CompletableFuture<Versioned<V>> cachedValue = cache.getIfPresent(key);
if (cachedValue != null) {
if (cachedValue.isCompletedExceptionally()) {
cache.invalidate(key);
} else {
return cachedValue;
}
}
return cache.getUnchecked(key);
}
@Override
protected void beforeUpdate(K key) {
super.beforeUpdate(key);
cache.invalidate(key);
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.Collections;
import java.util.List;
import com.google.common.collect.ImmutableList;
/**
* Result of a Transaction commit operation.
*/
public final class CommitResponse {
private boolean success;
private List<UpdateResult<String, byte[]>> updates;
public static CommitResponse success(List<UpdateResult<String, byte[]>> updates) {
return new CommitResponse(true, updates);
}
public static CommitResponse failure() {
return new CommitResponse(false, Collections.emptyList());
}
private CommitResponse(boolean success, List<UpdateResult<String, byte[]>> updates) {
this.success = success;
this.updates = ImmutableList.copyOf(updates);
}
public boolean success() {
return success;
}
public List<UpdateResult<String, byte[]>> updates() {
return updates;
}
@Override
public String toString() {
return toStringHelper(this)
.add("success", success)
.add("udpates", updates)
.toString();
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import net.kuujo.copycat.protocol.AbstractProtocol;
import net.kuujo.copycat.protocol.ProtocolClient;
import net.kuujo.copycat.protocol.ProtocolHandler;
import net.kuujo.copycat.protocol.ProtocolServer;
import net.kuujo.copycat.util.Configurable;
/**
* Protocol for Copycat communication that employs
* {@code ClusterCommunicationService}.
*/
public class CopycatCommunicationProtocol extends AbstractProtocol {
private static final MessageSubject COPYCAT_MESSAGE_SUBJECT =
new MessageSubject("onos-copycat-message");
protected ClusterService clusterService;
protected ClusterCommunicationService clusterCommunicator;
public CopycatCommunicationProtocol(ClusterService clusterService,
ClusterCommunicationService clusterCommunicator) {
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
}
@Override
public Configurable copy() {
return this;
}
@Override
public ProtocolClient createClient(URI uri) {
NodeId nodeId = uriToNodeId(uri);
if (nodeId == null) {
throw new IllegalStateException("Unknown peer " + uri);
}
return new Client(nodeId);
}
@Override
public ProtocolServer createServer(URI uri) {
return new Server();
}
private class Server implements ProtocolServer {
@Override
public void handler(ProtocolHandler handler) {
if (handler == null) {
clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
} else {
clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT,
ByteBuffer::wrap,
handler,
Tools::byteBuffertoArray);
// FIXME: Tools::byteBuffertoArray involves a array copy.
}
}
@Override
public CompletableFuture<Void> listen() {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> close() {
clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
return CompletableFuture.completedFuture(null);
}
}
private class Client implements ProtocolClient {
private final NodeId peer;
public Client(NodeId peer) {
this.peer = peer;
}
@Override
public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
return clusterCommunicator.sendAndReceive(request,
COPYCAT_MESSAGE_SUBJECT,
Tools::byteBuffertoArray,
ByteBuffer::wrap,
peer);
}
@Override
public CompletableFuture<Void> connect() {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> close() {
return CompletableFuture.completedFuture(null);
}
}
private NodeId uriToNodeId(URI uri) {
return clusterService.getNodes()
.stream()
.filter(node -> uri.getHost().equals(node.ip().toString()))
.map(ControllerNode::id)
.findAny()
.orElse(null);
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.function.Consumer;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
import net.kuujo.copycat.resource.Resource;
/**
* Database.
*/
public interface Database extends DatabaseProxy<String, byte[]>, Resource<Database> {
/**
* Creates a new database with the default cluster configuration.<p>
*
* The database will be constructed with the default cluster configuration. The default cluster configuration
* searches for two resources on the classpath - {@code cluster} and {cluster-defaults} - in that order. Configuration
* options specified in {@code cluster.conf} will override those in {cluster-defaults.conf}.<p>
*
* Additionally, the database will be constructed with an database configuration that searches the classpath for
* three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and
* {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
* as the map resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
* configurations will be loaded according to namespaces as well; for example, `databases.conf`.
*
* @param name The database name.
* @return The database.
*/
static Database create(String name) {
return create(name, new ClusterConfig(), new DatabaseConfig());
}
/**
* Creates a new database.<p>
*
* The database will be constructed with an database configuration that searches the classpath for
* three configuration files - {@code name}, {@code database}, {@code database-defaults}, {@code resource}, and
* {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
* as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
* configurations will be loaded according to namespaces as well; for example, `databases.conf`.
*
* @param name The database name.
* @param cluster The cluster configuration.
* @return The database.
*/
static Database create(String name, ClusterConfig cluster) {
return create(name, cluster, new DatabaseConfig());
}
/**
* Creates a new database.
*
* @param name The database name.
* @param cluster The cluster configuration.
* @param config The database configuration.
* @return The database.
*/
static Database create(String name, ClusterConfig cluster, DatabaseConfig config) {
ClusterCoordinator coordinator =
new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster));
return coordinator.<Database>getResource(name, config.resolve(cluster))
.addStartupTask(() -> coordinator.open().thenApply(v -> null))
.addShutdownTask(coordinator::close);
}
/**
* Tells whether the database supports change notifications.
* @return true if notifications are supported; false otherwise
*/
default boolean hasChangeNotificationSupport() {
return true;
}
/**
* Registers a new consumer of StateMachineUpdates.
* @param consumer consumer to register
*/
void registerConsumer(Consumer<StateMachineUpdate> consumer);
/**
* Unregisters a consumer of StateMachineUpdates.
* @param consumer consumer to unregister
*/
void unregisterConsumer(Consumer<StateMachineUpdate> consumer);
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import com.typesafe.config.ConfigValueFactory;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
import net.kuujo.copycat.protocol.Consistency;
import net.kuujo.copycat.resource.ResourceConfig;
import net.kuujo.copycat.state.StateLogConfig;
import net.kuujo.copycat.util.internal.Assert;
import java.util.Map;
/**
* Database configuration.
*
*/
public class DatabaseConfig extends ResourceConfig<DatabaseConfig> {
private static final String DATABASE_CONSISTENCY = "consistency";
private static final String DEFAULT_CONFIGURATION = "database-defaults";
private static final String CONFIGURATION = "database";
private String name;
public DatabaseConfig() {
super(CONFIGURATION, DEFAULT_CONFIGURATION);
}
public DatabaseConfig(Map<String, Object> config) {
super(config, CONFIGURATION, DEFAULT_CONFIGURATION);
}
public DatabaseConfig(String resource) {
super(resource, CONFIGURATION, DEFAULT_CONFIGURATION);
}
protected DatabaseConfig(DatabaseConfig config) {
super(config);
}
@Override
public DatabaseConfig copy() {
return new DatabaseConfig(this);
}
/**
* Sets the database read consistency.
*
* @param consistency The database read consistency.
* @throws java.lang.NullPointerException If the consistency is {@code null}
*/
public void setConsistency(String consistency) {
this.config = config.withValue(DATABASE_CONSISTENCY,
ConfigValueFactory.fromAnyRef(
Consistency.parse(Assert.isNotNull(consistency, "consistency")).toString()));
}
/**
* Sets the database read consistency.
*
* @param consistency The database read consistency.
* @throws java.lang.NullPointerException If the consistency is {@code null}
*/
public void setConsistency(Consistency consistency) {
this.config = config.withValue(DATABASE_CONSISTENCY,
ConfigValueFactory.fromAnyRef(
Assert.isNotNull(consistency, "consistency").toString()));
}
/**
* Returns the database read consistency.
*
* @return The database read consistency.
*/
public Consistency getConsistency() {
return Consistency.parse(config.getString(DATABASE_CONSISTENCY));
}
/**
* Sets the database read consistency, returning the configuration for method chaining.
*
* @param consistency The database read consistency.
* @return The database configuration.
* @throws java.lang.NullPointerException If the consistency is {@code null}
*/
public DatabaseConfig withConsistency(String consistency) {
setConsistency(consistency);
return this;
}
/**
* Sets the database read consistency, returning the configuration for method chaining.
*
* @param consistency The database read consistency.
* @return The database configuration.
* @throws java.lang.NullPointerException If the consistency is {@code null}
*/
public DatabaseConfig withConsistency(Consistency consistency) {
setConsistency(consistency);
return this;
}
/**
* Returns the database name.
*
* @return The database name
*/
public String getName() {
return name;
}
/**
* Sets the database name, returning the configuration for method chaining.
*
* @param name The database name
* @return The database configuration
* @throws java.lang.NullPointerException If the name is {@code null}
*/
public DatabaseConfig withName(String name) {
setName(Assert.isNotNull(name, "name"));
return this;
}
/**
* Sets the database name.
*
* @param name The database name
* @throws java.lang.NullPointerException If the name is {@code null}
*/
public void setName(String name) {
this.name = Assert.isNotNull(name, "name");
}
@Override
public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
return new StateLogConfig(toMap())
.resolve(cluster)
.withResourceType(DefaultDatabase.class);
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkState;
import java.util.List;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
/**
* Partitioner for mapping map entries to individual database partitions.
* <p>
* By default a md5 hash of the hash key (key or map name) is used to pick a
* partition.
*/
public abstract class DatabasePartitioner implements Partitioner<String> {
// Database partitions sorted by their partition name.
protected final List<Database> partitions;
public DatabasePartitioner(List<Database> partitions) {
checkState(partitions != null && !partitions.isEmpty(), "Partitions cannot be null or empty");
this.partitions = ImmutableList.copyOf(partitions);
}
protected int hash(String key) {
return Math.abs(Hashing.md5().newHasher().putBytes(key.getBytes(Charsets.UTF_8)).hash().asInt());
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import org.onlab.util.Match;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
* Database proxy.
*/
public interface DatabaseProxy<K, V> {
/**
* Returns a set of all map names.
*
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Set<String>> maps();
/**
* Returns a mapping from counter name to next value.
*
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Map<String, Long>> counters();
/**
* Returns the number of entries in map.
*
* @param mapName map name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Integer> mapSize(String mapName);
/**
* Checks whether the map is empty.
*
* @param mapName map name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> mapIsEmpty(String mapName);
/**
* Checks whether the map contains a key.
*
* @param mapName map name
* @param key key to check.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> mapContainsKey(String mapName, K key);
/**
* Checks whether the map contains a value.
*
* @param mapName map name
* @param value The value to check.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> mapContainsValue(String mapName, V value);
/**
* Gets a value from the map.
*
* @param mapName map name
* @param key The key to get.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Versioned<V>> mapGet(String mapName, K key);
/**
* Updates the map.
*
* @param mapName map name
* @param key The key to set
* @param valueMatch match for checking existing value
* @param versionMatch match for checking existing version
* @param value new value
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<Result<UpdateResult<K, V>>> mapUpdate(
String mapName, K key, Match<V> valueMatch, Match<Long> versionMatch, V value);
/**
* Clears the map.
*
* @param mapName map name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Result<Void>> mapClear(String mapName);
/**
* Gets a set of keys in the map.
*
* @param mapName map name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Set<K>> mapKeySet(String mapName);
/**
* Gets a collection of values in the map.
*
* @param mapName map name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Collection<Versioned<V>>> mapValues(String mapName);
/**
* Gets a set of entries in the map.
*
* @param mapName map name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> mapEntrySet(String mapName);
/**
* Atomically add the given value to current value of the specified counter.
*
* @param counterName counter name
* @param delta value to add
* @return updated value
*/
CompletableFuture<Long> counterAddAndGet(String counterName, long delta);
/**
* Atomically add the given value to current value of the specified counter.
*
* @param counterName counter name
* @param delta value to add
* @return previous value
*/
CompletableFuture<Long> counterGetAndAdd(String counterName, long delta);
/**
* Atomically sets the given value to current value of the specified counter.
*
* @param counterName counter name
* @param value value to set
* @return void future
*/
CompletableFuture<Void> counterSet(String counterName, long value);
/**
* Atomically sets the given counter to the specified update value if and only if the current value is equal to the
* expected value.
* @param counterName counter name
* @param expectedValue value to use for equivalence check
* @param update value to set if expected value is current value
* @return true if an update occurred, false otherwise
*/
CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update);
/**
* Returns the current value of the specified atomic counter.
*
* @param counterName counter name
* @return current value
*/
CompletableFuture<Long> counterGet(String counterName);
/**
* Returns the size of queue.
*
* @param queueName queue name
* @return queue size
*/
CompletableFuture<Long> queueSize(String queueName);
/**
* Inserts an entry into the queue.
*
* @param queueName queue name
* @param entry queue entry
* @return void future
*/
CompletableFuture<Void> queuePush(String queueName, byte[] entry);
/**
* Removes an entry from the queue if the queue is non-empty.
*
* @param queueName queue name
* @return entry future. Can be completed with null if queue is empty
*/
CompletableFuture<byte[]> queuePop(String queueName);
/**
* Returns but does not remove an entry from the queue.
*
* @param queueName queue name
* @return entry. Can be null if queue is empty
*/
CompletableFuture<byte[]> queuePeek(String queueName);
/**
* Prepare and commit the specified transaction.
*
* @param transaction transaction to commit (after preparation)
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction);
/**
* Prepare the specified transaction for commit. A successful prepare implies
* all the affected resources are locked thus ensuring no concurrent updates can interfere.
*
* @param transaction transaction to prepare (for commit)
* @return A completable future to be completed with the result once complete. The future is completed
* with true if the transaction is successfully prepared i.e. all pre-conditions are met and
* applicable resources locked.
*/
CompletableFuture<Boolean> prepare(Transaction transaction);
/**
* Commit the specified transaction. A successful commit implies
* all the updates are applied, are now durable and are now visible externally.
*
* @param transaction transaction to commit
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<CommitResponse> commit(Transaction transaction);
/**
* Rollback the specified transaction. A successful rollback implies
* all previously acquired locks for the affected resources are released.
*
* @param transaction transaction to rollback
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<Boolean> rollback(Transaction transaction);
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.nio.ByteBuffer;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.Versioned;
import net.kuujo.copycat.cluster.internal.MemberInfo;
import net.kuujo.copycat.raft.protocol.AppendRequest;
import net.kuujo.copycat.raft.protocol.AppendResponse;
import net.kuujo.copycat.raft.protocol.CommitRequest;
import net.kuujo.copycat.raft.protocol.CommitResponse;
import net.kuujo.copycat.raft.protocol.PollRequest;
import net.kuujo.copycat.raft.protocol.PollResponse;
import net.kuujo.copycat.raft.protocol.QueryRequest;
import net.kuujo.copycat.raft.protocol.QueryResponse;
import net.kuujo.copycat.raft.protocol.ReplicaInfo;
import net.kuujo.copycat.raft.protocol.SyncRequest;
import net.kuujo.copycat.raft.protocol.SyncResponse;
import net.kuujo.copycat.raft.protocol.VoteRequest;
import net.kuujo.copycat.raft.protocol.VoteResponse;
import net.kuujo.copycat.util.serializer.SerializerConfig;
/**
* Serializer for DatabaseManager's interaction with Copycat.
*/
public class DatabaseSerializer extends SerializerConfig {
private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
.nextId(KryoNamespace.FLOATING_ID)
.register(AppendRequest.class)
.register(AppendResponse.class)
.register(SyncRequest.class)
.register(SyncResponse.class)
.register(VoteRequest.class)
.register(VoteResponse.class)
.register(PollRequest.class)
.register(PollResponse.class)
.register(QueryRequest.class)
.register(QueryResponse.class)
.register(CommitRequest.class)
.register(CommitResponse.class)
.register(ReplicaInfo.class)
.register(MemberInfo.class)
.build();
private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
.nextId(KryoNamespace.FLOATING_ID)
.register(Versioned.class)
.register(MapUpdate.class)
.register(MapUpdate.Type.class)
.register(Result.class)
.register(UpdateResult.class)
.register(Result.Status.class)
.register(Transaction.class)
.register(Transaction.State.class)
.register(TransactionId.class)
.register(org.onosproject.store.primitives.impl.CommitResponse.class)
.register(Match.class)
.register(NodeId.class)
.build();
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.register(COPYCAT)
.register(ONOS_STORE)
.build();
}
};
@Override
public ByteBuffer writeObject(Object object) {
return ByteBuffer.wrap(SERIALIZER.encode(object));
}
@Override
public <T> T readObject(ByteBuffer buffer) {
return SERIALIZER.decode(buffer);
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import net.kuujo.copycat.state.Command;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.Query;
import net.kuujo.copycat.state.StateContext;
import org.onlab.util.Match;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* Database state.
*
*/
public interface DatabaseState<K, V> {
/**
* Initializes the database state.
*
* @param context The map state context.
*/
@Initializer
void init(StateContext<DatabaseState<K, V>> context);
@Query
Set<String> maps();
@Query
Map<String, Long> counters();
@Query
int mapSize(String mapName);
@Query
boolean mapIsEmpty(String mapName);
@Query
boolean mapContainsKey(String mapName, K key);
@Query
boolean mapContainsValue(String mapName, V value);
@Query
Versioned<V> mapGet(String mapName, K key);
@Command
Result<UpdateResult<K, V>> mapUpdate(String mapName, K key, Match<V> valueMatch, Match<Long> versionMatch, V value);
@Command
Result<Void> mapClear(String mapName);
@Query
Set<K> mapKeySet(String mapName);
@Query
Collection<Versioned<V>> mapValues(String mapName);
@Query
Set<Entry<K, Versioned<V>>> mapEntrySet(String mapName);
@Command
Long counterAddAndGet(String counterName, long delta);
@Command
Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue);
@Command
Long counterGetAndAdd(String counterName, long delta);
@Query
Long queueSize(String queueName);
@Query
byte[] queuePeek(String queueName);
@Command
byte[] queuePop(String queueName);
@Command
void queuePush(String queueName, byte[] entry);
@Query
Long counterGet(String counterName);
@Command
void counterSet(String counterName, long value);
@Command
CommitResponse prepareAndCommit(Transaction transaction);
@Command
boolean prepare(Transaction transaction);
@Command
CommitResponse commit(Transaction transaction);
@Command
boolean rollback(Transaction transaction);
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.utils.MeteringAgent;
import java.util.concurrent.CompletableFuture;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Default implementation for a distributed AsyncAtomicCounter backed by
* partitioned Raft DB.
* <p>
* The initial value will be zero.
*/
public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
private final String name;
private final Database database;
private final MeteringAgent monitor;
private static final String PRIMITIVE_NAME = "atomicCounter";
private static final String INCREMENT_AND_GET = "incrementAndGet";
private static final String GET_AND_INCREMENT = "getAndIncrement";
private static final String GET_AND_ADD = "getAndAdd";
private static final String ADD_AND_GET = "addAndGet";
private static final String GET = "get";
private static final String SET = "set";
private static final String COMPARE_AND_SET = "compareAndSet";
public DefaultAsyncAtomicCounter(String name,
Database database,
boolean meteringEnabled) {
this.name = checkNotNull(name);
this.database = checkNotNull(database);
this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
}
@Override
public String name() {
return name;
}
@Override
public CompletableFuture<Long> incrementAndGet() {
final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET);
return addAndGet(1L)
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Long> get() {
final MeteringAgent.Context timer = monitor.startTimer(GET);
return database.counterGet(name)
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Long> getAndIncrement() {
final MeteringAgent.Context timer = monitor.startTimer(GET_AND_INCREMENT);
return getAndAdd(1L)
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Long> getAndAdd(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD);
return database.counterGetAndAdd(name, delta)
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Long> addAndGet(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET);
return database.counterAddAndGet(name, delta)
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Void> set(long value) {
final MeteringAgent.Context timer = monitor.startTimer(SET);
return database.counterSet(name, value)
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
final MeteringAgent.Context timer = monitor.startTimer(COMPARE_AND_SET);
return database.counterCompareAndSet(name, expectedValue, updateValue)
.whenComplete((r, e) -> timer.stop(e));
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AtomicCounterBuilder;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Default implementation of AtomicCounterBuilder.
*/
public class DefaultAtomicCounterBuilder extends AtomicCounterBuilder {
private final Database partitionedDatabase;
private final Database inMemoryDatabase;
public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
this.inMemoryDatabase = inMemoryDatabase;
this.partitionedDatabase = partitionedDatabase;
}
@Override
public AsyncAtomicCounter build() {
Database database = partitionsDisabled() ? inMemoryDatabase : partitionedDatabase;
return new DefaultAsyncAtomicCounter(checkNotNull(name()), database, meteringEnabled());
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import static com.google.common.base.Preconditions.checkState;
/**
* Default Consistent Map builder.
*
* @param <K> type for map key
* @param <V> type for map value
*/
public class DefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
private final DatabaseManager manager;
public DefaultConsistentMapBuilder(DatabaseManager manager) {
this.manager = manager;
}
private void validateInputs() {
checkState(name() != null, "name must be specified");
checkState(serializer() != null, "serializer must be specified");
if (purgeOnUninstall()) {
checkState(applicationId() != null, "ApplicationId must be specified when purgeOnUninstall is enabled");
}
}
@Override
public ConsistentMap<K, V> build() {
return buildAndRegisterMap().asConsistentMap();
}
@Override
public AsyncConsistentMap<K, V> buildAsyncMap() {
return buildAndRegisterMap();
}
private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() {
validateInputs();
Database database = partitionsDisabled() ? manager.inMemoryDatabase : manager.partitionedDatabase;
if (relaxedReadConsistency()) {
return manager.registerMap(
new AsyncCachingConsistentMap<>(name(),
applicationId(),
database,
serializer(),
readOnly(),
purgeOnUninstall(),
meteringEnabled()));
} else {
return manager.registerMap(
new DefaultAsyncConsistentMap<>(name(),
applicationId(),
database,
serializer(),
readOnly(),
purgeOnUninstall(),
meteringEnabled()));
}
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import com.google.common.collect.Sets;
import net.kuujo.copycat.resource.internal.AbstractResource;
import net.kuujo.copycat.resource.internal.ResourceManager;
import net.kuujo.copycat.state.StateMachine;
import net.kuujo.copycat.state.internal.DefaultStateMachine;
import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.function.TriConsumer;
import org.onlab.util.Match;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* Default database.
*/
public class DefaultDatabase extends AbstractResource<Database> implements Database {
private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
private DatabaseProxy<String, byte[]> proxy;
private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
@SuppressWarnings({"unchecked", "rawtypes"})
public DefaultDatabase(ResourceManager context) {
super(context);
this.stateMachine = new DefaultStateMachine(context,
DatabaseState.class,
DefaultDatabaseState.class,
DefaultDatabase.class.getClassLoader());
this.stateMachine.addStartupTask(() -> {
stateMachine.registerWatcher(watcher);
return CompletableFuture.completedFuture(null);
});
this.stateMachine.addShutdownTask(() -> {
stateMachine.unregisterWatcher(watcher);
return CompletableFuture.completedFuture(null);
});
}
/**
* If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
* return the completed future result.
*
* @param supplier The supplier to call if the database is open.
* @param <T> The future result type.
* @return A completable future that if this database is closed is immediately failed.
*/
protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
if (proxy == null) {
return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
}
return supplier.get();
}
@Override
public CompletableFuture<Set<String>> maps() {
return checkOpen(() -> proxy.maps());
}
@Override
public CompletableFuture<Map<String, Long>> counters() {
return checkOpen(() -> proxy.counters());
}
@Override
public CompletableFuture<Integer> mapSize(String mapName) {
return checkOpen(() -> proxy.mapSize(mapName));
}
@Override
public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
return checkOpen(() -> proxy.mapIsEmpty(mapName));
}
@Override
public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
return checkOpen(() -> proxy.mapContainsKey(mapName, key));
}
@Override
public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
return checkOpen(() -> proxy.mapContainsValue(mapName, value));
}
@Override
public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
return checkOpen(() -> proxy.mapGet(mapName, key));
}
@Override
public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
String mapName, String key, Match<byte[]> valueMatch, Match<Long> versionMatch, byte[] value) {
return checkOpen(() -> proxy.mapUpdate(mapName, key, valueMatch, versionMatch, value));
}
@Override
public CompletableFuture<Result<Void>> mapClear(String mapName) {
return checkOpen(() -> proxy.mapClear(mapName));
}
@Override
public CompletableFuture<Set<String>> mapKeySet(String mapName) {
return checkOpen(() -> proxy.mapKeySet(mapName));
}
@Override
public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
return checkOpen(() -> proxy.mapValues(mapName));
}
@Override
public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
return checkOpen(() -> proxy.mapEntrySet(mapName));
}
@Override
public CompletableFuture<Long> counterGet(String counterName) {
return checkOpen(() -> proxy.counterGet(counterName));
}
@Override
public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
return checkOpen(() -> proxy.counterAddAndGet(counterName, delta));
}
@Override
public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
return checkOpen(() -> proxy.counterGetAndAdd(counterName, delta));
}
@Override
public CompletableFuture<Void> counterSet(String counterName, long value) {
return checkOpen(() -> proxy.counterSet(counterName, value));
}
@Override
public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) {
return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update));
}
@Override
public CompletableFuture<Long> queueSize(String queueName) {
return checkOpen(() -> proxy.queueSize(queueName));
}
@Override
public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
return checkOpen(() -> proxy.queuePush(queueName, entry));
}
@Override
public CompletableFuture<byte[]> queuePop(String queueName) {
return checkOpen(() -> proxy.queuePop(queueName));
}
@Override
public CompletableFuture<byte[]> queuePeek(String queueName) {
return checkOpen(() -> proxy.queuePeek(queueName));
}
@Override
public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
return checkOpen(() -> proxy.prepareAndCommit(transaction));
}
@Override
public CompletableFuture<Boolean> prepare(Transaction transaction) {
return checkOpen(() -> proxy.prepare(transaction));
}
@Override
public CompletableFuture<CommitResponse> commit(Transaction transaction) {
return checkOpen(() -> proxy.commit(transaction));
}
@Override
public CompletableFuture<Boolean> rollback(Transaction transaction) {
return checkOpen(() -> proxy.rollback(transaction));
}
@Override
@SuppressWarnings("unchecked")
public synchronized CompletableFuture<Database> open() {
return runStartupTasks()
.thenCompose(v -> stateMachine.open())
.thenRun(() -> {
this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
})
.thenApply(v -> null);
}
@Override
public synchronized CompletableFuture<Void> close() {
proxy = null;
return stateMachine.close()
.thenCompose(v -> runShutdownTasks());
}
@Override
public int hashCode() {
return name().hashCode();
}
@Override
public boolean equals(Object other) {
if (other instanceof Database) {
return name().equals(((Database) other).name());
}
return false;
}
@Override
public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
consumers.add(consumer);
}
@Override
public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
consumers.remove(consumer);
}
private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
@Override
public void accept(String name, Object input, Object output) {
StateMachineUpdate update = new StateMachineUpdate(name, input, output);
consumers.forEach(consumer -> consumer.accept(update));
}
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import org.onlab.util.SharedExecutors;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import org.onosproject.utils.MeteringAgent;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.primitives.impl.StateMachineUpdate.Target.QUEUE_PUSH;
/**
* DistributedQueue implementation that provides FIFO ordering semantics.
*
* @param <E> queue entry type
*/
public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
private final String name;
private final Database database;
private final Serializer serializer;
private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
private static final String PRIMITIVE_NAME = "distributedQueue";
private static final String SIZE = "size";
private static final String PUSH = "push";
private static final String POP = "pop";
private static final String PEEK = "peek";
private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
private final MeteringAgent monitor;
public DefaultDistributedQueue(String name,
Database database,
Serializer serializer,
boolean meteringEnabled) {
this.name = checkNotNull(name, "queue name cannot be null");
this.database = checkNotNull(database, "database cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
this.database.registerConsumer(update -> {
SharedExecutors.getSingleThreadExecutor().execute(() -> {
if (update.target() == QUEUE_PUSH) {
List<Object> input = update.input();
String queueName = (String) input.get(0);
if (queueName.equals(name)) {
tryPoll();
}
}
});
});
}
@Override
public long size() {
final MeteringAgent.Context timer = monitor.startTimer(SIZE);
return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop(e)));
}
@Override
public void push(E entry) {
checkNotNull(entry, ERROR_NULL_ENTRY);
final MeteringAgent.Context timer = monitor.startTimer(PUSH);
Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
.whenComplete((r, e) -> timer.stop(e)));
}
@Override
public CompletableFuture<E> pop() {
final MeteringAgent.Context timer = monitor.startTimer(POP);
return database.queuePop(name)
.whenComplete((r, e) -> timer.stop(e))
.thenCompose(v -> {
if (v != null) {
return CompletableFuture.<E>completedFuture(serializer.decode(v));
}
CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
pendingFutures.add(newPendingFuture);
return newPendingFuture;
});
}
@Override
public E peek() {
final MeteringAgent.Context timer = monitor.startTimer(PEEK);
return Futures.getUnchecked(database.queuePeek(name)
.thenApply(v -> v != null ? serializer.<E>decode(v) : null)
.whenComplete((r, e) -> timer.stop(e)));
}
@Override
public String name() {
return name;
}
@Override
public DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.QUEUE;
}
protected void tryPoll() {
Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
for (CompletableFuture<E> future : pendingFutures) {
E entry = Futures.getUnchecked(database.queuePop(name)
.thenApply(v -> v != null ? serializer.decode(v) : null));
if (entry != null) {
future.complete(entry);
completedFutures.add(future);
} else {
break;
}
}
pendingFutures.removeAll(completedFutures);
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.Serializer;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
/**
* Default implementation of a {@code DistributedQueueBuilder}.
*
* @param <E> queue entry type
*/
public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
private Serializer serializer;
private String name;
private boolean persistenceEnabled = true;
private final DatabaseManager databaseManager;
private boolean metering = true;
public DefaultDistributedQueueBuilder(DatabaseManager databaseManager) {
this.databaseManager = databaseManager;
}
@Override
public DistributedQueueBuilder<E> withName(String name) {
checkArgument(name != null && !name.isEmpty());
this.name = name;
return this;
}
@Override
public DistributedQueueBuilder<E> withSerializer(Serializer serializer) {
checkArgument(serializer != null);
this.serializer = serializer;
return this;
}
@Override
public DistributedQueueBuilder<E> withMeteringDisabled() {
metering = false;
return this;
}
@Override
public DistributedQueueBuilder<E> withPersistenceDisabled() {
persistenceEnabled = false;
return this;
}
private boolean validInputs() {
return name != null && serializer != null;
}
@Override
public DistributedQueue<E> build() {
checkState(validInputs());
return new DefaultDistributedQueue<>(
name,
persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase,
serializer,
metering);
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import static com.google.common.base.Preconditions.*;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
/**
* Default TransactionContext implementation.
*/
public class DefaultTransactionContext implements TransactionContext {
private static final String TX_NOT_OPEN_ERROR = "Transaction Context is not open";
@SuppressWarnings("rawtypes")
private final Map<String, DefaultTransactionalMap> txMaps = Maps.newConcurrentMap();
private boolean isOpen = false;
private final Function<Transaction, CompletableFuture<CommitResult>> transactionCommitter;
private final TransactionId transactionId;
private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
public DefaultTransactionContext(TransactionId transactionId,
Function<Transaction, CompletableFuture<CommitResult>> transactionCommitter,
Supplier<ConsistentMapBuilder> mapBuilderSupplier) {
this.transactionId = transactionId;
this.transactionCommitter = checkNotNull(transactionCommitter);
this.mapBuilderSupplier = checkNotNull(mapBuilderSupplier);
}
@Override
public TransactionId transactionId() {
return transactionId;
}
@Override
public void begin() {
checkState(!isOpen, "Transaction Context is already open");
isOpen = true;
}
@Override
public boolean isOpen() {
return isOpen;
}
@Override
@SuppressWarnings("unchecked")
public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName,
Serializer serializer) {
checkState(isOpen, TX_NOT_OPEN_ERROR);
checkNotNull(mapName);
checkNotNull(serializer);
return txMaps.computeIfAbsent(mapName, name -> {
ConsistentMapBuilder mapBuilder = (ConsistentMapBuilder) mapBuilderSupplier.get()
.withName(name)
.withSerializer(serializer);
return new DefaultTransactionalMap<>(
name,
mapBuilder.buildAsyncMap(),
this,
serializer);
});
}
@SuppressWarnings("unchecked")
@Override
public CompletableFuture<CommitStatus> commit() {
// TODO: rework commit implementation to be more intuitive
checkState(isOpen, TX_NOT_OPEN_ERROR);
CommitStatus status;
try {
List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
txMaps.values().forEach(m -> updates.addAll(m.toMapUpdates()));
Transaction transaction = new Transaction(transactionId, updates);
status = Futures.getUnchecked(transactionCommitter.apply(transaction)) == CommitResult.OK
? CommitStatus.SUCCESS : CommitStatus.FAILURE;
} catch (Exception e) {
abort();
status = CommitStatus.FAILURE;
} finally {
isOpen = false;
}
return CompletableFuture.completedFuture(status);
}
@Override
public void abort() {
if (isOpen) {
try {
txMaps.values().forEach(m -> m.abort());
} finally {
isOpen = false;
}
}
}
@Override
public String toString() {
ToStringHelper s = MoreObjects.toStringHelper(this)
.add("transactionId", transactionId)
.add("isOpen", isOpen);
txMaps.entrySet().forEach(e -> {
s.add(e.getKey(), e.getValue());
});
return s.toString();
}
@Override
public String name() {
return transactionId.toString();
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionContextBuilder;
/**
* The default implementation of a transaction context builder. This builder
* generates a {@link DefaultTransactionContext}.
*/
public class DefaultTransactionContextBuilder extends TransactionContextBuilder {
private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
private final Function<Transaction, CompletableFuture<CommitResult>> transactionCommitter;
private final TransactionId transactionId;
public DefaultTransactionContextBuilder(Supplier<ConsistentMapBuilder> mapBuilderSupplier,
Function<Transaction, CompletableFuture<CommitResult>> transactionCommiter,
TransactionId transactionId) {
this.mapBuilderSupplier = mapBuilderSupplier;
this.transactionCommitter = transactionCommiter;
this.transactionId = transactionId;
}
@Override
public TransactionContext build() {
return new DefaultTransactionContext(transactionId, transactionCommitter, () -> {
ConsistentMapBuilder mapBuilder = mapBuilderSupplier.get();
if (partitionsDisabled()) {
mapBuilder = (ConsistentMapBuilder) mapBuilder.withPartitionsDisabled();
}
return mapBuilder;
});
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.collect.Iterators;
/**
* Set view backed by Set with element type {@code <BACK>} but returns
* element as {@code <OUT>} for convenience.
*
* @param <BACK> Backing {@link Set} element type.
* MappingSet will follow this type's equality behavior.
* @param <OUT> external facing element type.
* MappingSet will ignores equality defined by this type.
*/
class MappingSet<BACK, OUT> implements Set<OUT> {
private final Set<BACK> backedSet;
private final Function<OUT, BACK> toBack;
private final Function<BACK, OUT> toOut;
public MappingSet(Set<BACK> backedSet,
Function<Set<BACK>, Set<BACK>> supplier,
Function<OUT, BACK> toBack, Function<BACK, OUT> toOut) {
this.backedSet = supplier.apply(backedSet);
this.toBack = toBack;
this.toOut = toOut;
}
@Override
public int size() {
return backedSet.size();
}
@Override
public boolean isEmpty() {
return backedSet.isEmpty();
}
@Override
public boolean contains(Object o) {
return backedSet.contains(toBack.apply((OUT) o));
}
@Override
public Iterator<OUT> iterator() {
return Iterators.transform(backedSet.iterator(), toOut::apply);
}
@Override
public Object[] toArray() {
return backedSet.stream()
.map(toOut)
.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return backedSet.stream()
.map(toOut)
.toArray(size -> {
if (size < a.length) {
return (T[]) new Object[size];
} else {
Arrays.fill(a, null);
return a;
}
});
}
@Override
public boolean add(OUT e) {
return backedSet.add(toBack.apply(e));
}
@Override
public boolean remove(Object o) {
return backedSet.remove(toBack.apply((OUT) o));
}
@Override
public boolean containsAll(Collection<?> c) {
return c.stream()
.map(e -> toBack.apply((OUT) e))
.allMatch(backedSet::contains);
}
@Override
public boolean addAll(Collection<? extends OUT> c) {
return backedSet.addAll(c.stream().map(toBack).collect(Collectors.toList()));
}
@Override
public boolean retainAll(Collection<?> c) {
return backedSet.retainAll(c.stream()
.map(x -> toBack.apply((OUT) x))
.collect(Collectors.toList()));
}
@Override
public boolean removeAll(Collection<?> c) {
return backedSet.removeAll(c.stream()
.map(x -> toBack.apply((OUT) x))
.collect(Collectors.toList()));
}
@Override
public void clear() {
backedSet.clear();
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
/**
* Partitioner is responsible for mapping keys to individual database partitions.
*
* @param <K> key type.
*/
public interface Partitioner<K> {
/**
* Returns the database partition.
* @param mapName map name
* @param key key
* @return Database partition
*/
Database getPartition(String mapName, K key);
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.Objects;
/**
* Result of a database update operation.
*
* @param <V> return value type
*/
public final class Result<V> {
public enum Status {
/**
* Indicates a successful update.
*/
OK,
/**
* Indicates a failure due to underlying state being locked by another transaction.
*/
LOCKED
}
private final Status status;
private final V value;
/**
* Creates a new Result instance with the specified value with status set to Status.OK.
*
* @param <V> result value type
* @param value result value
* @return Result instance
*/
public static <V> Result<V> ok(V value) {
return new Result<>(value, Status.OK);
}
/**
* Creates a new Result instance with status set to Status.LOCKED.
*
* @param <V> result value type
* @return Result instance
*/
public static <V> Result<V> locked() {
return new Result<>(null, Status.LOCKED);
}
private Result(V value, Status status) {
this.value = value;
this.status = status;
}
/**
* Returns true if this result indicates a successful execution i.e status is Status.OK.
*
* @return true if successful, false otherwise
*/
public boolean success() {
return status == Status.OK;
}
/**
* Returns the status of database update operation.
*
* @return database update status
*/
public Status status() {
return status;
}
/**
* Returns the return value for the update.
*
* @return value returned by database update. If the status is another
* other than Status.OK, this returns a null
*/
public V value() {
return value;
}
@Override
public int hashCode() {
return Objects.hash(value, status);
}
@SuppressWarnings("unchecked")
@Override
public boolean equals(Object other) {
if (!(other instanceof Result)) {
return false;
}
Result<V> that = (Result<V>) other;
return Objects.equals(this.value, that.value) &&
Objects.equals(this.status, that.status);
}
@Override
public String toString() {
return toStringHelper(this)
.add("status", status)
.add("value", value)
.toString();
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.List;
/**
* A simple Partitioner for mapping keys to database partitions.
* <p>
* This class uses a md5 hash based hashing scheme for hashing the key to
* a partition.
*
*/
public class SimpleKeyHashPartitioner extends DatabasePartitioner {
public SimpleKeyHashPartitioner(List<Database> partitions) {
super(partitions);
}
@Override
public Database getPartition(String mapName, String key) {
return partitions.get(hash(key) % partitions.size());
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.List;
/**
* A simple Partitioner that uses the map name hash to
* pick a partition.
* <p>
* This class uses a md5 hash based hashing scheme for hashing the map name to
* a partition. This partitioner maps all keys for a map to the same database
* partition.
*/
public class SimpleTableHashPartitioner extends DatabasePartitioner {
public SimpleTableHashPartitioner(List<Database> partitions) {
super(partitions);
}
@Override
public Database getPartition(String mapName, String key) {
return partitions.get(hash(mapName) % partitions.size());
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Representation of a state machine update.
*/
public class StateMachineUpdate {
/**
* Target data structure type this update is for.
*/
enum Target {
/**
* Update is for a map.
*/
MAP_UPDATE,
/**
* Update is a transaction commit.
*/
TX_COMMIT,
/**
* Update is a queue push.
*/
QUEUE_PUSH,
/**
* Update is for some other operation.
*/
OTHER
}
private final String operationName;
private final Object input;
private final Object output;
public StateMachineUpdate(String operationName, Object input, Object output) {
this.operationName = operationName;
this.input = input;
this.output = output;
}
public Target target() {
// FIXME: This check is brittle
if (operationName.contains("mapUpdate")) {
return Target.MAP_UPDATE;
} else if (operationName.contains("commit") || operationName.contains("prepareAndCommit")) {
return Target.TX_COMMIT;
} else if (operationName.contains("queuePush")) {
return Target.QUEUE_PUSH;
} else {
return Target.OTHER;
}
}
@SuppressWarnings("unchecked")
public <T> T input() {
return (T) input;
}
@SuppressWarnings("unchecked")
public <T> T output() {
return (T) output;
}
@Override
public String toString() {
return toStringHelper(this)
.add("name", operationName)
.add("input", input)
.add("output", output)
.toString();
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.service.AsyncConsistentMap;
import static org.onosproject.store.primitives.impl.Transaction.State.COMMITTED;
import static org.onosproject.store.primitives.impl.Transaction.State.COMMITTING;
import static org.onosproject.store.primitives.impl.Transaction.State.ROLLEDBACK;
import static org.onosproject.store.primitives.impl.Transaction.State.ROLLINGBACK;
/**
* Agent that runs the two phase commit protocol.
*/
public class TransactionManager {
private final Database database;
private final AsyncConsistentMap<TransactionId, Transaction> transactions;
public TransactionManager(Database database, AsyncConsistentMap<TransactionId, Transaction> transactions) {
this.database = checkNotNull(database, "database cannot be null");
this.transactions = transactions;
}
/**
* Executes the specified transaction by employing a two phase commit protocol.
*
* @param transaction transaction to commit
* @return transaction commit result
*/
public CompletableFuture<CommitResult> execute(Transaction transaction) {
// short-circuit if there is only a single update
if (transaction.updates().size() <= 1) {
return database.prepareAndCommit(transaction)
.thenApply(response -> response.success()
? CommitResult.OK : CommitResult.FAILURE_DURING_COMMIT);
}
// clean up if this transaction in already in a terminal state.
if (transaction.state() == COMMITTED || transaction.state() == ROLLEDBACK) {
return transactions.remove(transaction.id()).thenApply(v -> CommitResult.OK);
} else if (transaction.state() == COMMITTING) {
return commit(transaction);
} else if (transaction.state() == ROLLINGBACK) {
return rollback(transaction).thenApply(v -> CommitResult.FAILURE_TO_PREPARE);
} else {
return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
}
}
/**
* Returns all pending transaction identifiers.
*
* @return future for a collection of transaction identifiers.
*/
public CompletableFuture<Collection<TransactionId>> getPendingTransactionIds() {
return transactions.values().thenApply(c -> c.stream()
.map(v -> v.value())
.filter(v -> v.state() != COMMITTED && v.state() != ROLLEDBACK)
.map(Transaction::id)
.collect(Collectors.toList()));
}
private CompletableFuture<Boolean> prepare(Transaction transaction) {
return transactions.put(transaction.id(), transaction)
.thenCompose(v -> database.prepare(transaction))
.thenCompose(status -> transactions.put(
transaction.id(),
transaction.transition(status ? COMMITTING : ROLLINGBACK))
.thenApply(v -> status));
}
private CompletableFuture<CommitResult> commit(Transaction transaction) {
return database.commit(transaction)
.thenCompose(r -> {
if (r.success()) {
return transactions.put(transaction.id(), transaction.transition(COMMITTED))
.thenApply(v -> CommitResult.OK);
} else {
return CompletableFuture.completedFuture(CommitResult.FAILURE_DURING_COMMIT);
}
});
}
private CompletableFuture<CommitResult> rollback(Transaction transaction) {
return database.rollback(transaction)
.thenCompose(v -> transactions.put(transaction.id(), transaction.transition(ROLLEDBACK)))
.thenApply(v -> CommitResult.FAILURE_TO_PREPARE);
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.function.Function;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;
/**
* Result of a update operation.
* <p>
* Both old and new values are accessible along with a flag that indicates if the
* the value was updated. If flag is false, oldValue and newValue both
* point to the same unmodified value.
* @param <V> result type
*/
public class UpdateResult<K, V> {
private final boolean updated;
private final String mapName;
private final K key;
private final Versioned<V> oldValue;
private final Versioned<V> newValue;
public UpdateResult(boolean updated, String mapName, K key, Versioned<V> oldValue, Versioned<V> newValue) {
this.updated = updated;
this.mapName = mapName;
this.key = key;
this.oldValue = oldValue;
this.newValue = newValue;
}
public boolean updated() {
return updated;
}
public String mapName() {
return mapName;
}
public K key() {
return key;
}
public Versioned<V> oldValue() {
return oldValue;
}
public Versioned<V> newValue() {
return newValue;
}
public <K1, V1> UpdateResult<K1, V1> map(Function<K, K1> keyTransform, Function<V, V1> valueMapper) {
return new UpdateResult<>(updated,
mapName,
keyTransform.apply(key),
oldValue == null ? null : oldValue.map(valueMapper),
newValue == null ? null : newValue.map(valueMapper));
}
public MapEvent<K, V> toMapEvent() {
if (!updated) {
return null;
} else {
return new MapEvent<>(mapName(), key(), newValue, oldValue);
}
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertNull;
import static junit.framework.TestCase.assertTrue;
import org.junit.Test;
/**
* Unit tests for Result.
*/
public class ResultTest {
@Test
public void testLocked() {
Result<String> r = Result.locked();
assertFalse(r.success());
assertNull(r.value());
assertEquals(Result.Status.LOCKED, r.status());
}
@Test
public void testOk() {
Result<String> r = Result.ok("foo");
assertTrue(r.success());
assertEquals("foo", r.value());
assertEquals(Result.Status.OK, r.status());
}
@Test
public void testEquality() {
Result<String> r1 = Result.ok("foo");
Result<String> r2 = Result.locked();
Result<String> r3 = Result.ok("bar");
Result<String> r4 = Result.ok("foo");
assertTrue(r1.equals(r4));
assertFalse(r1.equals(r2));
assertFalse(r1.equals(r3));
assertFalse(r2.equals(r3));
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertNull;
import static junit.framework.TestCase.assertTrue;
import org.junit.Test;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;
/**
* Unit tests for UpdateResult.
*/
public class UpdateResultTest {
@Test
public void testGetters() {
Versioned<String> oldValue = new Versioned<>("a", 1);
Versioned<String> newValue = new Versioned<>("b", 2);
UpdateResult<String, String> ur =
new UpdateResult<>(true, "foo", "k", oldValue, newValue);
assertTrue(ur.updated());
assertEquals("foo", ur.mapName());
assertEquals("k", ur.key());
assertEquals(oldValue, ur.oldValue());
assertEquals(newValue, ur.newValue());
}
@Test
public void testToMapEvent() {
Versioned<String> oldValue = new Versioned<>("a", 1);
Versioned<String> newValue = new Versioned<>("b", 2);
UpdateResult<String, String> ur1 =
new UpdateResult<>(true, "foo", "k", oldValue, newValue);
MapEvent<String, String> event1 = ur1.toMapEvent();
assertEquals(MapEvent.Type.UPDATE, event1.type());
assertEquals("k", event1.key());
assertEquals(newValue, event1.value());
UpdateResult<String, String> ur2 =
new UpdateResult<>(true, "foo", "k", null, newValue);
MapEvent<String, String> event2 = ur2.toMapEvent();
assertEquals(MapEvent.Type.INSERT, event2.type());
assertEquals("k", event2.key());
assertEquals(newValue, event2.value());
UpdateResult<String, String> ur3 =
new UpdateResult<>(true, "foo", "k", oldValue, null);
MapEvent<String, String> event3 = ur3.toMapEvent();
assertEquals(MapEvent.Type.REMOVE, event3.type());
assertEquals("k", event3.key());
assertEquals(oldValue, event3.value());
UpdateResult<String, String> ur4 =
new UpdateResult<>(false, "foo", "k", oldValue, oldValue);
assertNull(ur4.toMapEvent());
}
@Test
public void testMap() {
Versioned<String> oldValue = new Versioned<>("a", 1);
Versioned<String> newValue = new Versioned<>("b", 2);
UpdateResult<String, String> ur1 =
new UpdateResult<>(true, "foo", "k", oldValue, newValue);
UpdateResult<Integer, Integer> ur2 = ur1.map(s -> s.length(), s -> s.length());
assertEquals(ur2.updated(), ur1.updated());
assertEquals(ur1.mapName(), ur2.mapName());
assertEquals(new Integer(1), ur2.key());
assertEquals(oldValue.map(s -> s.length()), ur2.oldValue());
assertEquals(newValue.map(s -> s.length()), ur2.newValue());
UpdateResult<String, String> ur3 =
new UpdateResult<>(true, "foo", "k", null, newValue);
UpdateResult<Integer, Integer> ur4 = ur3.map(s -> s.length(), s -> s.length());
assertEquals(ur3.updated(), ur4.updated());
assertEquals(ur3.mapName(), ur4.mapName());
assertEquals(new Integer(1), ur4.key());
assertNull(ur4.oldValue());
assertEquals(newValue.map(s -> s.length()), ur4.newValue());
}
}
......@@ -80,8 +80,7 @@
<!-- TODO: replace with final release version when it is out -->
<catalyst.version>1.0.6</catalyst.version>
<atomix.version>1.0.0-rc3</atomix.version>
<atomix.copycat.version>1.0.0-rc6</atomix.copycat.version>
<copycat.version>0.5.1.onos</copycat.version>
<copycat.version>1.0.0-rc6</copycat.version>
<openflowj.version>0.9.3.onos-SNAPSHOT</openflowj.version>
<onos-maven-plugin.version>1.9</onos-maven-plugin.version>
<osgi.version>4.3.1</osgi.version>
......
......@@ -53,28 +53,15 @@
<dependency>
<groupId>io.atomix.copycat</groupId>
<artifactId>copycat-client</artifactId>
<version>${atomix.copycat.version}</version>
<version>${copycat.version}</version>
</dependency>
<dependency>
<groupId>io.atomix.copycat</groupId>
<artifactId>copycat-server</artifactId>
<version>${atomix.copycat.version}</version>
</dependency>
<dependency>
<!-- FIXME once fixes get merged to upstream -->
<groupId>org.onosproject</groupId>
<artifactId>copycat-api</artifactId>
<version>${copycat.version}</version>
</dependency>
<dependency>
<!-- FIXME once fixes get merged to upstream -->
<groupId>org.onosproject</groupId>
<artifactId>copycat-core</artifactId>
<version>${copycat.version}</version>
</dependency>
</dependencies>
<build>
......@@ -99,16 +86,6 @@
<filters>
<filter>
<artifact>org.onosproject:copycat*</artifact>
<includes>
<include>**</include>
</includes>
<excludes>
<exclude>net/kuujo/copycat/**</exclude>
</excludes>
</filter>
<filter>
<artifact>io.atomix:atomix-all</artifact>
<includes>
<include>**</include>
......@@ -132,7 +109,7 @@
<configuration>
<instructions>
<Export-Package>
net.kuujo.copycat.*;io.atomix.*
io.atomix.*
</Export-Package>
<Import-Package>
!sun.nio.ch,!sun.misc,*
......