Madan Jampani
Committed by Gerrit Code Review

ONOS-2097: Ensure updates made via transactional map result in state change notifications

Change-Id: Iecc1b54d2c4c976278e77dbd825d3e3954c53602
package org.onosproject.store.consistent.impl;
import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.Collections;
import java.util.List;
import com.google.common.collect.ImmutableList;
/**
* Result of a Transaction commit operation.
*/
public final class CommitResponse {
private boolean success;
private List<UpdateResult<String, byte[]>> updates;
public static CommitResponse success(List<UpdateResult<String, byte[]>> updates) {
return new CommitResponse(true, updates);
}
public static CommitResponse failure() {
return new CommitResponse(false, Collections.emptyList());
}
private CommitResponse(boolean success, List<UpdateResult<String, byte[]>> updates) {
this.success = success;
this.updates = ImmutableList.copyOf(updates);
}
public boolean success() {
return success;
}
public List<UpdateResult<String, byte[]>> updates() {
return updates;
}
@Override
public String toString() {
return toStringHelper(this)
.add("success", success)
.add("udpates", updates)
.toString();
}
}
......@@ -193,7 +193,7 @@ public interface DatabaseProxy<K, V> {
* @param transaction transaction to commit (after preparation)
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<Boolean> prepareAndCommit(Transaction transaction);
CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction);
/**
* Prepare the specified transaction for commit. A successful prepare implies
......@@ -213,7 +213,7 @@ public interface DatabaseProxy<K, V> {
* @param transaction transaction to commit
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<Boolean> commit(Transaction transaction);
CompletableFuture<CommitResponse> commit(Transaction transaction);
/**
* Rollback the specified transaction. A successful rollback implies
......
......@@ -75,6 +75,7 @@ public class DatabaseSerializer extends SerializerConfig {
.register(Result.Status.class)
.register(DefaultTransaction.class)
.register(Transaction.State.class)
.register(org.onosproject.store.consistent.impl.CommitResponse.class)
.register(Match.class)
.register(NodeId.class)
.build();
......
......@@ -102,13 +102,13 @@ public interface DatabaseState<K, V> {
Long counterGet(String counterName);
@Command
boolean prepareAndCommit(Transaction transaction);
CommitResponse prepareAndCommit(Transaction transaction);
@Command
boolean prepare(Transaction transaction);
@Command
boolean commit(Transaction transaction);
CommitResponse commit(Transaction transaction);
@Command
boolean rollback(Transaction transaction);
......
......@@ -32,6 +32,7 @@ import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
......@@ -47,6 +48,7 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.TX_COMMIT;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -83,7 +85,6 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
private static final String REPLACE = "replace";
private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
private final Logger log = getLogger(getClass());
......@@ -127,6 +128,16 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
notifyListeners(mapEvent);
}
} else if (update.target() == TX_COMMIT) {
CommitResponse response = update.output();
if (response.success()) {
response.updates().forEach(u -> {
if (u.mapName().equals(name)) {
MapEvent<K, V> mapEvent = u.<K, V>map(this::dK, serializer::decode).toMapEvent();
notifyListeners(mapEvent);
}
});
}
}
});
});
......@@ -439,4 +450,4 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
});
}
}
\ No newline at end of file
}
......
......@@ -174,7 +174,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
}
@Override
public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
return checkOpen(() -> proxy.prepareAndCommit(transaction));
}
......@@ -184,7 +184,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
}
@Override
public CompletableFuture<Boolean> commit(Transaction transaction) {
public CompletableFuture<CommitResponse> commit(Transaction transaction) {
return checkOpen(() -> proxy.commit(transaction));
}
......
......@@ -31,11 +31,10 @@ import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.DatabaseUpdate.Type;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import net.kuujo.copycat.state.Initializer;
......@@ -239,11 +238,11 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
}
@Override
public boolean prepareAndCommit(Transaction transaction) {
public CommitResponse prepareAndCommit(Transaction transaction) {
if (prepare(transaction)) {
return commit(transaction);
}
return false;
return CommitResponse.failure();
}
@Override
......@@ -263,9 +262,9 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
}
@Override
public boolean commit(Transaction transaction) {
transaction.updates().forEach(update -> commitProvisionalUpdate(update, transaction.id()));
return true;
public CommitResponse commit(Transaction transaction) {
return CommitResponse.success(Lists.transform(transaction.updates(),
update -> commitProvisionalUpdate(update, transaction.id())));
}
@Override
......@@ -334,32 +333,16 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
}
}
private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
String mapName = update.mapName();
String key = update.key();
Type type = update.type();
Update provisionalUpdate = getLockMap(mapName).get(key);
if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
getLockMap(mapName).remove(key);
} else {
return;
}
switch (type) {
case PUT:
case PUT_IF_ABSENT:
case PUT_IF_VERSION_MATCH:
case PUT_IF_VALUE_MATCH:
mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value());
break;
case REMOVE:
case REMOVE_IF_VERSION_MATCH:
case REMOVE_IF_VALUE_MATCH:
mapUpdate(mapName, key, Match.any(), Match.any(), null);
break;
default:
break;
throw new IllegalStateException("Invalid transaction Id");
}
return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
}
private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
......
......@@ -25,12 +25,13 @@ import static com.google.common.base.Preconditions.*;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionException;
import org.onosproject.store.service.TransactionalMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
/**
* Default TransactionContext implementation.
......@@ -86,24 +87,30 @@ public class DefaultTransactionContext implements TransactionContext {
@SuppressWarnings("unchecked")
@Override
public void commit() {
// TODO: rework commit implementation to be more intuitive
checkState(isOpen, TX_NOT_OPEN_ERROR);
CommitResponse response = null;
try {
List<DatabaseUpdate> updates = Lists.newLinkedList();
txMaps.values()
.forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
// FIXME: Updates made via transactional context currently do not result in notifications. (ONOS-2097)
database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
} catch (Exception e) {
abort();
throw new TransactionException(e);
txMaps.values().forEach(m -> updates.addAll(m.prepareDatabaseUpdates()));
Transaction transaction = new DefaultTransaction(transactionId, updates);
response = Futures.getUnchecked(database.prepareAndCommit(transaction));
} finally {
if (response != null && !response.success()) {
abort();
}
isOpen = false;
}
}
@Override
public void abort() {
checkState(isOpen, TX_NOT_OPEN_ERROR);
txMaps.values().forEach(m -> m.rollback());
if (isOpen) {
try {
txMaps.values().forEach(m -> m.rollback());
} finally {
isOpen = false;
}
}
}
}
......
......@@ -33,6 +33,7 @@ import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
......@@ -246,10 +247,10 @@ public class PartitionedDatabase implements Database {
}
@Override
public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
public CompletableFuture<CommitResponse> prepareAndCommit(Transaction transaction) {
Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
if (subTransactions.isEmpty()) {
return CompletableFuture.completedFuture(true);
return CompletableFuture.completedFuture(CommitResponse.success(ImmutableList.of()));
} else if (subTransactions.size() == 1) {
Entry<Database, Transaction> entry =
subTransactions.entrySet().iterator().next();
......@@ -277,13 +278,22 @@ public class PartitionedDatabase implements Database {
}
@Override
public CompletableFuture<Boolean> commit(Transaction transaction) {
public CompletableFuture<CommitResponse> commit(Transaction transaction) {
Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
AtomicBoolean success = new AtomicBoolean(true);
List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
.map(entry -> entry.getKey().commit(entry.getValue()))
.toArray(CompletableFuture[]::new))
.thenApply(v -> true);
.stream()
.map(entry -> entry.getKey().commit(entry.getValue())
.thenAccept(response -> {
success.set(success.get() && response.success());
if (success.get()) {
allUpdates.addAll(response.updates());
}
}))
.toArray(CompletableFuture[]::new))
.thenApply(v -> success.get() ?
CommitResponse.success(allUpdates) : CommitResponse.failure());
}
@Override
......
......@@ -32,6 +32,11 @@ public class StateMachineUpdate {
MAP,
/**
* Update is a transaction commit.
*/
TX_COMMIT,
/**
* Update is for a non-map data structure.
*/
OTHER
......@@ -51,6 +56,8 @@ public class StateMachineUpdate {
// FIXME: This check is brittle
if (operationName.contains("mapUpdate")) {
return Target.MAP;
} else if (operationName.contains("commit") || operationName.contains("prepareAndCommit")) {
return Target.TX_COMMIT;
} else {
return Target.OTHER;
}
......
......@@ -32,6 +32,8 @@ import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.Transaction.State;
import com.google.common.collect.ImmutableList;
/**
* Agent that runs the two phase commit protocol.
*/
......@@ -71,15 +73,15 @@ public class TransactionManager {
* @return transaction result. Result value true indicates a successful commit, false
* indicates abort
*/
public CompletableFuture<Boolean> execute(Transaction transaction) {
public CompletableFuture<CommitResponse> execute(Transaction transaction) {
// clean up if this transaction in already in a terminal state.
if (transaction.state() == Transaction.State.COMMITTED ||
transaction.state() == Transaction.State.ROLLEDBACK) {
return transactions.remove(transaction.id()).thenApply(v -> true);
return transactions.remove(transaction.id()).thenApply(v -> CommitResponse.success(ImmutableList.of()));
} else if (transaction.state() == Transaction.State.COMMITTING) {
return commit(transaction);
} else if (transaction.state() == Transaction.State.ROLLINGBACK) {
return rollback(transaction);
return rollback(transaction).thenApply(v -> CommitResponse.success(ImmutableList.of()));
} else {
return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
}
......@@ -107,19 +109,18 @@ public class TransactionManager {
.thenApply(v -> status));
}
private CompletableFuture<Boolean> commit(Transaction transaction) {
private CompletableFuture<CommitResponse> commit(Transaction transaction) {
return database.commit(transaction)
.thenCompose(v -> transactions.put(
.whenComplete((r, e) -> transactions.put(
transaction.id(),
transaction.transition(Transaction.State.COMMITTED)))
.thenApply(v -> true);
transaction.transition(Transaction.State.COMMITTED)));
}
private CompletableFuture<Boolean> rollback(Transaction transaction) {
private CompletableFuture<CommitResponse> rollback(Transaction transaction) {
return database.rollback(transaction)
.thenCompose(v -> transactions.put(
transaction.id(),
transaction.transition(Transaction.State.ROLLEDBACK)))
.thenApply(v -> true);
.thenApply(v -> CommitResponse.failure());
}
}
......