Madan Jampani
Committed by Gerrit Code Review

State machine implementations for various distributed primitives based on latest Copycat APIs

Change-Id: I622cc196aa1cdf072a5a0b100a5ffaaf71b07900
Showing 21 changed files with 4366 additions and 0 deletions
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
import io.atomix.copycat.client.Query;
import io.atomix.manager.state.GetResource;
import io.atomix.manager.state.GetResourceKeys;
import io.atomix.resource.ResourceQuery;
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Scanner;
import org.onlab.util.Match;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapState;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
import org.onosproject.store.primitives.resources.impl.RollbackResult;
import org.onosproject.store.primitives.resources.impl.TransactionId;
import org.onosproject.store.primitives.resources.impl.TransactionalMapUpdate;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
/**
* Serializer utility for Atomix Catalyst.
*/
public final class CatalystSerializers {
private CatalystSerializers() {
}
public static Serializer getSerializer() {
Serializer serializer = new Serializer();
TypeSerializerFactory factory =
new DefaultCatalystTypeSerializerFactory(
org.onosproject.store.service.Serializer.using(Arrays.asList((KryoNamespaces.API)),
MapEntryUpdateResult.class,
MapEntryUpdateResult.Status.class,
MapUpdate.class,
MapUpdate.Type.class,
TransactionalMapUpdate.class,
TransactionId.class,
PrepareResult.class,
CommitResult.class,
RollbackResult.class,
AtomixConsistentMapCommands.Get.class,
AtomixConsistentMapCommands.ContainsKey.class,
AtomixConsistentMapCommands.ContainsValue.class,
AtomixConsistentMapCommands.Size.class,
AtomixConsistentMapCommands.IsEmpty.class,
AtomixConsistentMapCommands.KeySet.class,
AtomixConsistentMapCommands.EntrySet.class,
AtomixConsistentMapCommands.Values.class,
AtomixConsistentMapCommands.UpdateAndGet.class,
AtomixConsistentMapCommands.TransactionPrepare.class,
AtomixConsistentMapCommands.TransactionCommit.class,
AtomixConsistentMapCommands.TransactionRollback.class,
AtomixLeaderElectorCommands.GetLeadership.class,
AtomixLeaderElectorCommands.GetAllLeaderships.class,
AtomixLeaderElectorCommands.GetElectedTopics.class,
AtomixLeaderElectorCommands.Run.class,
AtomixLeaderElectorCommands.Withdraw.class,
AtomixLeaderElectorCommands.Anoint.class,
GetResource.class,
GetResourceKeys.class,
ResourceQuery.class,
Query.ConsistencyLevel.class));
// ONOS classes
serializer.register(Change.class, factory);
serializer.register(NodeId.class, factory);
serializer.register(Match.class, factory);
serializer.register(MapEntryUpdateResult.class, factory);
serializer.register(MapEntryUpdateResult.Status.class, factory);
serializer.register(TransactionalMapUpdate.class, factory);
serializer.register(PrepareResult.class, factory);
serializer.register(CommitResult.class, factory);
serializer.register(RollbackResult.class, factory);
serializer.register(TransactionId.class, factory);
serializer.register(MapUpdate.class, factory);
serializer.register(Versioned.class, factory);
serializer.register(MapEvent.class, factory);
serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
serializer.register(AtomixConsistentMapState.class, factory);
serializer.register(ResourceQuery.class, factory);
serializer.register(GetResource.class, factory);
serializer.register(GetResourceKeys.class, factory);
// ConsistentMap
serializer.register(AtomixConsistentMapCommands.UpdateAndGet.class, factory);
serializer.register(AtomixConsistentMapCommands.Clear.class);
serializer.register(AtomixConsistentMapCommands.Listen.class);
serializer.register(AtomixConsistentMapCommands.Unlisten.class);
serializer.register(AtomixConsistentMapCommands.Get.class);
serializer.register(AtomixConsistentMapCommands.ContainsKey.class);
serializer.register(AtomixConsistentMapCommands.ContainsValue.class);
serializer.register(AtomixConsistentMapCommands.EntrySet.class);
serializer.register(AtomixConsistentMapCommands.IsEmpty.class);
serializer.register(AtomixConsistentMapCommands.KeySet.class);
serializer.register(AtomixConsistentMapCommands.Size.class);
serializer.register(AtomixConsistentMapCommands.Values.class);
serializer.register(AtomixConsistentMapCommands.TransactionPrepare.class);
serializer.register(AtomixConsistentMapCommands.TransactionCommit.class);
serializer.register(AtomixConsistentMapCommands.TransactionRollback.class);
// LeaderElector
serializer.register(AtomixLeaderElectorCommands.Run.class, factory);
serializer.register(AtomixLeaderElectorCommands.Withdraw.class, factory);
serializer.register(AtomixLeaderElectorCommands.Anoint.class, factory);
serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
serializer.register(AtomixLeaderElectorCommands.GetLeadership.class, factory);
serializer.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, factory);
serializer.register(AtomixLeaderElectorCommands.Listen.class);
serializer.register(AtomixLeaderElectorCommands.Unlisten.class);
// Atomix types
try {
ClassLoader cl = CatalystSerializable.class.getClassLoader();
Enumeration<URL> urls = cl.getResources(
String.format("META-INF/services/%s", CatalystSerializable.class.getName()));
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
try (Scanner scanner = new Scanner(url.openStream(), "UTF-8")) {
scanner.useDelimiter("\n").forEachRemaining(line -> {
if (!line.trim().startsWith("#")) {
line = line.trim();
if (line.length() > 0) {
try {
serializer.register(cl.loadClass(line));
} catch (ClassNotFoundException e) {
Throwables.propagate(e);
}
}
}
});
}
}
} catch (IOException e) {
Throwables.propagate(e);
}
return serializer;
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.util.Listener;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.Consistency;
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import org.onlab.util.Match;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Sets;
/**
* Distributed resource providing the {@link AsyncConsistentMap} primitive.
*/
@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
implements AsyncConsistentMap<String, byte[]> {
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
private static final String CHANGE_SUBJECT = "change";
public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
super(client, options);
}
@Override
public String name() {
return null;
}
@Override
public CompletableFuture<AtomixConsistentMap> open() {
return super.open().thenApply(result -> {
client.session().onEvent(CHANGE_SUBJECT, this::handleEvent);
return result;
});
}
private void handleEvent(MapEvent<String, byte[]> event) {
mapEventListeners.forEach(listener -> listener.event(event));
}
@Override
public AtomixConsistentMap with(Consistency consistency) {
super.with(consistency);
return this;
}
@Override
public CompletableFuture<Boolean> isEmpty() {
return submit(new AtomixConsistentMapCommands.IsEmpty());
}
@Override
public CompletableFuture<Integer> size() {
return submit(new AtomixConsistentMapCommands.Size());
}
@Override
public CompletableFuture<Boolean> containsKey(String key) {
return submit(new AtomixConsistentMapCommands.ContainsKey(key));
}
@Override
public CompletableFuture<Boolean> containsValue(byte[] value) {
return submit(new AtomixConsistentMapCommands.ContainsValue(value));
}
@Override
public CompletableFuture<Versioned<byte[]>> get(String key) {
return submit(new AtomixConsistentMapCommands.Get(key));
}
@Override
public CompletableFuture<Set<String>> keySet() {
return submit(new AtomixConsistentMapCommands.KeySet());
}
@Override
public CompletableFuture<Collection<Versioned<byte[]>>> values() {
return submit(new AtomixConsistentMapCommands.Values());
}
@Override
public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
return submit(new AtomixConsistentMapCommands.EntrySet());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.newValue());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NULL, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> remove(String key) {
return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, byte[] value) {
return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, long version) {
return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
newValue,
Match.ifValue(oldValue),
Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
newValue,
Match.ANY,
Match.ifValue(oldVersion)))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@Override
public CompletableFuture<Void> clear() {
return submit(new AtomixConsistentMapCommands.Clear())
.whenComplete((r, e) -> throwIfLocked(r))
.thenApply(v -> null);
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> computeIf(String key,
Predicate<? super byte[]> condition,
BiFunction<? super String, ? super byte[], ? extends byte[]> remappingFunction) {
return get(key).thenCompose(r1 -> {
byte[] existingValue = r1 == null ? null : r1.value();
// if the condition evaluates to false, return existing value.
if (!condition.test(existingValue)) {
return CompletableFuture.completedFuture(r1);
}
AtomicReference<byte[]> computedValue = new AtomicReference<>();
// if remappingFunction throws an exception, return the exception.
try {
computedValue.set(remappingFunction.apply(key, existingValue));
} catch (Exception e) {
CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
if (computedValue.get() == null && r1 == null) {
return CompletableFuture.completedFuture(null);
}
Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
return submit(new AtomixConsistentMapCommands.UpdateAndGet(key,
computedValue.get(),
valueMatch,
versionMatch))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.newValue());
});
}
public CompletableFuture<PrepareResult> prepare(TransactionalMapUpdate<String, byte[]> update) {
return submit(new AtomixConsistentMapCommands.TransactionPrepare(update));
}
public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
}
public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
}
@Override
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
if (!mapEventListeners.isEmpty()) {
if (mapEventListeners.add(listener)) {
return CompletableFuture.completedFuture(new ChangeListener(listener)).thenApply(v -> null);
} else {
return CompletableFuture.completedFuture(null);
}
}
mapEventListeners.add(listener);
return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
}
@Override
public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
return submit(new AtomixConsistentMapCommands.Unlisten()).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}
private void throwIfLocked(MapEntryUpdateResult.Status status) {
if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
}
}
/**
* Change listener context.
*/
private final class ChangeListener implements Listener<MapEvent<String, byte[]>> {
private final MapEventListener<String, byte[]> listener;
private ChangeListener(MapEventListener<String, byte[]> listener) {
this.listener = listener;
}
@Override
public void accept(MapEvent<String, byte[]> event) {
listener.event(event);
}
@Override
public void close() {
synchronized (AtomixConsistentMap.this) {
mapEventListeners.remove(listener);
if (mapEventListeners.isEmpty()) {
submit(new AtomixConsistentMapCommands.Unlisten());
}
}
}
}
}
\ No newline at end of file
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.client.Command;
import io.atomix.copycat.client.Query;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.onlab.util.Match;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
/**
* {@link AtomixConsistentMap} resource state machine operations.
*/
public final class AtomixConsistentMapCommands {
private AtomixConsistentMapCommands() {
}
/**
* Abstract map command.
*/
@SuppressWarnings("serial")
public abstract static class MapCommand<V> implements Command<V>, CatalystSerializable {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.LINEARIZABLE;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.toString();
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
}
/**
* Abstract map query.
*/
@SuppressWarnings("serial")
public abstract static class MapQuery<V> implements Query<V>, CatalystSerializable {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.BOUNDED_LINEARIZABLE;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.toString();
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
}
/**
* Abstract key-based query.
*/
@SuppressWarnings("serial")
public abstract static class KeyQuery<V> extends MapQuery<V> {
protected String key;
public KeyQuery() {
}
public KeyQuery(String key) {
this.key = Assert.notNull(key, "key");
}
/**
* Returns the key.
* @return key
*/
public String key() {
return key;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("key", key)
.toString();
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(key, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
key = serializer.readObject(buffer);
}
}
/**
* Abstract key-based query.
*/
@SuppressWarnings("serial")
public abstract static class ValueQuery<V> extends MapQuery<V> {
protected byte[] value;
public ValueQuery() {
}
public ValueQuery(byte[] value) {
this.value = Assert.notNull(value, "value");
}
/**
* Returns the key.
* @return key
*/
public byte[] value() {
return value;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(value, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
value = serializer.readObject(buffer);
}
}
/**
* Contains key command.
*/
@SuppressWarnings("serial")
public static class ContainsKey extends KeyQuery<Boolean> {
public ContainsKey() {
}
public ContainsKey(String key) {
super(key);
}
}
/**
* Contains key command.
*/
@SuppressWarnings("serial")
public static class ContainsValue extends ValueQuery<Boolean> {
public ContainsValue() {
}
public ContainsValue(byte[] value) {
super(value);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("value", value)
.toString();
}
}
/**
* Map prepare command.
*/
@SuppressWarnings("serial")
public static class TransactionPrepare extends MapCommand<PrepareResult> {
private TransactionalMapUpdate<String, byte[]> update;
public TransactionPrepare() {
}
public TransactionPrepare(TransactionalMapUpdate<String, byte[]> update) {
this.update = update;
}
public TransactionalMapUpdate<String, byte[]> transactionUpdate() {
return update;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(update, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
update = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("update", update)
.toString();
}
}
/**
* Map transaction commit command.
*/
@SuppressWarnings("serial")
public static class TransactionCommit extends MapCommand<CommitResult> {
private TransactionId transactionId;
public TransactionCommit() {
}
public TransactionCommit(TransactionId transactionId) {
this.transactionId = transactionId;
}
/**
* Returns the transaction identifier.
* @return transaction id
*/
public TransactionId transactionId() {
return transactionId;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(transactionId, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
transactionId = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("transactionId", transactionId)
.toString();
}
}
/**
* Map transaction rollback command.
*/
@SuppressWarnings("serial")
public static class TransactionRollback extends MapCommand<RollbackResult> {
private TransactionId transactionId;
public TransactionRollback() {
}
public TransactionRollback(TransactionId transactionId) {
this.transactionId = transactionId;
}
/**
* Returns the transaction identifier.
* @return transaction id
*/
public TransactionId transactionId() {
return transactionId;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(transactionId, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
transactionId = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("transactionId", transactionId)
.toString();
}
}
/**
* Map update command.
*/
@SuppressWarnings("serial")
public static class UpdateAndGet extends MapCommand<MapEntryUpdateResult<String, byte[]>> {
private String key;
private byte[] value;
private Match<byte[]> valueMatch;
private Match<Long> versionMatch;
public UpdateAndGet() {
}
public UpdateAndGet(String key,
byte[] value,
Match<byte[]> valueMatch,
Match<Long> versionMatch) {
this.key = key;
this.value = value;
this.valueMatch = valueMatch;
this.versionMatch = versionMatch;
}
/**
* Returns the key.
* @return key
*/
public String key() {
return this.key;
}
/**
* Returns the value.
* @return value
*/
public byte[] value() {
return this.value;
}
/**
* Returns the value match.
* @return value match
*/
public Match<byte[]> valueMatch() {
return this.valueMatch;
}
/**
* Returns the version match.
* @return version match
*/
public Match<Long> versionMatch() {
return this.versionMatch;
}
@Override
public CompactionMode compaction() {
return value == null ? CompactionMode.FULL : CompactionMode.QUORUM;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(key, buffer);
serializer.writeObject(value, buffer);
serializer.writeObject(valueMatch, buffer);
serializer.writeObject(versionMatch, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
key = serializer.readObject(buffer);
value = serializer.readObject(buffer);
valueMatch = serializer.readObject(buffer);
versionMatch = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("key", key)
.add("value", value)
.add("valueMatch", valueMatch)
.add("versionMatch", versionMatch)
.toString();
}
}
/**
* Get query.
*/
@SuppressWarnings("serial")
public static class Get extends KeyQuery<Versioned<byte[]>> {
public Get() {
}
public Get(String key) {
super(key);
}
}
/**
* Is empty query.
*/
@SuppressWarnings("serial")
public static class IsEmpty extends MapQuery<Boolean> {
}
/**
* KeySet query.
*/
@SuppressWarnings("serial")
public static class KeySet extends MapQuery<Set<String>> {
}
/**
* KeySet query.
*/
@SuppressWarnings("serial")
public static class Values extends MapQuery<Collection<Versioned<byte[]>>> {
}
/**
* KeySet query.
*/
@SuppressWarnings("serial")
public static class EntrySet extends MapQuery<Set<Map.Entry<String, Versioned<byte[]>>>> {
}
/**
* Size query.
*/
@SuppressWarnings("serial")
public static class Size extends MapQuery<Integer> {
}
/**
* Clear command.
*/
@SuppressWarnings("serial")
public static class Clear extends MapCommand<MapEntryUpdateResult.Status> {
@Override
public CompactionMode compaction() {
return CompactionMode.FULL;
}
}
/**
* Change listen.
*/
@SuppressWarnings("serial")
public static class Listen implements Command<Void>, CatalystSerializable {
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.toString();
}
}
/**
* Change unlisten.
*/
@SuppressWarnings("serial")
public static class Unlisten implements Command<Void>, CatalystSerializable {
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.toString();
}
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import static org.onosproject.store.service.MapEvent.Type.INSERT;
import static org.onosproject.store.service.MapEvent.Type.REMOVE;
import static org.onosproject.store.service.MapEvent.Type.UPDATE;
import io.atomix.copycat.client.session.Session;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import static com.google.common.base.Preconditions.checkState;
/**
* State Machine for {@link AtomixConsistentMap} resource.
*/
public class AtomixConsistentMapState extends ResourceStateMachine implements
SessionListener, Snapshottable {
private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps
.newHashMap();
private AtomicLong versionCounter = new AtomicLong(0);
@Override
public void snapshot(SnapshotWriter writer) {
writer.writeLong(versionCounter.get());
}
@Override
public void install(SnapshotReader reader) {
versionCounter = new AtomicLong(reader.readLong());
}
@Override
protected void configure(StateMachineExecutor executor) {
// Listeners
executor.register(AtomixConsistentMapCommands.Listen.class,
this::listen);
executor.register(AtomixConsistentMapCommands.Unlisten.class,
this::unlisten);
// Queries
executor.register(AtomixConsistentMapCommands.ContainsKey.class,
this::containsKey);
executor.register(AtomixConsistentMapCommands.ContainsValue.class,
this::containsValue);
executor.register(AtomixConsistentMapCommands.EntrySet.class,
this::entrySet);
executor.register(AtomixConsistentMapCommands.Get.class, this::get);
executor.register(AtomixConsistentMapCommands.IsEmpty.class,
this::isEmpty);
executor.register(AtomixConsistentMapCommands.KeySet.class,
this::keySet);
executor.register(AtomixConsistentMapCommands.Size.class, this::size);
executor.register(AtomixConsistentMapCommands.Values.class,
this::values);
// Commands
executor.register(AtomixConsistentMapCommands.UpdateAndGet.class,
this::updateAndGet);
executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
executor.register(AtomixConsistentMapCommands.TransactionPrepare.class,
this::prepare);
executor.register(AtomixConsistentMapCommands.TransactionCommit.class,
this::commit);
executor.register(
AtomixConsistentMapCommands.TransactionRollback.class,
this::rollback);
}
@Override
public void delete() {
// Delete Listeners
listeners.values().forEach(Commit::close);
listeners.clear();
// Delete Map entries
mapEntries.values().forEach(MapEntryValue::discard);
mapEntries.clear();
}
/**
* Handles a contains key commit.
*
* @param commit
* containsKey commit
* @return {@code true} if map contains key
*/
protected boolean containsKey(
Commit<? extends AtomixConsistentMapCommands.ContainsKey> commit) {
try {
return toVersioned(mapEntries.get(commit.operation().key())) != null;
} finally {
commit.close();
}
}
/**
* Handles a contains value commit.
*
* @param commit
* containsValue commit
* @return {@code true} if map contains value
*/
protected boolean containsValue(
Commit<? extends AtomixConsistentMapCommands.ContainsValue> commit) {
try {
Match<byte[]> valueMatch = Match
.ifValue(commit.operation().value());
return mapEntries.values().stream()
.anyMatch(value -> valueMatch.matches(value.value()));
} finally {
commit.close();
}
}
/**
* Handles a get commit.
*
* @param commit
* get commit
* @return value mapped to key
*/
protected Versioned<byte[]> get(
Commit<? extends AtomixConsistentMapCommands.Get> commit) {
try {
return toVersioned(mapEntries.get(commit.operation().key()));
} finally {
commit.close();
}
}
/**
* Handles a count commit.
*
* @param commit
* size commit
* @return number of entries in map
*/
protected int size(Commit<? extends AtomixConsistentMapCommands.Size> commit) {
try {
return mapEntries.size();
} finally {
commit.close();
}
}
/**
* Handles an is empty commit.
*
* @param commit
* isEmpty commit
* @return {@code true} if map is empty
*/
protected boolean isEmpty(
Commit<? extends AtomixConsistentMapCommands.IsEmpty> commit) {
try {
return mapEntries.isEmpty();
} finally {
commit.close();
}
}
/**
* Handles a keySet commit.
*
* @param commit
* keySet commit
* @return set of keys in map
*/
protected Set<String> keySet(
Commit<? extends AtomixConsistentMapCommands.KeySet> commit) {
try {
return mapEntries.keySet();
} finally {
commit.close();
}
}
/**
* Handles a values commit.
*
* @param commit
* values commit
* @return collection of values in map
*/
protected Collection<Versioned<byte[]>> values(
Commit<? extends AtomixConsistentMapCommands.Values> commit) {
try {
return mapEntries.values().stream().map(this::toVersioned)
.collect(Collectors.toList());
} finally {
commit.close();
}
}
/**
* Handles a entry set commit.
*
* @param commit
* entrySet commit
* @return set of map entries
*/
protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
Commit<? extends AtomixConsistentMapCommands.EntrySet> commit) {
try {
return mapEntries
.entrySet()
.stream()
.map(e -> Maps.immutableEntry(e.getKey(),
toVersioned(e.getValue())))
.collect(Collectors.toSet());
} finally {
commit.close();
}
}
/**
* Handles a update and get commit.
*
* @param commit
* updateAndGet commit
* @return update result
*/
protected MapEntryUpdateResult<String, byte[]> updateAndGet(
Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
MapEntryUpdateResult.Status updateStatus = validate(commit.operation());
String key = commit.operation().key();
MapEntryValue oldCommitValue = mapEntries.get(commit.operation().key());
Versioned<byte[]> oldMapValue = toVersioned(oldCommitValue);
if (updateStatus != MapEntryUpdateResult.Status.OK) {
commit.close();
return new MapEntryUpdateResult<>(updateStatus, "", key,
oldMapValue, oldMapValue);
}
byte[] newValue = commit.operation().value();
long newVersion = versionCounter.incrementAndGet();
Versioned<byte[]> newMapValue = newValue == null ? null
: new Versioned<>(newValue, newVersion);
MapEvent.Type updateType = newValue == null ? REMOVE
: oldCommitValue == null ? INSERT : UPDATE;
if (updateType == REMOVE || updateType == UPDATE) {
mapEntries.remove(key);
oldCommitValue.discard();
}
if (updateType == INSERT || updateType == UPDATE) {
mapEntries.put(key, new NonTransactionalCommit(newVersion, commit));
}
notify(new MapEvent<>("", key, newMapValue, oldMapValue));
return new MapEntryUpdateResult<>(updateStatus, "", key, oldMapValue,
newMapValue);
}
/**
* Handles a clear commit.
*
* @param commit
* clear commit
* @return clear result
*/
protected MapEntryUpdateResult.Status clear(
Commit<? extends AtomixConsistentMapCommands.Clear> commit) {
try {
Iterator<Map.Entry<String, MapEntryValue>> iterator = mapEntries
.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, MapEntryValue> entry = iterator.next();
String key = entry.getKey();
MapEntryValue value = entry.getValue();
Versioned<byte[]> removedValue = new Versioned<>(value.value(),
value.version());
notify(new MapEvent<>("", key, null, removedValue));
value.discard();
iterator.remove();
}
return MapEntryUpdateResult.Status.OK;
} finally {
commit.close();
}
}
/**
* Handles a listen commit.
*
* @param commit
* listen commit
*/
protected void listen(
Commit<? extends AtomixConsistentMapCommands.Listen> commit) {
Long sessionId = commit.session().id();
listeners.put(sessionId, commit);
commit.session()
.onStateChange(
state -> {
if (state == Session.State.CLOSED
|| state == Session.State.EXPIRED) {
Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
.remove(sessionId);
if (listener != null) {
listener.close();
}
}
});
}
/**
* Handles an unlisten commit.
*
* @param commit
* unlisten commit
*/
protected void unlisten(
Commit<? extends AtomixConsistentMapCommands.Unlisten> commit) {
try {
Commit<? extends AtomixConsistentMapCommands.Listen> listener = listeners
.remove(commit.session());
if (listener != null) {
listener.close();
}
} finally {
commit.close();
}
}
/**
* Triggers a change event.
*
* @param value
* map event
*/
private void notify(MapEvent<String, byte[]> value) {
listeners.values().forEach(
commit -> commit.session().publish("change", value));
}
/**
* Handles an prepare commit.
*
* @param commit
* transaction prepare commit
* @return prepare result
*/
protected PrepareResult prepare(
Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
boolean ok = false;
try {
TransactionalMapUpdate<String, byte[]> transactionUpdate = commit
.operation().transactionUpdate();
for (MapUpdate<String, byte[]> update : transactionUpdate.batch()) {
String key = update.key();
if (preparedKeys.contains(key)) {
return PrepareResult.CONCURRENT_TRANSACTION;
}
MapEntryValue existingValue = mapEntries.get(key);
if (existingValue == null) {
if (update.currentValue() != null) {
return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
}
} else {
if (existingValue.version() != update.currentVersion()) {
return PrepareResult.OPTIMISTIC_LOCK_FAILURE;
}
}
}
// No violations detected. Add to pendingTranctions and mark
// modified keys as
// currently locked to updates.
pendingTransactions.put(transactionUpdate.transactionId(), commit);
transactionUpdate.batch().forEach(u -> preparedKeys.add(u.key()));
ok = true;
return PrepareResult.OK;
} finally {
if (!ok) {
commit.close();
}
}
}
/**
* Handles an commit commit (ha!).
*
* @param commit transaction commit commit
* @return commit result
*/
protected CommitResult commit(
Commit<? extends AtomixConsistentMapCommands.TransactionCommit> commit) {
TransactionId transactionId = commit.operation().transactionId();
try {
Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
.remove(transactionId);
if (prepareCommit == null) {
return CommitResult.UNKNOWN_TRANSACTION_ID;
}
TransactionalMapUpdate<String, byte[]> transactionalUpdate = prepareCommit
.operation().transactionUpdate();
long totalReferencesToCommit = transactionalUpdate
.batch()
.stream()
.filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.count();
CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
for (MapUpdate<String, byte[]> update : transactionalUpdate.batch()) {
String key = update.key();
MapEntryValue previousValue = mapEntries.remove(key);
MapEntryValue newValue = null;
checkState(preparedKeys.remove(key), "key is not prepared");
if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
newValue = new TransactionalCommit(key,
versionCounter.incrementAndGet(), completer);
}
mapEntries.put(key, newValue);
// Notify map listeners
notify(new MapEvent<>("", key, toVersioned(newValue),
toVersioned(previousValue)));
}
return CommitResult.OK;
} finally {
commit.close();
}
}
/**
* Handles an rollback commit (ha!).
*
* @param commit transaction rollback commit
* @return rollback result
*/
protected RollbackResult rollback(
Commit<? extends AtomixConsistentMapCommands.TransactionRollback> commit) {
TransactionId transactionId = commit.operation().transactionId();
try {
Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> prepareCommit = pendingTransactions
.remove(transactionId);
if (prepareCommit == null) {
return RollbackResult.UNKNOWN_TRANSACTION_ID;
} else {
prepareCommit.operation().transactionUpdate().batch()
.forEach(u -> preparedKeys.remove(u.key()));
prepareCommit.close();
return RollbackResult.OK;
}
} finally {
commit.close();
}
}
private MapEntryUpdateResult.Status validate(
AtomixConsistentMapCommands.UpdateAndGet update) {
MapEntryValue existingValue = mapEntries.get(update.key());
if (existingValue == null && update.value() == null) {
return MapEntryUpdateResult.Status.NOOP;
}
if (preparedKeys.contains(update.key())) {
return MapEntryUpdateResult.Status.WRITE_LOCK;
}
byte[] existingRawValue = existingValue == null ? null : existingValue
.value();
Long existingVersion = existingValue == null ? null : existingValue
.version();
return update.valueMatch().matches(existingRawValue)
&& update.versionMatch().matches(existingVersion) ? MapEntryUpdateResult.Status.OK
: MapEntryUpdateResult.Status.PRECONDITION_FAILED;
}
private Versioned<byte[]> toVersioned(MapEntryValue value) {
return value == null ? null : new Versioned<>(value.value(),
value.version());
}
@Override
public void register(Session session) {
}
@Override
public void unregister(Session session) {
closeListener(session.id());
}
@Override
public void expire(Session session) {
closeListener(session.id());
}
@Override
public void close(Session session) {
closeListener(session.id());
}
private void closeListener(Long sessionId) {
Commit<? extends AtomixConsistentMapCommands.Listen> commit = listeners
.remove(sessionId);
if (commit != null) {
commit.close();
}
}
/**
* Interface implemented by map values.
*/
private interface MapEntryValue {
/**
* Returns the raw {@code byte[]}.
*
* @return raw value
*/
byte[] value();
/**
* Returns the version of the value.
*
* @return version
*/
long version();
/**
* Discards the value by invoke appropriate clean up actions.
*/
void discard();
}
/**
* A {@code MapEntryValue} that is derived from a non-transactional update
* i.e. via any standard map update operation.
*/
private class NonTransactionalCommit implements MapEntryValue {
private final long version;
private final Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit;
public NonTransactionalCommit(
long version,
Commit<? extends AtomixConsistentMapCommands.UpdateAndGet> commit) {
this.version = version;
this.commit = commit;
}
@Override
public byte[] value() {
return commit.operation().value();
}
@Override
public long version() {
return version;
}
@Override
public void discard() {
commit.close();
}
}
/**
* A {@code MapEntryValue} that is derived from updates submitted via a
* transaction.
*/
private class TransactionalCommit implements MapEntryValue {
private final String key;
private final long version;
private final CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer;
public TransactionalCommit(
String key,
long version,
CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> commit) {
this.key = key;
this.version = version;
this.completer = commit;
}
@Override
public byte[] value() {
TransactionalMapUpdate<String, byte[]> update = completer.object()
.operation().transactionUpdate();
return update.valueForKey(key);
}
@Override
public long version() {
return version;
}
@Override
public void discard() {
completer.countDown();
}
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.variables.DistributedLong;
import java.util.concurrent.CompletableFuture;
import org.onosproject.store.service.AsyncAtomicCounter;
/**
* {@code AsyncAtomicCounter} implementation backed by Atomix
* {@link DistributedLong}.
*/
public class AtomixCounter implements AsyncAtomicCounter {
private final String name;
private final DistributedLong distLong;
public AtomixCounter(String name, DistributedLong distLong) {
this.name = name;
this.distLong = distLong;
}
@Override
public String name() {
return name;
}
@Override
public CompletableFuture<Long> incrementAndGet() {
return distLong.incrementAndGet();
}
@Override
public CompletableFuture<Long> getAndIncrement() {
return distLong.getAndIncrement();
}
@Override
public CompletableFuture<Long> getAndAdd(long delta) {
return distLong.getAndAdd(delta);
}
@Override
public CompletableFuture<Long> addAndGet(long delta) {
return distLong.addAndGet(delta);
}
@Override
public CompletableFuture<Long> get() {
return distLong.get();
}
@Override
public CompletableFuture<Void> set(long value) {
return distLong.set(value);
}
@Override
public CompletableFuture<Boolean> compareAndSet(long expectedValue,
long updateValue) {
return distLong.compareAndSet(expectedValue, updateValue);
}
}
\ No newline at end of file
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.util.Listener;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.Consistency;
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.service.AsyncLeaderElector;
import com.google.common.collect.Sets;
/**
* Distributed resource providing the {@link AsyncLeaderElector} primitive.
*/
@ResourceTypeInfo(id = -152, stateMachine = AtomixLeaderElectorState.class)
public class AtomixLeaderElector
extends Resource<AtomixLeaderElector, Resource.Options> implements AsyncLeaderElector {
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newConcurrentHashSet();
private Listener<Change<Leadership>> listener;
public AtomixLeaderElector(CopycatClient client, Resource.Options options) {
super(client, options);
}
@Override
public String name() {
return null;
}
@Override
public CompletableFuture<AtomixLeaderElector> open() {
return super.open().thenApply(result -> {
client.session().onEvent("change", this::handleEvent);
return result;
});
}
private void handleEvent(Change<Leadership> change) {
leadershipChangeListeners.forEach(l -> l.accept(change));
}
@Override
public AtomixLeaderElector with(Consistency consistency) {
super.with(consistency);
return this;
}
@Override
public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
return submit(new AtomixLeaderElectorCommands.Run(topic, nodeId));
}
@Override
public CompletableFuture<Void> withdraw(String topic) {
return submit(new AtomixLeaderElectorCommands.Withdraw(topic));
}
@Override
public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
return submit(new AtomixLeaderElectorCommands.Anoint(topic, nodeId));
}
@Override
public CompletableFuture<Leadership> getLeadership(String topic) {
return submit(new AtomixLeaderElectorCommands.GetLeadership(topic));
}
@Override
public CompletableFuture<Map<String, Leadership>> getLeaderships() {
return submit(new AtomixLeaderElectorCommands.GetAllLeaderships());
}
public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
return submit(new AtomixLeaderElectorCommands.GetElectedTopics(nodeId));
}
/**
* Leadership change listener context.
*/
private final class LeadershipChangeListener implements Listener<Change<Leadership>> {
private final Consumer<Change<Leadership>> listener;
private LeadershipChangeListener(Consumer<Change<Leadership>> listener) {
this.listener = listener;
}
@Override
public void accept(Change<Leadership> change) {
listener.accept(change);
}
@Override
public void close() {
synchronized (AtomixLeaderElector.this) {
submit(new AtomixLeaderElectorCommands.Unlisten());
}
}
}
@Override
public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
leadershipChangeListeners.add(consumer);
return setupListener();
}
@Override
public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
leadershipChangeListeners.remove(consumer);
return teardownListener();
}
private CompletableFuture<Void> setupListener() {
if (listener == null && !leadershipChangeListeners.isEmpty()) {
Consumer<Change<Leadership>> changeConsumer = change -> {
leadershipChangeListeners.forEach(consumer -> consumer.accept(change));
};
return submit(new AtomixLeaderElectorCommands.Listen())
.thenAccept(v -> listener = new LeadershipChangeListener(changeConsumer));
}
return CompletableFuture.completedFuture(null);
}
private CompletableFuture<Void> teardownListener() {
if (listener != null && leadershipChangeListeners.isEmpty()) {
listener.close();
listener = null;
return submit(new AtomixLeaderElectorCommands.Unlisten());
}
return CompletableFuture.completedFuture(null);
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.util.Map;
import java.util.Set;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.client.Command;
import io.atomix.copycat.client.Query;
/**
* {@link AtomixLeaderElector} resource state machine operations.
*/
public final class AtomixLeaderElectorCommands {
private AtomixLeaderElectorCommands() {
}
/**
* Abstract election query.
*/
@SuppressWarnings("serial")
public abstract static class ElectionQuery<V> implements Query<V>, CatalystSerializable {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.BOUNDED_LINEARIZABLE;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
}
/**
* Abstract election topic query.
*/
@SuppressWarnings("serial")
public abstract static class TopicQuery<V> extends ElectionQuery<V> implements CatalystSerializable {
String topic;
public TopicQuery() {
}
public TopicQuery(String topic) {
this.topic = Assert.notNull(topic, "topic");
}
/**
* Returns the topic.
* @return topic
*/
public String topic() {
return topic;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
serializer.writeObject(topic, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
topic = serializer.readObject(buffer);
}
}
/**
* Abstract election command.
*/
@SuppressWarnings("serial")
public abstract static class ElectionCommand<V> implements Command<V>, CatalystSerializable {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.LINEARIZABLE;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
}
/**
* Listen command.
*/
@SuppressWarnings("serial")
public static class Listen extends ElectionCommand<Void> {
}
/**
* Unlisten command.
*/
@SuppressWarnings("serial")
public static class Unlisten extends ElectionCommand<Void> {
@Override
public CompactionMode compaction() {
return CompactionMode.QUORUM;
}
}
/**
* GetLeader query.
*/
@SuppressWarnings("serial")
public static class GetLeadership extends TopicQuery<Leadership> {
public GetLeadership() {
}
public GetLeadership(String topic) {
super(topic);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("topic", topic)
.toString();
}
}
/**
* GetAllLeaders query.
*/
@SuppressWarnings("serial")
public static class GetAllLeaderships extends ElectionQuery<Map<String, Leadership>> {
}
/**
* GetElectedTopics query.
*/
@SuppressWarnings("serial")
public static class GetElectedTopics extends ElectionQuery<Set<String>> {
private NodeId nodeId;
public GetElectedTopics() {
}
public GetElectedTopics(NodeId nodeId) {
this.nodeId = Assert.argNot(nodeId, nodeId == null, "nodeId cannot be null");
}
/**
* Returns the nodeId to check.
*
* @return The nodeId to check.
*/
public NodeId nodeId() {
return nodeId;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("nodeId", nodeId)
.toString();
}
}
/**
* Enter and run for leadership.
*/
@SuppressWarnings("serial")
public static class Run extends ElectionCommand<Leadership> {
private String topic;
private NodeId nodeId;
public Run() {
}
public Run(String topic, NodeId nodeId) {
this.topic = Assert.argNot(topic, Strings.isNullOrEmpty(topic), "topic cannot be null or empty");
this.nodeId = Assert.argNot(nodeId, nodeId == null, "nodeId cannot be null");
}
/**
* Returns the topic.
*
* @return topic
*/
public String topic() {
return topic;
}
/**
* Returns the nodeId.
*
* @return the nodeId
*/
public NodeId nodeId() {
return nodeId;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("topic", topic)
.add("nodeId", nodeId)
.toString();
}
}
/**
* Withdraw from a leadership contest.
*/
@SuppressWarnings("serial")
public static class Withdraw extends ElectionCommand<Void> {
private String topic;
public Withdraw() {
}
public Withdraw(String topic) {
this.topic = Assert.argNot(topic, Strings.isNullOrEmpty(topic), "topic cannot be null or empty");
}
/**
* Returns the topic.
*
* @return The topic
*/
public String topic() {
return topic;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("topic", topic)
.toString();
}
}
/**
* Command for administratively anointing a node as leader.
*/
@SuppressWarnings("serial")
public static class Anoint extends ElectionCommand<Boolean> {
private String topic;
private NodeId nodeId;
public Anoint() {
}
public Anoint(String topic, NodeId nodeId) {
this.topic = topic;
this.nodeId = nodeId;
}
/**
* Returns the topic.
*
* @return The topic
*/
public String topic() {
return topic;
}
/**
* Returns the nodeId to make leader.
*
* @return The nodeId
*/
public NodeId nodeId() {
return nodeId;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("topic", topic)
.add("nodeId", nodeId)
.toString();
}
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.copycat.client.session.Session;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* State machine for {@link AtomixLeaderElector} resource.
*/
public class AtomixLeaderElectorState extends ResourceStateMachine
implements SessionListener, Snapshottable {
private final Logger log = getLogger(getClass());
private Map<String, AtomicLong> termCounters = new HashMap<>();
private Map<String, ElectionState> elections = new HashMap<>();
private final Map<Long, Commit<? extends AtomixLeaderElectorCommands.Listen>> listeners = new LinkedHashMap<>();
private final Serializer serializer = Serializer.using(Arrays.asList(KryoNamespaces.API),
ElectionState.class,
Registration.class);
@Override
protected void configure(StateMachineExecutor executor) {
// Notification
executor.register(AtomixLeaderElectorCommands.Listen.class, this::listen);
executor.register(AtomixLeaderElectorCommands.Unlisten.class, this::unlisten);
// Commands
executor.register(AtomixLeaderElectorCommands.Run.class, this::run);
executor.register(AtomixLeaderElectorCommands.Withdraw.class, this::withdraw);
executor.register(AtomixLeaderElectorCommands.Anoint.class, this::anoint);
// Queries
executor.register(AtomixLeaderElectorCommands.GetLeadership.class, this::leadership);
executor.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, this::allLeaderships);
executor.register(AtomixLeaderElectorCommands.GetElectedTopics.class, this::electedTopics);
}
private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
Change<Leadership> change = new Change<>(previousLeadership, newLeadership);
listeners.values().forEach(listener -> listener.session().publish("change", change));
}
@Override
public void delete() {
// Close and clear Listeners
listeners.values().forEach(Commit::close);
listeners.clear();
}
/**
* Applies listen commits.
*
* @param commit listen commit
*/
public void listen(Commit<? extends AtomixLeaderElectorCommands.Listen> commit) {
if (listeners.putIfAbsent(commit.session().id(), commit) != null) {
commit.close();
}
}
/**
* Applies unlisten commits.
*
* @param commit unlisten commit
*/
public void unlisten(Commit<? extends AtomixLeaderElectorCommands.Unlisten> commit) {
try {
Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(commit.session().id());
if (listener != null) {
listener.close();
}
} finally {
commit.close();
}
}
/**
* Applies an {@link AtomixLeaderElectorCommands.Run} commit.
* @param commit commit entry
* @return topic leader. If no previous leader existed this is the node that just entered the race.
*/
public Leadership run(Commit<? extends AtomixLeaderElectorCommands.Run> commit) {
try {
String topic = commit.operation().topic();
Leadership oldLeadership = leadership(topic);
Registration registration = new Registration(commit.operation().nodeId(), commit.session().id());
elections.compute(topic, (k, v) -> {
if (v == null) {
return new ElectionState(registration, termCounter(topic)::incrementAndGet);
} else {
if (!v.isDuplicate(registration)) {
return new ElectionState(v).addRegistration(registration, termCounter(topic)::incrementAndGet);
} else {
return v;
}
}
});
Leadership newLeadership = leadership(topic);
if (!Objects.equal(oldLeadership, newLeadership)) {
notifyLeadershipChange(oldLeadership, newLeadership);
}
return newLeadership;
} finally {
commit.close();
}
}
/**
* Applies an {@link AtomixLeaderElectorCommands.Withdraw} commit.
* @param commit withdraw commit
*/
public void withdraw(Commit<? extends AtomixLeaderElectorCommands.Withdraw> commit) {
try {
String topic = commit.operation().topic();
Leadership oldLeadership = leadership(topic);
elections.computeIfPresent(topic, (k, v) -> v.cleanup(commit.session(),
termCounter(topic)::incrementAndGet));
Leadership newLeadership = leadership(topic);
if (!Objects.equal(oldLeadership, newLeadership)) {
notifyLeadershipChange(oldLeadership, newLeadership);
}
} finally {
commit.close();
}
}
/**
* Applies an {@link AtomixLeaderElectorCommands.Anoint} commit.
* @param commit anoint commit
* @return {@code true} if changes were made and the transfer occurred; {@code false} if it did not.
*/
public boolean anoint(Commit<? extends AtomixLeaderElectorCommands.Anoint> commit) {
try {
String topic = commit.operation().topic();
Leadership oldLeadership = leadership(topic);
ElectionState electionState = elections.computeIfPresent(topic,
(k, v) -> new ElectionState(v).transferLeadership(commit.operation().nodeId(), termCounter(topic)));
Leadership newLeadership = leadership(topic);
if (!Objects.equal(oldLeadership, newLeadership)) {
notifyLeadershipChange(oldLeadership, newLeadership);
}
return (electionState != null &&
electionState.leader() != null &&
commit.operation().nodeId().equals(electionState.leader().nodeId()));
} finally {
commit.close();
}
}
/**
* Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
* @param commit GetLeadership commit
* @return leader
*/
public Leadership leadership(Commit<? extends AtomixLeaderElectorCommands.GetLeadership> commit) {
String topic = commit.operation().topic();
try {
return leadership(topic);
} finally {
commit.close();
}
}
/**
* Applies an {@link AtomixLeaderElectorCommands.GetElectedTopics} commit.
* @param commit commit entry
* @return set of topics for which the node is the leader
*/
public Set<String> electedTopics(Commit<? extends AtomixLeaderElectorCommands.GetElectedTopics> commit) {
try {
NodeId nodeId = commit.operation().nodeId();
return Maps.filterEntries(elections, e -> {
Leader leader = leadership(e.getKey()).leader();
return leader != null && leader.nodeId().equals(nodeId);
}).keySet();
} finally {
commit.close();
}
}
/**
* Applies an {@link AtomixLeaderElectorCommands.GetAllLeaderships} commit.
* @param commit GetAllLeaderships commit
* @return topic to leader mapping
*/
public Map<String, Leadership> allLeaderships(
Commit<? extends AtomixLeaderElectorCommands.GetAllLeaderships> commit) {
try {
return Maps.transformEntries(elections, (k, v) -> leadership(k));
} finally {
commit.close();
}
}
private Leadership leadership(String topic) {
return new Leadership(topic,
leader(topic),
candidates(topic));
}
private Leader leader(String topic) {
ElectionState electionState = elections.get(topic);
return electionState == null ? null : electionState.leader();
}
private List<NodeId> candidates(String topic) {
ElectionState electionState = elections.get(topic);
return electionState == null ? new LinkedList<>() : electionState.candidates();
}
private void onSessionEnd(Session session) {
Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session);
if (listener != null) {
listener.close();
}
Set<String> topics = elections.keySet();
topics.forEach(topic -> {
Leadership oldLeadership = leadership(topic);
elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
Leadership newLeadership = leadership(topic);
if (!Objects.equal(oldLeadership, newLeadership)) {
notifyLeadershipChange(oldLeadership, newLeadership);
}
});
}
private static class Registration {
private final NodeId nodeId;
private final long sessionId;
public Registration(NodeId nodeId, long sessionId) {
this.nodeId = nodeId;
this.sessionId = sessionId;
}
public NodeId nodeId() {
return nodeId;
}
public long sessionId() {
return sessionId;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("nodeId", nodeId)
.add("sessionId", sessionId)
.toString();
}
}
private static class ElectionState {
final Registration leader;
final long term;
final long termStartTime;
final List<Registration> registrations;
public ElectionState(Registration registration, Supplier<Long> termCounter) {
registrations = Arrays.asList(registration);
term = termCounter.get();
termStartTime = System.currentTimeMillis();
leader = registration;
}
public ElectionState(ElectionState other) {
registrations = Lists.newArrayList(other.registrations);
leader = other.leader;
term = other.term;
termStartTime = other.termStartTime;
}
public ElectionState(List<Registration> registrations,
Registration leader,
long term,
long termStartTime) {
this.registrations = Lists.newArrayList(registrations);
this.leader = leader;
this.term = term;
this.termStartTime = termStartTime;
}
public ElectionState cleanup(Session session, Supplier<Long> termCounter) {
Optional<Registration> registration =
registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
if (registration.isPresent()) {
List<Registration> updatedRegistrations =
registrations.stream()
.filter(r -> r.sessionId() != session.id())
.collect(Collectors.toList());
if (leader.sessionId() == session.id()) {
if (updatedRegistrations.size() > 0) {
return new ElectionState(updatedRegistrations,
updatedRegistrations.get(0),
termCounter.get(),
System.currentTimeMillis());
} else {
return new ElectionState(updatedRegistrations, null, term, termStartTime);
}
} else {
return new ElectionState(updatedRegistrations, leader, term, termStartTime);
}
} else {
return this;
}
}
public boolean isDuplicate(Registration registration) {
return registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId());
}
public Leader leader() {
if (leader == null) {
return null;
} else {
NodeId leaderNodeId = leader.nodeId();
return new Leader(leaderNodeId, term, termStartTime);
}
}
public List<NodeId> candidates() {
return registrations.stream().map(registration -> registration.nodeId()).collect(Collectors.toList());
}
public ElectionState addRegistration(Registration registration, Supplier<Long> termCounter) {
if (!registrations.stream().anyMatch(r -> r.sessionId() == registration.sessionId())) {
List<Registration> updatedRegistrations = new LinkedList<>(registrations);
updatedRegistrations.add(registration);
boolean newLeader = leader == null;
return new ElectionState(updatedRegistrations,
newLeader ? registration : leader,
newLeader ? termCounter.get() : term,
newLeader ? System.currentTimeMillis() : termStartTime);
}
return this;
}
public ElectionState transferLeadership(NodeId nodeId, AtomicLong termCounter) {
Registration newLeader = registrations.stream()
.filter(r -> r.nodeId().equals(nodeId))
.findFirst()
.orElse(null);
if (newLeader != null) {
return new ElectionState(registrations,
newLeader,
termCounter.incrementAndGet(),
System.currentTimeMillis());
} else {
return this;
}
}
}
@Override
public void register(Session session) {
}
@Override
public void unregister(Session session) {
onSessionEnd(session);
}
@Override
public void expire(Session session) {
onSessionEnd(session);
}
@Override
public void close(Session session) {
onSessionEnd(session);
}
@Override
public void snapshot(SnapshotWriter writer) {
byte[] encodedTermCounters = serializer.encode(termCounters);
writer.writeInt(encodedTermCounters.length);
writer.write(encodedTermCounters);
byte[] encodedElections = serializer.encode(elections);
writer.writeInt(encodedElections.length);
writer.write(encodedElections);
log.info("Took state machine snapshot");
}
@Override
public void install(SnapshotReader reader) {
int encodedTermCountersSize = reader.readInt();
byte[] encodedTermCounters = new byte[encodedTermCountersSize];
reader.read(encodedTermCounters);
termCounters = serializer.decode(encodedTermCounters);
int encodedElectionsSize = reader.readInt();
byte[] encodedElections = new byte[encodedElectionsSize];
reader.read(encodedElections);
elections = serializer.decode(encodedElections);
log.info("Reinstated state machine from snapshot");
}
private AtomicLong termCounter(String topic) {
return termCounters.computeIfAbsent(topic, k -> new AtomicLong(0));
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AtomicValueEvent;
import org.onosproject.store.service.AtomicValueEventListener;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Sets;
/**
* Implementation of {@link AsyncAtomicValue} backed by {@link AtomixConsistentMap}.
*/
public class AtomixValue implements AsyncAtomicValue<String> {
private final String name;
private final AtomixConsistentMap atomixMap;
private MapEventListener<String, byte[]> mapEventListener;
private final Set<AtomicValueEventListener<String>> listeners = Sets.newIdentityHashSet();
AtomixValue(String name, AtomixConsistentMap atomixMap) {
this.name = name;
this.atomixMap = atomixMap;
}
@Override
public CompletableFuture<Boolean> compareAndSet(String expect, String update) {
return atomixMap.replace(name, Tools.getBytesUtf8(expect), Tools.getBytesUtf8(update));
}
@Override
public CompletableFuture<String> get() {
return atomixMap.get(name)
.thenApply(v -> v != null ? Tools.toStringUtf8(v.value()) : null);
}
@Override
public CompletableFuture<String> getAndSet(String value) {
return atomixMap.put(name, Tools.getBytesUtf8(value))
.thenApply(v -> v != null ? Tools.toStringUtf8(v.value()) : null);
}
@Override
public CompletableFuture<Void> set(String value) {
return getAndSet(value).thenApply(v -> null);
}
@Override
public CompletableFuture<Void> addListener(AtomicValueEventListener<String> listener) {
// TODO: synchronization
if (mapEventListener == null) {
mapEventListener = event -> {
Versioned<byte[]> newValue = event.newValue();
Versioned<byte[]> oldValue = event.oldValue();
if (Objects.equals(event.key(), name)) {
listener.event(new AtomicValueEvent<>(name,
newValue == null ? null : Tools.toStringUtf8(newValue.value()),
oldValue == null ? null : Tools.toStringUtf8(oldValue.value())));
}
};
return atomixMap.addListener(mapEventListener).whenComplete((r, e) -> {
if (e == null) {
listeners.add(listener);
} else {
mapEventListener = null;
}
});
} else {
listeners.add(listener);
return CompletableFuture.completedFuture(null);
}
}
@Override
public CompletableFuture<Void> removeListener(AtomicValueEventListener<String> listener) {
// TODO: synchronization
listeners.remove(listener);
if (listeners.isEmpty()) {
return atomixMap.removeListener(mapEventListener);
} else {
return CompletableFuture.completedFuture(null);
}
}
@Override
public String name() {
return null;
}
}
\ No newline at end of file
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
/**
* Response enum for two phase commit operation.
*/
public enum CommitResult {
/**
* Signifies a successful commit execution.
*/
OK,
/**
* Signifies a failure due to unrecognized transaction identifier.
*/
UNKNOWN_TRANSACTION_ID,
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.util.function.Function;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
/**
* Result of a map entry update operation.
* <p>
* Both old and new values are accessible along with a flag that indicates if the
* the value was updated. If flag is false, oldValue and newValue both
* point to the same unmodified value.
* @param <V> result type
*/
public class MapEntryUpdateResult<K, V> {
public enum Status {
/**
* Indicates a successful update.
*/
OK,
/**
* Indicates a noop i.e. existing and new value are both null.
*/
NOOP,
/**
* Indicates a failed update due to a write lock.
*/
WRITE_LOCK,
/**
* Indicates a failed update due to a precondition check failure.
*/
PRECONDITION_FAILED
}
private final String mapName;
private Status status;
private final K key;
private final Versioned<V> oldValue;
private final Versioned<V> newValue;
public MapEntryUpdateResult(Status status, String mapName, K key, Versioned<V> oldValue, Versioned<V> newValue) {
this.status = status;
this.mapName = mapName;
this.key = key;
this.oldValue = oldValue;
this.newValue = newValue;
}
/**
* Returns {@code true} if the update was successful.
* @return {@code true} if yes, {@code false} otherwise
*/
public boolean updated() {
return status == Status.OK;
}
/**
* Returns the map name.
* @return map name
*/
public String mapName() {
return mapName;
}
/**
* Returns the update status.
* @return update status
*/
public Status status() {
return status;
}
/**
* Returns the map key.
* @return key
*/
public K key() {
return key;
}
/**
* Returns the old value.
* @return the previous value associated with key if updated was successful, otherwise current value
*/
public Versioned<V> oldValue() {
return oldValue;
}
/**
* Returns the new value after update.
* @return if updated was unsuccessful, this is same as old value
*/
public Versioned<V> newValue() {
return newValue;
}
/**
* Maps to another instance with different key and value types.
* @param keyTransform transformer to use for transcoding keys
* @param valueMapper mapper to use for transcoding values
* @return new instance
*/
public <K1, V1> MapEntryUpdateResult<K1, V1> map(Function<K, K1> keyTransform, Function<V, V1> valueMapper) {
return new MapEntryUpdateResult<>(status,
mapName,
keyTransform.apply(key),
oldValue == null ? null : oldValue.map(valueMapper),
newValue == null ? null : newValue.map(valueMapper));
}
/**
* Return the map event that will be generated as a result of this update.
* @return map event. if update was unsuccessful, this returns {@code null}
*/
public MapEvent<K, V> toMapEvent() {
if (!updated()) {
return null;
} else {
return new MapEvent<>(mapName(), key(), newValue, oldValue);
}
}
@Override
public String toString() {
return MoreObjects.toStringHelper(MapEntryUpdateResult.class)
.add("mapName", mapName)
.add("status", status)
.add("key", key)
.add("oldValue", oldValue)
.add("newValue", newValue)
.toString();
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.MoreObjects;
/**
* Map update operation.
*
* @param <K> map key type
* @param <V> map value type
*
*/
public final class MapUpdate<K, V> {
/**
* Type of database update operation.
*/
public enum Type {
/**
* Insert/Update entry without any checks.
*/
PUT,
/**
* Insert an entry iff there is no existing entry for that key.
*/
PUT_IF_ABSENT,
/**
* Update entry if the current version matches specified version.
*/
PUT_IF_VERSION_MATCH,
/**
* Update entry if the current value matches specified value.
*/
PUT_IF_VALUE_MATCH,
/**
* Remove entry without any checks.
*/
REMOVE,
/**
* Remove entry if the current version matches specified version.
*/
REMOVE_IF_VERSION_MATCH,
/**
* Remove entry if the current value matches specified value.
*/
REMOVE_IF_VALUE_MATCH,
}
private Type type;
private K key;
private V value;
private V currentValue;
private long currentVersion = -1;
/**
* Returns the type of update operation.
* @return type of update.
*/
public Type type() {
return type;
}
/**
* Returns the item key being updated.
* @return item key
*/
public K key() {
return key;
}
/**
* Returns the new value.
* @return item's target value.
*/
public V value() {
return value;
}
/**
* Returns the expected current value for the key.
* @return current value in database.
*/
public V currentValue() {
return currentValue;
}
/**
* Returns the expected current version in the database for the key.
* @return expected version.
*/
public long currentVersion() {
return currentVersion;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("type", type)
.add("key", key)
.add("value", value)
.add("currentValue", currentValue)
.add("currentVersion", currentVersion)
.toString();
}
/**
* Creates a new builder instance.
*
* @param <K> key type
* @param <V> value type
* @return builder.
*/
public static <K, V> Builder<K, V> newBuilder() {
return new Builder<>();
}
/**
* MapUpdate builder.
*
* @param <K> key type
* @param <V> value type
*/
public static final class Builder<K, V> {
private MapUpdate<K, V> update = new MapUpdate<>();
public MapUpdate<K, V> build() {
validateInputs();
return update;
}
public Builder<K, V> withType(Type type) {
update.type = checkNotNull(type, "type cannot be null");
return this;
}
public Builder<K, V> withKey(K key) {
update.key = checkNotNull(key, "key cannot be null");
return this;
}
public Builder<K, V> withCurrentValue(V value) {
update.currentValue = checkNotNull(value, "currentValue cannot be null");
return this;
}
public Builder<K, V> withValue(V value) {
update.value = checkNotNull(value, "value cannot be null");
return this;
}
public Builder<K, V> withCurrentVersion(long version) {
checkArgument(version >= 0, "version cannot be negative");
update.currentVersion = version;
return this;
}
private void validateInputs() {
checkNotNull(update.type, "type must be specified");
checkNotNull(update.key, "key must be specified");
switch (update.type) {
case PUT:
case PUT_IF_ABSENT:
checkNotNull(update.value, "value must be specified.");
break;
case PUT_IF_VERSION_MATCH:
checkNotNull(update.value, "value must be specified.");
checkState(update.currentVersion >= 0, "current version must be specified");
break;
case PUT_IF_VALUE_MATCH:
checkNotNull(update.value, "value must be specified.");
checkNotNull(update.currentValue, "currentValue must be specified.");
break;
case REMOVE:
break;
case REMOVE_IF_VERSION_MATCH:
checkState(update.currentVersion >= 0, "current version must be specified");
break;
case REMOVE_IF_VALUE_MATCH:
checkNotNull(update.currentValue, "currentValue must be specified.");
break;
default:
throw new IllegalStateException("Unknown operation type");
}
}
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
/**
* Response enum for two phase commit prepare operation.
*/
public enum PrepareResult {
/**
* Signifies a successful execution of the prepare operation.
*/
OK,
/**
* Signifies a failure to another transaction locking the underlying state.
*/
CONCURRENT_TRANSACTION,
/**
* Signifies a optimistic lock failure. This can happen if underlying state has changed since it was last read.
*/
OPTIMISTIC_LOCK_FAILURE,
}
\ No newline at end of file
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
/**
* Response enum for two phase commit rollback operation.
*/
public enum RollbackResult {
/**
* Signifies a successful rollback execution.
*/
OK,
/**
* Signifies a failure due to unrecognized transaction identifier.
*/
UNKNOWN_TRANSACTION_ID,
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import com.google.common.base.Objects;
/**
* Transaction identifier.
*/
public final class TransactionId {
public static TransactionId from(String id) {
return new TransactionId(id);
}
private final String id;
private TransactionId(String id) {
this.id = id;
}
@Override
public String toString() {
return id;
}
@Override
public int hashCode() {
return id.hashCode();
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other instanceof TransactionId) {
TransactionId that = (TransactionId) other;
return Objects.equal(this.id, that.id);
}
return false;
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.util.Collection;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
/**
* A batch updates to an {@code AsyncConsistentMap} be committed as a transaction.
*
* @param <K> key type
* @param <V> value type
*/
public class TransactionalMapUpdate<K, V> {
private final TransactionId transactionId;
private final Collection<MapUpdate<K, V>> updates;
private boolean indexPopulated = false;
private final Map<K, V> keyValueIndex = Maps.newHashMap();
public TransactionalMapUpdate(TransactionId transactionId, Collection<MapUpdate<K, V>> updates) {
this.transactionId = transactionId;
this.updates = ImmutableList.copyOf(updates);
populateIndex();
}
/**
* Returns the transaction identifier.
* @return transaction id
*/
public TransactionId transactionId() {
return transactionId;
}
/**
* Returns the collection of map updates.
* @return map updates
*/
public Collection<MapUpdate<K, V>> batch() {
return updates;
}
/**
* Returns the value that will be associated with the key after this transaction commits.
* @param key key
* @return value that will be associated with the value once this transaction commits
*/
public V valueForKey(K key) {
if (!indexPopulated) {
// We do not synchronize as we don't expect this called to be made from multiple threads.
populateIndex();
}
return keyValueIndex.get(key);
}
/**
* Populates the internal key -> value mapping.
*/
private synchronized void populateIndex() {
updates.forEach(mapUpdate -> {
if (mapUpdate.value() != null) {
keyValueIndex.put(mapUpdate.key(), mapUpdate.value());
}
});
indexPopulated = true;
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* State machine implementation classes for distributed primitives.
*/
package org.onosproject.store.primitives.resources.impl;
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.resource.ResourceType;
import static org.junit.Assert.*;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Sets;
/**
* Unit tests for {@link AtomixConsistentMap}.
*/
public class AtomixConsistentMapTest extends AtomixTestBase {
@Override
protected ResourceType resourceType() {
return new ResourceType(AtomixConsistentMap.class);
}
/**
* Tests various basic map operations.
*/
@Test
public void testBasicMapOperations() throws Throwable {
basicMapOperationTests(1);
clearTests();
basicMapOperationTests(2);
clearTests();
basicMapOperationTests(3);
}
/**
* Tests various map compute* operations on different cluster sizes.
*/
@Test
public void testMapComputeOperations() throws Throwable {
mapComputeOperationTests(1);
clearTests();
mapComputeOperationTests(2);
clearTests();
mapComputeOperationTests(3);
}
/**
* Tests map event notifications.
*/
@Test
public void testMapListeners() throws Throwable {
mapListenerTests(1);
clearTests();
mapListenerTests(2);
clearTests();
mapListenerTests(3);
}
/**
* Tests map transaction commit.
*/
@Test
public void testTransactionCommit() throws Throwable {
transactionCommitTests(1);
clearTests();
transactionCommitTests(2);
clearTests();
transactionCommitTests(3);
}
/**
* Tests map transaction rollback.
*/
@Test
public void testTransactionRollback() throws Throwable {
transactionRollbackTests(1);
clearTests();
transactionRollbackTests(2);
clearTests();
transactionRollbackTests(3);
}
protected void basicMapOperationTests(int clusterSize) throws Throwable {
createCopycatServers(clusterSize);
final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
map.isEmpty().thenAccept(result -> {
assertTrue(result);
}).join();
map.put("foo", rawFooValue).thenAccept(result -> {
assertNull(result);
}).join();
map.size().thenAccept(result -> {
assertTrue(result == 1);
}).join();
map.isEmpty().thenAccept(result -> {
assertFalse(result);
}).join();
map.putIfAbsent("foo", "Hello foo again!".getBytes()).thenAccept(result -> {
assertNotNull(result);
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
}).join();
map.putIfAbsent("bar", rawBarValue).thenAccept(result -> {
assertNull(result);
}).join();
map.size().thenAccept(result -> {
assertTrue(result == 2);
}).join();
map.keySet().thenAccept(result -> {
assertTrue(result.size() == 2);
assertTrue(result.containsAll(Sets.newHashSet("foo", "bar")));
}).join();
map.values().thenAccept(result -> {
assertTrue(result.size() == 2);
List<String> rawValues =
result.stream().map(v -> Tools.toStringUtf8(v.value())).collect(Collectors.toList());
assertTrue(rawValues.contains("Hello foo!"));
assertTrue(rawValues.contains("Hello bar!"));
}).join();
map.entrySet().thenAccept(result -> {
assertTrue(result.size() == 2);
// TODO: check entries
}).join();
map.get("foo").thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
}).join();
map.remove("foo").thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawFooValue));
}).join();
map.containsKey("foo").thenAccept(result -> {
assertFalse(result);
}).join();
map.get("foo").thenAccept(result -> {
assertNull(result);
}).join();
map.get("bar").thenAccept(result -> {
assertNotNull(result);
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
}).join();
map.containsKey("bar").thenAccept(result -> {
assertTrue(result);
}).join();
map.size().thenAccept(result -> {
assertTrue(result == 1);
}).join();
map.containsValue(rawBarValue).thenAccept(result -> {
assertTrue(result);
}).join();
map.containsValue(rawFooValue).thenAccept(result -> {
assertFalse(result);
}).join();
map.replace("bar", "Goodbye bar!".getBytes()).thenAccept(result -> {
assertNotNull(result);
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), rawBarValue));
}).join();
map.replace("foo", "Goodbye foo!".getBytes()).thenAccept(result -> {
assertNull(result);
}).join();
// try replace_if_value_match for a non-existent key
map.replace("foo", "Goodbye foo!".getBytes(), rawFooValue).thenAccept(result -> {
assertFalse(result);
}).join();
map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
assertTrue(result);
}).join();
map.replace("bar", "Goodbye bar!".getBytes(), rawBarValue).thenAccept(result -> {
assertFalse(result);
}).join();
Versioned<byte[]> barValue = map.get("bar").join();
map.replace("bar", barValue.version(), "Goodbye bar!".getBytes()).thenAccept(result -> {
assertTrue(result);
}).join();
map.replace("bar", barValue.version(), rawBarValue).thenAccept(result -> {
assertFalse(result);
}).join();
map.clear().join();
map.size().thenAccept(result -> {
assertTrue(result == 0);
}).join();
}
public void mapComputeOperationTests(int clusterSize) throws Throwable {
createCopycatServers(clusterSize);
final byte[] value1 = Tools.getBytesUtf8("value1");
final byte[] value2 = Tools.getBytesUtf8("value2");
final byte[] value3 = Tools.getBytesUtf8("value3");
AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
}).join();
map.computeIfAbsent("foo", k -> value2).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
}).join();
map.computeIfPresent("bar", (k, v) -> value2).thenAccept(result -> {
assertNull(result);
});
map.computeIfPresent("foo", (k, v) -> value3).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value3));
}).join();
map.computeIfPresent("foo", (k, v) -> null).thenAccept(result -> {
assertNull(result);
}).join();
map.computeIf("foo", v -> v == null, (k, v) -> value1).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
}).join();
map.compute("foo", (k, v) -> value2).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value2));
}).join();
}
protected void mapListenerTests(int clusterSize) throws Throwable {
createCopycatServers(clusterSize);
final byte[] value1 = Tools.getBytesUtf8("value1");
final byte[] value2 = Tools.getBytesUtf8("value2");
final byte[] value3 = Tools.getBytesUtf8("value3");
AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
// add listener; insert new value into map and verify an INSERT event is received.
map.addListener(listener).join();
map.put("foo", value1).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.INSERT, listener.event().type());
assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
listener.clearEvent();
// remove listener and verify listener is not notified.
map.removeListener(listener).join();
map.put("foo", value2).join();
assertNull(listener.event());
// add the listener back and verify UPDATE events are received correctly
map.addListener(listener).join();
map.put("foo", value3).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.UPDATE, listener.event().type());
assertTrue(Arrays.equals(value3, listener.event().newValue().value()));
listener.clearEvent();
// perform a non-state changing operation and verify no events are received.
map.putIfAbsent("foo", value1).join();
assertNull(listener.event());
// verify REMOVE events are received correctly.
map.remove("foo").join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.REMOVE, listener.event().type());
assertTrue(Arrays.equals(value3, listener.event().oldValue().value()));
listener.clearEvent();
// verify compute methods also generate events.
map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.INSERT, listener.event().type());
assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
listener.clearEvent();
map.compute("foo", (k, v) -> value2).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.UPDATE, listener.event().type());
assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
listener.clearEvent();
map.computeIf("foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.REMOVE, listener.event().type());
assertTrue(Arrays.equals(value2, listener.event().oldValue().value()));
listener.clearEvent();
map.removeListener(listener).join();
}
protected void transactionCommitTests(int clusterSize) throws Throwable {
createCopycatServers(clusterSize);
final byte[] value1 = Tools.getBytesUtf8("value1");
final byte[] value2 = Tools.getBytesUtf8("value2");
AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
map.addListener(listener).join();
MapUpdate<String, byte[]> update1 =
MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
.withKey("foo")
.withValue(value1)
.build();
TransactionalMapUpdate<String, byte[]> txMapUpdate =
new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
map.prepare(txMapUpdate).thenAccept(result -> {
assertEquals(PrepareResult.OK, result);
}).join();
assertNull(listener.event());
map.size().thenAccept(result -> {
assertTrue(result == 0);
}).join();
map.get("foo").thenAccept(result -> {
assertNull(result);
}).join();
try {
map.put("foo", value2).join();
assertTrue(false);
} catch (CompletionException e) {
assertEquals(ConcurrentModificationException.class, e.getCause().getClass());
}
assertNull(listener.event());
map.commit(txMapUpdate.transactionId()).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.INSERT, listener.event().type());
assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
listener.clearEvent();
map.put("foo", value2).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
}).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.UPDATE, listener.event().type());
assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
listener.clearEvent();
}
protected void transactionRollbackTests(int clusterSize) throws Throwable {
createCopycatServers(clusterSize);
final byte[] value1 = Tools.getBytesUtf8("value1");
final byte[] value2 = Tools.getBytesUtf8("value2");
AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
map.addListener(listener).join();
MapUpdate<String, byte[]> update1 =
MapUpdate.<String, byte[]>newBuilder().withType(MapUpdate.Type.PUT_IF_ABSENT)
.withKey("foo")
.withValue(value1)
.build();
TransactionalMapUpdate<String, byte[]> txMapUpdate =
new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
map.prepare(txMapUpdate).thenAccept(result -> {
assertEquals(PrepareResult.OK, result);
}).join();
assertNull(listener.event());
map.rollback(txMapUpdate.transactionId()).join();
assertNull(listener.event());
map.get("foo").thenAccept(result -> {
assertNull(result);
}).join();
map.put("foo", value2).thenAccept(result -> {
assertNull(result);
}).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.INSERT, listener.event().type());
assertTrue(Arrays.equals(value2, listener.event().newValue().value()));
listener.clearEvent();
}
private static class TestMapEventListener implements MapEventListener<String, byte[]> {
MapEvent<String, byte[]> event;
@Override
public void event(MapEvent<String, byte[]> event) {
this.event = event;
}
public MapEvent<String, byte[]> event() {
return event;
}
public void clearEvent() {
event = null;
}
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.junit.Test;
import static org.junit.Assert.*;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import io.atomix.Atomix;
import io.atomix.resource.ResourceType;
/**
* Unit tests for {@link AtomixLeaderElector}.
*/
public class AtomixLeaderElectorTest extends AtomixTestBase {
NodeId node1 = new NodeId("node1");
NodeId node2 = new NodeId("node2");
NodeId node3 = new NodeId("node3");
@Override
protected ResourceType resourceType() {
return new ResourceType(AtomixLeaderElector.class);
}
@Test
public void testRun() throws Throwable {
leaderElectorRunTests(1);
clearTests();
// leaderElectorRunTests(2);
// clearTests();
// leaderElectorRunTests(3);
// clearTests();
}
private void leaderElectorRunTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).thenAccept(result -> {
assertEquals(node1, result.leaderNodeId());
assertEquals(1, result.leader().term());
assertEquals(1, result.candidates().size());
assertEquals(node1, result.candidates().get(0));
}).join();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
elector2.run("foo", node2).thenAccept(result -> {
assertEquals(node1, result.leaderNodeId());
assertEquals(1, result.leader().term());
assertEquals(2, result.candidates().size());
assertEquals(node1, result.candidates().get(0));
assertEquals(node2, result.candidates().get(1));
}).join();
}
@Test
public void testWithdraw() throws Throwable {
leaderElectorWithdrawTests(1);
clearTests();
leaderElectorWithdrawTests(2);
clearTests();
leaderElectorWithdrawTests(3);
clearTests();
}
private void leaderElectorWithdrawTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
elector2.run("foo", node2).join();
LeaderEventListener listener1 = new LeaderEventListener();
elector1.addChangeListener(listener1).join();
LeaderEventListener listener2 = new LeaderEventListener();
elector2.addChangeListener(listener2).join();
elector1.withdraw("foo").join();
listener1.nextEvent().thenAccept(result -> {
assertEquals(node2, result.newValue().leaderNodeId());
assertEquals(2, result.newValue().leader().term());
assertEquals(1, result.newValue().candidates().size());
assertEquals(node2, result.newValue().candidates().get(0));
}).join();
listener2.nextEvent().thenAccept(result -> {
assertEquals(node2, result.newValue().leaderNodeId());
assertEquals(2, result.newValue().leader().term());
assertEquals(1, result.newValue().candidates().size());
assertEquals(node2, result.newValue().candidates().get(0));
}).join();
}
@Test
public void testAnoint() throws Throwable {
leaderElectorAnointTests(1);
clearTests();
leaderElectorAnointTests(2);
clearTests();
leaderElectorAnointTests(3);
clearTests();
}
private void leaderElectorAnointTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
Atomix client3 = createAtomixClient();
AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
elector2.run("foo", node2).join();
LeaderEventListener listener1 = new LeaderEventListener();
elector1.addChangeListener(listener1).join();
LeaderEventListener listener2 = new LeaderEventListener();
elector2.addChangeListener(listener2);
LeaderEventListener listener3 = new LeaderEventListener();
elector3.addChangeListener(listener3).join();
elector3.anoint("foo", node3).thenAccept(result -> {
assertFalse(result);
}).join();
assertFalse(listener1.hasEvent());
assertFalse(listener2.hasEvent());
assertFalse(listener3.hasEvent());
elector3.anoint("foo", node2).thenAccept(result -> {
assertTrue(result);
}).join();
assertTrue(listener1.hasEvent());
assertTrue(listener2.hasEvent());
assertTrue(listener3.hasEvent());
listener1.nextEvent().thenAccept(result -> {
assertEquals(node2, result.newValue().leaderNodeId());
assertEquals(2, result.newValue().candidates().size());
assertEquals(node1, result.newValue().candidates().get(0));
assertEquals(node2, result.newValue().candidates().get(1));
}).join();
listener2.nextEvent().thenAccept(result -> {
assertEquals(node2, result.newValue().leaderNodeId());
assertEquals(2, result.newValue().candidates().size());
assertEquals(node1, result.newValue().candidates().get(0));
assertEquals(node2, result.newValue().candidates().get(1));
}).join();
listener3.nextEvent().thenAccept(result -> {
assertEquals(node2, result.newValue().leaderNodeId());
assertEquals(2, result.newValue().candidates().size());
assertEquals(node1, result.newValue().candidates().get(0));
assertEquals(node2, result.newValue().candidates().get(1));
}).join();
}
@Test
public void testLeaderSessionClose() throws Throwable {
leaderElectorLeaderSessionCloseTests(1);
clearTests();
leaderElectorLeaderSessionCloseTests(2);
clearTests();
leaderElectorLeaderSessionCloseTests(3);
clearTests();
}
private void leaderElectorLeaderSessionCloseTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
LeaderEventListener listener = new LeaderEventListener();
elector2.run("foo", node2).join();
elector2.addChangeListener(listener).join();
client1.close();
listener.nextEvent().thenAccept(result -> {
assertEquals(node2, result.newValue().leaderNodeId());
assertEquals(1, result.newValue().candidates().size());
assertEquals(node2, result.newValue().candidates().get(0));
}).join();
}
@Test
public void testNonLeaderSessionClose() throws Throwable {
leaderElectorNonLeaderSessionCloseTests(1);
clearTests();
leaderElectorNonLeaderSessionCloseTests(2);
clearTests();
leaderElectorNonLeaderSessionCloseTests(3);
clearTests();
}
private void leaderElectorNonLeaderSessionCloseTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
LeaderEventListener listener = new LeaderEventListener();
elector2.run("foo", node2).join();
elector1.addChangeListener(listener).join();
client2.close().join();
listener.nextEvent().thenAccept(result -> {
assertEquals(node1, result.newValue().leaderNodeId());
assertEquals(1, result.newValue().candidates().size());
assertEquals(node1, result.newValue().candidates().get(0));
}).join();
}
@Test
public void testQueries() throws Throwable {
leaderElectorQueryTests(1);
clearTests();
leaderElectorQueryTests(2);
clearTests();
leaderElectorQueryTests(3);
clearTests();
}
private void leaderElectorQueryTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
elector2.run("foo", node2).join();
elector2.run("bar", node2).join();
elector1.getElectedTopics(node1).thenAccept(result -> {
assertEquals(1, result.size());
assertTrue(result.contains("foo"));
}).join();
elector2.getElectedTopics(node1).thenAccept(result -> {
assertEquals(1, result.size());
assertTrue(result.contains("foo"));
}).join();
elector1.getLeadership("foo").thenAccept(result -> {
assertEquals(node1, result.leaderNodeId());
assertEquals(node1, result.candidates().get(0));
assertEquals(node2, result.candidates().get(1));
}).join();
elector2.getLeadership("foo").thenAccept(result -> {
assertEquals(node1, result.leaderNodeId());
assertEquals(node1, result.candidates().get(0));
assertEquals(node2, result.candidates().get(1));
}).join();
elector1.getLeadership("bar").thenAccept(result -> {
assertEquals(node2, result.leaderNodeId());
assertEquals(node2, result.candidates().get(0));
}).join();
elector2.getLeadership("bar").thenAccept(result -> {
assertEquals(node2, result.leaderNodeId());
assertEquals(node2, result.candidates().get(0));
}).join();
elector1.getLeaderships().thenAccept(result -> {
assertEquals(2, result.size());
Leadership fooLeadership = result.get("foo");
assertEquals(node1, fooLeadership.leaderNodeId());
assertEquals(node1, fooLeadership.candidates().get(0));
assertEquals(node2, fooLeadership.candidates().get(1));
Leadership barLeadership = result.get("bar");
assertEquals(node2, barLeadership.leaderNodeId());
assertEquals(node2, barLeadership.candidates().get(0));
}).join();
elector2.getLeaderships().thenAccept(result -> {
assertEquals(2, result.size());
Leadership fooLeadership = result.get("foo");
assertEquals(node1, fooLeadership.leaderNodeId());
assertEquals(node1, fooLeadership.candidates().get(0));
assertEquals(node2, fooLeadership.candidates().get(1));
Leadership barLeadership = result.get("bar");
assertEquals(node2, barLeadership.leaderNodeId());
assertEquals(node2, barLeadership.candidates().get(0));
}).join();
}
private static class LeaderEventListener implements Consumer<Change<Leadership>> {
Queue<Change<Leadership>> eventQueue = new LinkedList<>();
CompletableFuture<Change<Leadership>> pendingFuture;
@Override
public void accept(Change<Leadership> change) {
synchronized (this) {
if (pendingFuture != null) {
pendingFuture.complete(change);
pendingFuture = null;
} else {
eventQueue.add(change);
}
}
}
public boolean hasEvent() {
return !eventQueue.isEmpty();
}
public CompletableFuture<Change<Leadership>> nextEvent() {
synchronized (this) {
if (eventQueue.isEmpty()) {
if (pendingFuture == null) {
pendingFuture = new CompletableFuture<>();
}
return pendingFuture;
} else {
return CompletableFuture.completedFuture(eventQueue.poll());
}
}
}
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import static org.junit.Assert.*;
import org.junit.Test;
import io.atomix.Atomix;
import io.atomix.resource.ResourceType;
import io.atomix.variables.DistributedLong;
/**
* Unit tests for {@link AtomixCounter}.
*/
public class AtomixLongTest extends AtomixTestBase {
@Override
protected ResourceType resourceType() {
return new ResourceType(DistributedLong.class);
}
@Test
public void testBasicOperations() throws Throwable {
basicOperationsTest(1);
clearTests();
basicOperationsTest(2);
clearTests();
basicOperationsTest(3);
clearTests();
}
protected void basicOperationsTest(int clusterSize) throws Throwable {
createCopycatServers(clusterSize);
Atomix atomix = createAtomixClient();
AtomixCounter along = new AtomixCounter("test-long", atomix.getLong("test-long").join());
assertEquals(0, along.get().join().longValue());
assertEquals(1, along.incrementAndGet().join().longValue());
along.set(100).join();
assertEquals(100, along.get().join().longValue());
assertEquals(100, along.getAndAdd(10).join().longValue());
assertEquals(110, along.get().join().longValue());
assertFalse(along.compareAndSet(109, 111).join());
assertTrue(along.compareAndSet(110, 111).join());
assertEquals(100, along.addAndGet(-11).join().longValue());
assertEquals(100, along.getAndIncrement().join().longValue());
assertEquals(101, along.get().join().longValue());
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.LocalServerRegistry;
import io.atomix.catalyst.transport.LocalTransport;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.manager.state.ResourceManagerState;
import io.atomix.resource.ResourceRegistry;
import io.atomix.resource.ResourceType;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
import org.onosproject.store.primitives.impl.CatalystSerializers;
import com.google.common.util.concurrent.Uninterruptibles;
/**
* Base class for various Atomix* tests.
*/
public abstract class AtomixTestBase {
private static final File TEST_DIR = new File("target/test-logs");
protected LocalServerRegistry registry;
protected int port;
protected List<Address> members;
protected List<CopycatClient> copycatClients = new ArrayList<>();
protected List<CopycatServer> copycatServers = new ArrayList<>();
protected List<Atomix> atomixClients = new ArrayList<>();
protected List<CopycatServer> atomixServers = new ArrayList<>();
protected Serializer serializer = CatalystSerializers.getSerializer();
/**
* Creates a new resource state machine.
*
* @return A new resource state machine.
*/
protected abstract ResourceType resourceType();
/**
* Returns the next server address.
*
* @return The next server address.
*/
private Address nextAddress() {
Address address = new Address("localhost", port++);
members.add(address);
return address;
}
/**
* Creates a set of Copycat servers.
*/
protected List<CopycatServer> createCopycatServers(int nodes) throws Throwable {
CountDownLatch latch = new CountDownLatch(nodes);
List<CopycatServer> servers = new ArrayList<>();
List<Address> members = new ArrayList<>();
for (int i = 0; i < nodes; i++) {
members.add(nextAddress());
}
for (int i = 0; i < nodes; i++) {
CopycatServer server = createCopycatServer(members.get(i));
server.open().thenRun(latch::countDown);
servers.add(server);
}
Uninterruptibles.awaitUninterruptibly(latch);
return servers;
}
/**
* Creates a Copycat server.
*/
protected CopycatServer createCopycatServer(Address address) {
ResourceRegistry resourceRegistry = new ResourceRegistry();
resourceRegistry.register(resourceType());
CopycatServer server = CopycatServer.builder(address, members)
.withTransport(new LocalTransport(registry))
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.DISK)
.withDirectory(TEST_DIR + "/" + address.port())
.withSerializer(serializer.clone())
.build())
.withStateMachine(() -> new ResourceManagerState(resourceRegistry))
.withSerializer(serializer.clone())
.withHeartbeatInterval(Duration.ofMillis(25))
.withElectionTimeout(Duration.ofMillis(50))
.withSessionTimeout(Duration.ofMillis(100))
.build();
copycatServers.add(server);
return server;
}
@Before
@After
public void clearTests() throws Exception {
registry = new LocalServerRegistry();
members = new ArrayList<>();
port = 5000;
CompletableFuture<Void> closeClients =
CompletableFuture.allOf(atomixClients.stream()
.map(Atomix::close)
.toArray(CompletableFuture[]::new));
closeClients.thenCompose(v -> CompletableFuture.allOf(copycatServers.stream()
.map(CopycatServer::close)
.toArray(CompletableFuture[]::new))).join();
deleteDirectory(TEST_DIR);
atomixClients = new ArrayList<>();
copycatServers = new ArrayList<>();
}
/**
* Deletes a directory recursively.
*/
private void deleteDirectory(File directory) throws IOException {
if (directory.exists()) {
File[] files = directory.listFiles();
if (files != null) {
for (File file : files) {
if (file.isDirectory()) {
deleteDirectory(file);
} else {
Files.delete(file.toPath());
}
}
}
Files.delete(directory.toPath());
}
}
/**
* Creates a Atomix client.
*/
protected Atomix createAtomixClient() {
CountDownLatch latch = new CountDownLatch(1);
Atomix client = AtomixClient.builder(members)
.withTransport(new LocalTransport(registry))
.withSerializer(serializer.clone())
.withResourceResolver(r -> r.register(resourceType()))
.build();
client.open().thenRun(latch::countDown);
atomixClients.add(client);
Uninterruptibles.awaitUninterruptibly(latch);
return client;
}
}