Madan Jampani
Committed by Gerrit Code Review

Supporting Atomix classes for DocumentTree distributed primitive

Change-Id: I754222337401f90f976d4152b6abbdf2e1a4df8e
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.NotThreadSafe;
/**
* A hierarchical <a href="https://en.wikipedia.org/wiki/Document_Object_Model">document tree</a> data structure.
*
* @param <V> document tree value type
*/
@NotThreadSafe
public interface AsyncDocumentTree<V> extends DistributedPrimitive {
/**
* Returns the {@link DocumentPath path} to root of the tree.
*
* @return path to root of the tree
*/
DocumentPath root();
/**
* Returns the child values for this node.
*
* @param path path to the node
* @return future for mapping from a child name to its value
* @throws NoSuchDocumentPathException if the path does not point to a valid node
*/
CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path);
/**
* Returns a value (and version) of the tree node at specified path.
*
* @param path path to node
* @return future for node value or {@code null} if path does not point to a valid node
*/
CompletableFuture<Versioned<V>> get(DocumentPath path);
/**
* Creates or updates a document tree node.
*
* @param path path for the node to create or update
* @param value the non-null value to be associated with the key
* @return future for the previous mapping or {@code null} if there was no previous mapping. Future will
* be completed with a NoSuchDocumentPathException if the parent node (for the node to create/update) does not exist
*/
CompletableFuture<Versioned<V>> set(DocumentPath path, V value);
/**
* Creates a document tree node if one does not exist already.
*
* @param path path for the node to create
* @param value the non-null value to be associated with the key
* @return future that is completed with {@code true} if the mapping could be added
* successfully; {@code false} otherwise. Future will be completed with a
* IllegalDocumentModificationException if the parent node (for the node to create) does not exist
*/
CompletableFuture<Boolean> create(DocumentPath path, V value);
/**
* Conditionally updates a tree node if the current version matches a specified version.
*
* @param path path for the node to create
* @param newValue the non-null value to be associated with the key
* @param version current version of the value for update to occur
* @return future that is completed with {@code true} if the update was made and the tree was
* modified, {@code false} otherwise.
*/
CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version);
/**
* Conditionally updates a tree node if the current value matches a specified value.
*
* @param path path for the node to create
* @param newValue the non-null value to be associated with the key
* @param currentValue current value for update to occur
* @return future that is completed with {@code true} if the update was made and the tree was
* modified, {@code false} otherwise.
*/
CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue);
/**
* Removes the node with the specified path.
*
* @param path path for the node to remove
* @return future for the previous value. Future will be completed with a
* IllegalDocumentModificationException if the node to be removed is either the root
* node or has one or more children. Future will be completed with a
* NoSuchDocumentPathException if the node to be removed does not exist
*/
CompletableFuture<Versioned<V>> removeNode(DocumentPath path);
/**
* Registers a listener to be notified when a subtree rooted at the specified path
* is modified.
*
* @param path path to root of subtree to monitor for updates
* @param listener listener to be notified
*/
CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener);
/**
* Unregisters a previously added listener.
*
* @param listener listener to unregister
*/
CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener);
/**
* Registers a listener to be notified when the tree is modified.
*
* @param listener listener to be notified
*/
default CompletableFuture<Void> addListener(DocumentTreeListener<V> listener) {
return addListener(root(), listener);
}
}
......@@ -72,6 +72,11 @@ public interface DistributedPrimitive {
WORK_QUEUE,
/**
* Document tree.
*/
DOCUMENT_TREE,
/**
* Distributed topic.
*/
TOPIC,
......
......@@ -52,6 +52,14 @@ public class DocumentTreeEvent<V> {
private final Optional<Versioned<V>> newValue;
private final Optional<Versioned<V>> oldValue;
@SuppressWarnings("unused")
private DocumentTreeEvent() {
this.path = null;
this.type = null;
this.newValue = null;
this.oldValue = null;
}
/**
* Constructs a new {@code DocumentTreeEvent}.
*
......
......@@ -15,12 +15,13 @@
*/
package org.onosproject.store.primitives.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
import io.atomix.manager.util.ResourceManagerTypeResolver;
import io.atomix.variables.internal.LongCommands;
import java.util.Arrays;
import org.onlab.util.Match;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
......@@ -34,22 +35,28 @@ import org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapC
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapFactory;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapFactory;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeFactory;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorFactory;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueFactory;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult;
import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
import org.onosproject.store.primitives.resources.impl.RollbackResult;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WorkQueueStats;
import java.util.Arrays;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
/**
* Serializer utility for Atomix Catalyst.
......@@ -69,6 +76,11 @@ public final class CatalystSerializers {
Transaction.State.class,
PrepareResult.class,
CommitResult.class,
DocumentPath.class,
DocumentTreeUpdateResult.class,
DocumentTreeUpdateResult.Status.class,
DocumentTreeEvent.class,
DocumentTreeEvent.Type.class,
RollbackResult.class));
// ONOS classes
serializer.register(Change.class, factory);
......@@ -90,6 +102,11 @@ public final class CatalystSerializers {
serializer.register(MapEvent.class, factory);
serializer.register(Task.class, factory);
serializer.register(WorkQueueStats.class, factory);
serializer.register(DocumentPath.class, factory);
serializer.register(DocumentTreeUpdateResult.class, factory);
serializer.register(DocumentTreeUpdateResult.Status.class, factory);
serializer.register(DocumentTreeEvent.class, factory);
serializer.register(DocumentTreeEvent.Type.class, factory);
serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
serializer.register(ImmutableList.of().getClass(), factory);
......@@ -97,6 +114,7 @@ public final class CatalystSerializers {
serializer.resolve(new AtomixConsistentMapCommands.TypeResolver());
serializer.resolve(new AtomixLeaderElectorCommands.TypeResolver());
serializer.resolve(new AtomixWorkQueueCommands.TypeResolver());
serializer.resolve(new AtomixDocumentTreeCommands.TypeResolver());
serializer.resolve(new ResourceManagerTypeResolver());
serializer.resolve(new AtomixConsistentTreeMapCommands.TypeResolver());
serializer.resolve(new AtomixConsistentMultimapCommands.TypeResolver());
......@@ -104,6 +122,7 @@ public final class CatalystSerializers {
serializer.registerClassLoader(AtomixConsistentMapFactory.class)
.registerClassLoader(AtomixLeaderElectorFactory.class)
.registerClassLoader(AtomixWorkQueueFactory.class)
.registerClassLoader(AtomixDocumentTreeFactory.class)
.registerClassLoader(AtomixConsistentTreeMapFactory.class)
.registerClassLoader(AtomixConsistentSetMultimapFactory.class);
......
......@@ -15,14 +15,15 @@
*/
package org.onosproject.store.primitives.resources.impl;
import static com.google.common.base.Preconditions.checkState;
import static org.onosproject.store.service.MapEvent.Type.INSERT;
import static org.onosproject.store.service.MapEvent.Type.REMOVE;
import static org.onosproject.store.service.MapEvent.Type.UPDATE;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
......@@ -68,15 +69,13 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import static com.google.common.base.Preconditions.checkState;
/**
* State Machine for {@link AtomixConsistentMap} resource.
*/
public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
private final Logger log = getLogger(getClass());
private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.onlab.util.Match;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.DocumentTreeListener;
import org.onosproject.store.service.IllegalDocumentModificationException;
import org.onosproject.store.service.NoSuchDocumentPathException;
import org.onosproject.store.service.Versioned;
import com.google.common.util.concurrent.MoreExecutors;
/**
* Distributed resource providing the {@link AsyncDocumentTree} primitive.
*/
@ResourceTypeInfo(id = -156, factory = AtomixDocumentTreeFactory.class)
public class AtomixDocumentTree extends AbstractResource<AtomixDocumentTree>
implements AsyncDocumentTree<byte[]> {
private final Map<DocumentTreeListener<byte[]>, Executor> eventListeners = new HashMap<>();
public static final String CHANGE_SUBJECT = "changeEvents";
protected AtomixDocumentTree(CopycatClient client, Properties options) {
super(client, options);
}
@Override
public CompletableFuture<AtomixDocumentTree> open() {
return super.open().thenApply(result -> {
client.onStateChange(state -> {
if (state == CopycatClient.State.CONNECTED && isListening()) {
client.submit(new Listen());
}
});
client.onEvent(CHANGE_SUBJECT, this::processTreeUpdates);
return result;
});
}
@Override
public String name() {
return null;
}
@Override
public Type primitiveType() {
return Type.DOCUMENT_TREE;
}
@Override
public CompletableFuture<Void> destroy() {
return client.submit(new Clear());
}
@Override
public DocumentPath root() {
return DocumentPath.from("root");
}
@Override
public CompletableFuture<Map<String, Versioned<byte[]>>> getChildren(DocumentPath path) {
return client.submit(new GetChildren(checkNotNull(path)));
}
@Override
public CompletableFuture<Versioned<byte[]>> get(DocumentPath path) {
return client.submit(new Get(checkNotNull(path)));
}
@Override
public CompletableFuture<Versioned<byte[]>> set(DocumentPath path, byte[] value) {
return client.submit(new Update(checkNotNull(path), checkNotNull(value), Match.any(), Match.any()))
.thenCompose(result -> {
if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
return Tools.exceptionalFuture(new NoSuchDocumentPathException());
} else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
return Tools.exceptionalFuture(new IllegalDocumentModificationException());
} else {
return CompletableFuture.completedFuture(result);
}
}).thenApply(result -> result.oldValue());
}
@Override
public CompletableFuture<Boolean> create(DocumentPath path, byte[] value) {
return client.submit(new Update(checkNotNull(path), checkNotNull(value), Match.ifNull(), Match.any()))
.thenCompose(result -> {
if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
return Tools.exceptionalFuture(new NoSuchDocumentPathException());
} else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
return Tools.exceptionalFuture(new IllegalDocumentModificationException());
} else {
return CompletableFuture.completedFuture(result);
}
}).thenApply(result -> result.created());
}
@Override
public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, long version) {
return client.submit(new Update(checkNotNull(path), newValue, Match.any(), Match.ifValue(version)))
.thenApply(result -> result.updated());
}
@Override
public CompletableFuture<Boolean> replace(DocumentPath path, byte[] newValue, byte[] currentValue) {
return client.submit(new Update(checkNotNull(path), newValue, Match.ifValue(currentValue), Match.any()))
.thenCompose(result -> {
if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
return Tools.exceptionalFuture(new NoSuchDocumentPathException());
} else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
return Tools.exceptionalFuture(new IllegalDocumentModificationException());
} else {
return CompletableFuture.completedFuture(result);
}
}).thenApply(result -> result.updated());
}
@Override
public CompletableFuture<Versioned<byte[]>> removeNode(DocumentPath path) {
if (path.equals(DocumentPath.from("root"))) {
return Tools.exceptionalFuture(new IllegalDocumentModificationException());
}
return client.submit(new Update(checkNotNull(path), null, Match.ifNotNull(), Match.any()))
.thenCompose(result -> {
if (result.status() == DocumentTreeUpdateResult.Status.INVALID_PATH) {
return Tools.exceptionalFuture(new NoSuchDocumentPathException());
} else if (result.status() == DocumentTreeUpdateResult.Status.ILLEGAL_MODIFICATION) {
return Tools.exceptionalFuture(new IllegalDocumentModificationException());
} else {
return CompletableFuture.completedFuture(result);
}
}).thenApply(result -> result.oldValue());
}
@Override
public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<byte[]> listener) {
checkNotNull(path);
checkNotNull(listener);
// TODO: Support API that takes an executor
if (isListening()) {
eventListeners.putIfAbsent(listener, MoreExecutors.directExecutor());
return CompletableFuture.completedFuture(null);
} else {
return client.submit(new Listen(path))
.thenRun(() -> eventListeners.put(listener, MoreExecutors.directExecutor()));
}
}
@Override
public CompletableFuture<Void> removeListener(DocumentTreeListener<byte[]> listener) {
checkNotNull(listener);
if (eventListeners.remove(listener) != null && eventListeners.isEmpty()) {
return client.submit(new Unlisten()).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}
private boolean isListening() {
return !eventListeners.isEmpty();
}
private void processTreeUpdates(List<DocumentTreeEvent<byte[]>> events) {
events.forEach(event ->
eventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.copycat.Command;
import io.atomix.copycat.Query;
import java.util.Map;
import org.onlab.util.Match;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
/**
* {@link AtomixDocumentTree} resource state machine operations.
*/
public class AtomixDocumentTreeCommands {
/**
* Abstract DocumentTree operation.
*/
public abstract static class DocumentTreeOperation<V> implements CatalystSerializable {
private DocumentPath path;
DocumentTreeOperation(DocumentPath path) {
this.path = path;
}
public DocumentPath path() {
return path;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
serializer.writeObject(path, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
path = serializer.readObject(buffer);
}
}
/**
* Abstract DocumentTree query.
*/
@SuppressWarnings("serial")
public abstract static class DocumentTreeQuery<V> extends DocumentTreeOperation<V> implements Query<V> {
DocumentTreeQuery(DocumentPath path) {
super(path);
}
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.SEQUENTIAL;
}
}
/**
* Abstract DocumentTree command.
*/
@SuppressWarnings("serial")
public abstract static class DocumentTreeCommand<V> extends DocumentTreeOperation<V> implements Command<V> {
DocumentTreeCommand(DocumentPath path) {
super(path);
}
}
/**
* DocumentTree#get query.
*/
@SuppressWarnings("serial")
public static class Get extends DocumentTreeQuery<Versioned<byte[]>> {
public Get() {
super(null);
}
public Get(DocumentPath path) {
super(path);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("path", path())
.toString();
}
}
/**
* DocumentTree#getChildren query.
*/
@SuppressWarnings("serial")
public static class GetChildren extends DocumentTreeQuery<Map<String, Versioned<byte[]>>> {
public GetChildren() {
super(null);
}
public GetChildren(DocumentPath path) {
super(path);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("path", path())
.toString();
}
}
/**
* DocumentTree update command.
*/
@SuppressWarnings("serial")
public static class Update extends DocumentTreeCommand<DocumentTreeUpdateResult<byte[]>> {
private byte[] value;
private Match<byte[]> valueMatch;
private Match<Long> versionMatch;
public Update() {
super(null);
this.value = null;
this.valueMatch = null;
this.versionMatch = null;
}
public Update(DocumentPath path, byte[] value, Match<byte[]> valueMatch, Match<Long> versionMatch) {
super(path);
this.value = value;
this.valueMatch = valueMatch;
this.versionMatch = versionMatch;
}
public byte[] value() {
return value;
}
public Match<byte[]> valueMatch() {
return valueMatch;
}
public Match<Long> versionMatch() {
return versionMatch;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(value, buffer);
serializer.writeObject(valueMatch, buffer);
serializer.writeObject(versionMatch, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
value = serializer.readObject(buffer);
valueMatch = serializer.readObject(buffer);
versionMatch = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("path", path())
.add("value", value)
.add("valueMatch", valueMatch)
.add("versionMatch", versionMatch)
.toString();
}
}
/**
* Clear command.
*/
@SuppressWarnings("serial")
public static class Clear implements Command<Void>, CatalystSerializable {
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
}
/**
* Change listen.
*/
@SuppressWarnings("serial")
public static class Listen extends DocumentTreeCommand<Void> {
public Listen() {
this(DocumentPath.from("root"));
}
public Listen(DocumentPath path) {
super(path);
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
}
/**
* Change unlisten.
*/
@SuppressWarnings("serial")
public static class Unlisten extends DocumentTreeCommand<Void> {
public Unlisten() {
this(DocumentPath.from("root"));
}
public Unlisten(DocumentPath path) {
super(path);
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
}
/**
* DocumentTree command type resolver.
*/
public static class TypeResolver implements SerializableTypeResolver {
@Override
public void resolve(SerializerRegistry registry) {
registry.register(Get.class, -911);
registry.register(GetChildren.class, -912);
registry.register(Update.class, -913);
registry.register(Listen.class, -914);
registry.register(Unlisten.class, -915);
registry.register(Clear.class, -916);
}
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.ResourceFactory;
import io.atomix.resource.ResourceStateMachine;
import java.util.Properties;
/**
* {@link AtomixDocumentTree} resource factory.
*
*/
public class AtomixDocumentTreeFactory implements ResourceFactory<AtomixDocumentTree> {
@Override
public SerializableTypeResolver createSerializableTypeResolver() {
return new AtomixDocumentTreeCommands.TypeResolver();
}
@Override
public ResourceStateMachine createStateMachine(Properties config) {
return new AtomixDocumentTreeState(config);
}
@Override
public AtomixDocumentTree createInstance(CopycatClient client, Properties options) {
return new AtomixDocumentTree(client, options);
}
}
\ No newline at end of file
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.onlab.util.Match;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Get;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.GetChildren;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTreeCommands.Update;
import org.onosproject.store.primitives.resources.impl.DocumentTreeUpdateResult.Status;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTree;
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.DocumentTreeEvent.Type;
import org.onosproject.store.service.IllegalDocumentModificationException;
import org.onosproject.store.service.NoSuchDocumentPathException;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
/**
* State Machine for {@link AtomixDocumentTree} resource.
*/
public class AtomixDocumentTreeState
extends ResourceStateMachine
implements SessionListener, Snapshottable {
private final Logger log = getLogger(getClass());
private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
private final AtomicLong versionCounter = new AtomicLong(0);
private final DocumentTree<TreeNodeValue> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
public AtomixDocumentTreeState(Properties properties) {
super(properties);
}
@Override
public void snapshot(SnapshotWriter writer) {
writer.writeLong(versionCounter.get());
}
@Override
public void install(SnapshotReader reader) {
versionCounter.set(reader.readLong());
}
@Override
protected void configure(StateMachineExecutor executor) {
// Listeners
executor.register(Listen.class, this::listen);
executor.register(Unlisten.class, this::unlisten);
// queries
executor.register(Get.class, this::get);
executor.register(GetChildren.class, this::getChildren);
// commands
executor.register(Update.class, this::update);
executor.register(Clear.class, this::clear);
}
protected void listen(Commit<? extends Listen> commit) {
Long sessionId = commit.session().id();
if (listeners.putIfAbsent(sessionId, commit) != null) {
commit.close();
return;
}
commit.session()
.onStateChange(
state -> {
if (state == ServerSession.State.CLOSED
|| state == ServerSession.State.EXPIRED) {
Commit<? extends Listen> listener = listeners.remove(sessionId);
if (listener != null) {
listener.close();
}
}
});
}
protected void unlisten(Commit<? extends Unlisten> commit) {
try {
closeListener(commit.session().id());
} finally {
commit.close();
}
}
protected Versioned<byte[]> get(Commit<? extends Get> commit) {
try {
Versioned<TreeNodeValue> value = docTree.get(commit.operation().path());
return value == null ? null : value.map(node -> node == null ? null : node.value());
} finally {
commit.close();
}
}
protected Map<String, Versioned<byte[]>> getChildren(Commit<? extends GetChildren> commit) {
try {
Map<String, Versioned<TreeNodeValue>> children = docTree.getChildren(commit.operation().path());
return children == null
? null : Maps.newHashMap(Maps.transformValues(children,
value -> value.map(TreeNodeValue::value)));
} finally {
commit.close();
}
}
protected DocumentTreeUpdateResult<byte[]> update(Commit<? extends Update> commit) {
DocumentTreeUpdateResult<byte[]> result = null;
DocumentPath path = commit.operation().path();
boolean updated = false;
Versioned<TreeNodeValue> currentValue = docTree.get(path);
try {
Match<Long> versionMatch = commit.operation().versionMatch();
Match<byte[]> valueMatch = commit.operation().valueMatch();
if (versionMatch.matches(currentValue == null ? null : currentValue.version())
&& valueMatch.matches(currentValue == null ? null : currentValue.value().value())) {
if (commit.operation().value() == null) {
docTree.removeNode(path);
} else {
docTree.set(path, new NonTransactionalCommit(commit));
}
updated = true;
}
Versioned<TreeNodeValue> newValue = updated ? docTree.get(path) : currentValue;
Status updateStatus = updated
? Status.OK : commit.operation().value() == null ? Status.INVALID_PATH : Status.NOOP;
result = new DocumentTreeUpdateResult<>(path,
updateStatus,
newValue == null
? null : newValue.map(TreeNodeValue::value),
currentValue == null
? null : currentValue.map(TreeNodeValue::value));
} catch (IllegalDocumentModificationException e) {
result = DocumentTreeUpdateResult.illegalModification(path);
} catch (NoSuchDocumentPathException e) {
result = DocumentTreeUpdateResult.invalidPath(path);
} catch (Exception e) {
log.error("Failed to apply {} to state machine", commit.operation(), e);
throw Throwables.propagate(e);
} finally {
if (updated) {
if (currentValue != null) {
currentValue.value().discard();
}
} else {
commit.close();
}
}
notifyListeners(path, result);
return result;
}
protected void clear(Commit<? extends Clear> commit) {
try {
Queue<DocumentPath> toClearQueue = Queues.newArrayDeque();
Map<String, Versioned<TreeNodeValue>> topLevelChildren = docTree.getChildren(DocumentPath.from("root"));
toClearQueue.addAll(topLevelChildren.keySet()
.stream()
.map(name -> new DocumentPath(name, DocumentPath.from("root")))
.collect(Collectors.toList()));
while (!toClearQueue.isEmpty()) {
DocumentPath path = toClearQueue.remove();
Map<String, Versioned<TreeNodeValue>> children = docTree.getChildren(path);
if (children.size() == 0) {
docTree.removeNode(path).value().discard();
} else {
children.keySet()
.stream()
.forEach(name -> toClearQueue.add(new DocumentPath(name, path)));
toClearQueue.add(path);
}
}
} finally {
commit.close();
}
}
/**
* Interface implemented by tree node values.
*/
private interface TreeNodeValue {
/**
* Returns the raw {@code byte[]}.
*
* @return raw value
*/
byte[] value();
/**
* Discards the value by invoke appropriate clean up actions.
*/
void discard();
}
/**
* A {@code TreeNodeValue} that is derived from a non-transactional update
* i.e. via any standard tree update operation.
*/
private class NonTransactionalCommit implements TreeNodeValue {
private final Commit<? extends Update> commit;
public NonTransactionalCommit(Commit<? extends Update> commit) {
this.commit = commit;
}
@Override
public byte[] value() {
return commit.operation().value();
}
@Override
public void discard() {
commit.close();
}
}
private void notifyListeners(DocumentPath path, DocumentTreeUpdateResult<byte[]> result) {
if (result.status() != Status.OK) {
return;
}
DocumentTreeEvent<byte[]> event =
new DocumentTreeEvent<>(path,
result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED,
Optional.ofNullable(result.newValue()),
Optional.ofNullable(result.oldValue()));
Object message = ImmutableList.of(event);
listeners.values().forEach(commit -> {
commit.session().publish(AtomixDocumentTree.CHANGE_SUBJECT, message);
System.out.println("Sent " + message + " to " + commit.session().id());
});
}
@Override
public void register(ServerSession session) {
}
@Override
public void unregister(ServerSession session) {
closeListener(session.id());
}
@Override
public void expire(ServerSession session) {
closeListener(session.id());
}
@Override
public void close(ServerSession session) {
closeListener(session.id());
}
private void closeListener(Long sessionId) {
Commit<? extends Listen> commit = listeners.remove(sessionId);
if (commit != null) {
commit.close();
}
}
}
......@@ -19,7 +19,7 @@ package org.onosproject.store.primitives.resources.impl;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTree;
......@@ -30,6 +30,7 @@ import org.onosproject.store.service.NoSuchDocumentPathException;
import org.onosproject.store.service.Versioned;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
/**
......@@ -41,10 +42,17 @@ public class DefaultDocumentTree<V> implements DocumentTree<V> {
private static final DocumentPath ROOT_PATH = DocumentPath.from("root");
private final DefaultDocumentTreeNode<V> root;
private final AtomicInteger versionCounter = new AtomicInteger(0);
private final Supplier<Long> versionSupplier;
public DefaultDocumentTree() {
root = new DefaultDocumentTreeNode<V>(ROOT_PATH, null, nextVersion(), null);
AtomicLong versionCounter = new AtomicLong(0);
versionSupplier = versionCounter::incrementAndGet;
root = new DefaultDocumentTreeNode<V>(ROOT_PATH, null, versionSupplier.get(), null);
}
public DefaultDocumentTree(Supplier<Long> versionSupplier) {
root = new DefaultDocumentTreeNode<V>(ROOT_PATH, null, versionSupplier.get(), null);
this.versionSupplier = versionSupplier;
}
@Override
......@@ -74,7 +82,7 @@ public class DefaultDocumentTree<V> implements DocumentTree<V> {
checkRootModification(path);
DefaultDocumentTreeNode<V> node = getNode(path);
if (node != null) {
return node.update(value, nextVersion());
return node.update(value, versionSupplier.get());
} else {
create(path, value);
return null;
......@@ -93,7 +101,7 @@ public class DefaultDocumentTree<V> implements DocumentTree<V> {
if (parentNode == null) {
throw new IllegalDocumentModificationException();
}
parentNode.addChild(simpleName(path), value, nextVersion());
parentNode.addChild(simpleName(path), value, versionSupplier.get());
return true;
}
......@@ -159,10 +167,6 @@ public class DefaultDocumentTree<V> implements DocumentTree<V> {
return currentNode;
}
private long nextVersion() {
return versionCounter.incrementAndGet();
}
private String simpleName(DocumentPath path) {
return path.pathElements().get(path.pathElements().size() - 1);
}
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
/**
* Result of a document tree node update operation.
* <p>
* Both old and new values are accessible along with a status of update.
*
* @param <V> value type
*/
public class DocumentTreeUpdateResult<V> {
public enum Status {
/**
* Indicates a successful update.
*/
OK,
/**
* Indicates a noop i.e. existing and new value are both same.
*/
NOOP,
/**
* Indicates a failed update due to a write lock.
*/
WRITE_LOCK,
/**
* Indicates a failed update due to a invalid path.
*/
INVALID_PATH,
/**
* Indicates a failed update due to a illegal modification attempt.
*/
ILLEGAL_MODIFICATION,
}
private final DocumentPath path;
private final Status status;
private final Versioned<V> oldValue;
private final Versioned<V> newValue;
public DocumentTreeUpdateResult(DocumentPath path,
Status status,
Versioned<V> newValue,
Versioned<V> oldValue) {
this.status = status;
this.path = path;
this.newValue = newValue;
this.oldValue = oldValue;
}
public static <V> DocumentTreeUpdateResult<V> invalidPath(DocumentPath path) {
return new DocumentTreeUpdateResult<>(path, Status.INVALID_PATH, null, null);
}
public static <V> DocumentTreeUpdateResult<V> illegalModification(DocumentPath path) {
return new DocumentTreeUpdateResult<>(path, Status.ILLEGAL_MODIFICATION, null, null);
}
public Status status() {
return status;
}
public DocumentPath path() {
return path;
}
public Versioned<V> oldValue() {
return oldValue;
}
public Versioned<V> newValue() {
return this.newValue;
}
public boolean updated() {
return status == Status.OK;
}
public boolean created() {
return updated() && oldValue == null;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("path", path)
.add("status", status)
.add("newValue", newValue)
.add("oldValue", oldValue)
.toString();
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.atomix.resource.ResourceType;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.DocumentTreeListener;
import org.onosproject.store.service.IllegalDocumentModificationException;
import org.onosproject.store.service.NoSuchDocumentPathException;
import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
/**
* Unit tests for {@link AtomixDocumentTree}.
*/
public class AtomixDocumentTreeTest extends AtomixTestBase {
@BeforeClass
public static void preTestSetup() throws Throwable {
createCopycatServers(3);
}
@AfterClass
public static void postTestCleanup() throws Exception {
clearTests();
}
@Override
protected ResourceType resourceType() {
return new ResourceType(AtomixDocumentTree.class);
}
/**
* Tests queries (get and getChildren).
*/
@Test
public void testQueries() throws Throwable {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
Versioned<byte[]> root = tree.get(DocumentPath.from("root")).join();
assertEquals(1, root.version());
assertNull(root.value());
}
/**
* Tests create.
*/
@Test
public void testCreate() throws Throwable {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
tree.create(DocumentPath.from("root.a"), "a".getBytes()).join();
tree.create(DocumentPath.from("root.a.b"), "ab".getBytes()).join();
tree.create(DocumentPath.from("root.a.c"), "ac".getBytes()).join();
Versioned<byte[]> a = tree.get(DocumentPath.from("root.a")).join();
assertArrayEquals("a".getBytes(), a.value());
Versioned<byte[]> ab = tree.get(DocumentPath.from("root.a.b")).join();
assertArrayEquals("ab".getBytes(), ab.value());
Versioned<byte[]> ac = tree.get(DocumentPath.from("root.a.c")).join();
assertArrayEquals("ac".getBytes(), ac.value());
}
/**
* Tests set.
*/
@Test
public void testSet() throws Throwable {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
tree.create(DocumentPath.from("root.a"), "a".getBytes()).join();
tree.create(DocumentPath.from("root.a.b"), "ab".getBytes()).join();
tree.create(DocumentPath.from("root.a.c"), "ac".getBytes()).join();
tree.set(DocumentPath.from("root.a.d"), "ad".getBytes()).join();
Versioned<byte[]> ad = tree.get(DocumentPath.from("root.a.d")).join();
assertArrayEquals("ad".getBytes(), ad.value());
tree.set(DocumentPath.from("root.a"), "newA".getBytes()).join();
Versioned<byte[]> newA = tree.get(DocumentPath.from("root.a")).join();
assertArrayEquals("newA".getBytes(), newA.value());
tree.set(DocumentPath.from("root.a.b"), "newAB".getBytes()).join();
Versioned<byte[]> newAB = tree.get(DocumentPath.from("root.a.b")).join();
assertArrayEquals("newAB".getBytes(), newAB.value());
}
/**
* Tests replace if version matches.
*/
@Test
public void testReplaceVersion() throws Throwable {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
tree.create(DocumentPath.from("root.a"), "a".getBytes()).join();
tree.create(DocumentPath.from("root.a.b"), "ab".getBytes()).join();
tree.create(DocumentPath.from("root.a.c"), "ac".getBytes()).join();
Versioned<byte[]> ab = tree.get(DocumentPath.from("root.a.b")).join();
assertTrue(tree.replace(DocumentPath.from("root.a.b"), "newAB".getBytes(), ab.version()).join());
Versioned<byte[]> newAB = tree.get(DocumentPath.from("root.a.b")).join();
assertArrayEquals("newAB".getBytes(), newAB.value());
assertFalse(tree.replace(DocumentPath.from("root.a.b"), "newestAB".getBytes(), ab.version()).join());
assertArrayEquals("newAB".getBytes(), tree.get(DocumentPath.from("root.a.b")).join().value());
assertFalse(tree.replace(DocumentPath.from("root.a.d"), "foo".getBytes(), 1).join());
}
/**
* Tests replace if value matches.
*/
@Test
public void testReplaceValue() throws Throwable {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
tree.create(DocumentPath.from("root.a"), "a".getBytes()).join();
tree.create(DocumentPath.from("root.a.b"), "ab".getBytes()).join();
tree.create(DocumentPath.from("root.a.c"), "ac".getBytes()).join();
Versioned<byte[]> ab = tree.get(DocumentPath.from("root.a.b")).join();
assertTrue(tree.replace(DocumentPath.from("root.a.b"), "newAB".getBytes(), ab.value()).join());
Versioned<byte[]> newAB = tree.get(DocumentPath.from("root.a.b")).join();
assertArrayEquals("newAB".getBytes(), newAB.value());
assertFalse(tree.replace(DocumentPath.from("root.a.b"), "newestAB".getBytes(), ab.value()).join());
assertArrayEquals("newAB".getBytes(), tree.get(DocumentPath.from("root.a.b")).join().value());
assertFalse(tree.replace(DocumentPath.from("root.a.d"), "bar".getBytes(), "foo".getBytes()).join());
}
/**
* Tests remove.
*/
@Test
public void testRemove() throws Throwable {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
tree.create(DocumentPath.from("root.a"), "a".getBytes()).join();
tree.create(DocumentPath.from("root.a.b"), "ab".getBytes()).join();
tree.create(DocumentPath.from("root.a.c"), "ac".getBytes()).join();
Versioned<byte[]> ab = tree.removeNode(DocumentPath.from("root.a.b")).join();
assertArrayEquals("ab".getBytes(), ab.value());
assertNull(tree.get(DocumentPath.from("root.a.b")).join());
Versioned<byte[]> ac = tree.removeNode(DocumentPath.from("root.a.c")).join();
assertArrayEquals("ac".getBytes(), ac.value());
assertNull(tree.get(DocumentPath.from("root.a.c")).join());
Versioned<byte[]> a = tree.removeNode(DocumentPath.from("root.a")).join();
assertArrayEquals("a".getBytes(), a.value());
assertNull(tree.get(DocumentPath.from("root.a")).join());
}
/**
* Tests invalid removes.
*/
@Test
public void testRemoveFailures() throws Throwable {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
tree.create(DocumentPath.from("root.a"), "a".getBytes()).join();
tree.create(DocumentPath.from("root.a.b"), "ab".getBytes()).join();
tree.create(DocumentPath.from("root.a.c"), "ac".getBytes()).join();
try {
tree.removeNode(DocumentPath.from("root")).join();
fail();
} catch (Exception e) {
assertTrue(Throwables.getRootCause(e) instanceof IllegalDocumentModificationException);
}
try {
tree.removeNode(DocumentPath.from("root.a")).join();
fail();
} catch (Exception e) {
assertTrue(Throwables.getRootCause(e) instanceof IllegalDocumentModificationException);
}
try {
tree.removeNode(DocumentPath.from("root.d")).join();
fail();
} catch (Exception e) {
assertTrue(Throwables.getRootCause(e) instanceof NoSuchDocumentPathException);
}
}
/**
* Tests invalid create.
*/
@Test
public void testCreateFailures() throws Throwable {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
try {
tree.create(DocumentPath.from("root.a.c"), "ac".getBytes()).join();
fail();
} catch (Exception e) {
assertTrue(Throwables.getRootCause(e) instanceof IllegalDocumentModificationException);
}
}
/**
* Tests invalid set.
*/
@Test
public void testSetFailures() throws Throwable {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
try {
tree.set(DocumentPath.from("root.a.c"), "ac".getBytes()).join();
fail();
} catch (Exception e) {
assertTrue(Throwables.getRootCause(e) instanceof IllegalDocumentModificationException);
}
}
/**
* Tests getChildren.
*/
@Test
public void testGetChildren() throws Throwable {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
tree.create(DocumentPath.from("root.a"), "a".getBytes()).join();
tree.create(DocumentPath.from("root.a.b"), "ab".getBytes()).join();
tree.create(DocumentPath.from("root.a.c"), "ac".getBytes()).join();
Map<String, Versioned<byte[]>> rootChildren = tree.getChildren(DocumentPath.from("root")).join();
assertEquals(1, rootChildren.size());
Versioned<byte[]> a = rootChildren.get("a");
assertArrayEquals("a".getBytes(), a.value());
Map<String, Versioned<byte[]>> children = tree.getChildren(DocumentPath.from("root.a")).join();
assertEquals(2, children.size());
Versioned<byte[]> ab = children.get("b");
assertArrayEquals("ab".getBytes(), ab.value());
Versioned<byte[]> ac = children.get("c");
assertArrayEquals("ac".getBytes(), ac.value());
assertEquals(0, tree.getChildren(DocumentPath.from("root.a.b")).join().size());
assertEquals(0, tree.getChildren(DocumentPath.from("root.a.c")).join().size());
}
/**
* Tests destroy.
*/
@Test
public void testClear() {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
tree.create(DocumentPath.from("root.a"), "a".getBytes()).join();
tree.create(DocumentPath.from("root.a.b"), "ab".getBytes()).join();
tree.create(DocumentPath.from("root.a.c"), "ac".getBytes()).join();
tree.destroy().join();
assertEquals(0, tree.getChildren(DocumentPath.from("root")).join().size());
}
/**
* Tests listeners.
*/
@Test
@Ignore
public void testNotifications() throws Exception {
AtomixDocumentTree tree = createAtomixClient().getResource(UUID.randomUUID().toString(),
AtomixDocumentTree.class).join();
TestEventListener listener = new TestEventListener();
// add listener; create a node in the tree and verify an CREATED event is received.
tree.addListener(listener).thenCompose(v -> tree.set(DocumentPath.from("root.a"), "a".getBytes())).join();
DocumentTreeEvent<byte[]> event = listener.event();
assertNotNull(event);
assertEquals(DocumentTreeEvent.Type.CREATED, event.type());
assertArrayEquals("a".getBytes(), event.newValue().get().value());
}
private static class TestEventListener implements DocumentTreeListener<byte[]> {
private final BlockingQueue<DocumentTreeEvent<byte[]>> queue = new ArrayBlockingQueue<>(1);
@Override
public void event(DocumentTreeEvent<byte[]> event) {
try {
queue.put(event);
} catch (InterruptedException e) {
Throwables.propagate(e);
}
}
public boolean eventReceived() {
return !queue.isEmpty();
}
public DocumentTreeEvent<byte[]> event() throws InterruptedException {
return queue.take();
}
}
}