Madan Jampani

Performance improvements

 - Fast path of transactions updating a single key.
 - Bug fix in StoragePartitionClient where we were always creating a CachingMap

Change-Id: Ide117fba34fd12a9ff4aabd5fb7a21952bae672b
...@@ -342,6 +342,14 @@ public interface AsyncConsistentMap<K, V> extends DistributedPrimitive { ...@@ -342,6 +342,14 @@ public interface AsyncConsistentMap<K, V> extends DistributedPrimitive {
342 CompletableFuture<Void> rollback(TransactionId transactionId); 342 CompletableFuture<Void> rollback(TransactionId transactionId);
343 343
344 /** 344 /**
345 + * Prepares a transaction and commits it in one go.
346 + * @param transaction transaction
347 + * @return {@code true} if operation is successful and updates are committed
348 + * {@code false} otherwise
349 + */
350 + CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction);
351 +
352 + /**
345 * Returns a new {@link ConsistentMap} that is backed by this instance. 353 * Returns a new {@link ConsistentMap} that is backed by this instance.
346 * 354 *
347 * @return new {@code ConsistentMap} instance 355 * @return new {@code ConsistentMap} instance
......
...@@ -76,7 +76,7 @@ public class DefaultTransactionContext implements TransactionContext { ...@@ -76,7 +76,7 @@ public class DefaultTransactionContext implements TransactionContext {
76 public CompletableFuture<CommitStatus> commit() { 76 public CompletableFuture<CommitStatus> commit() {
77 final MeteringAgent.Context timer = monitor.startTimer("commit"); 77 final MeteringAgent.Context timer = monitor.startTimer("commit");
78 return transactionCoordinator.commit(transactionId, txParticipants) 78 return transactionCoordinator.commit(transactionId, txParticipants)
79 - .whenComplete((r, e) -> timer.stop(e)); 79 + .whenComplete((r, e) -> timer.stop(e));
80 } 80 }
81 81
82 @Override 82 @Override
...@@ -89,7 +89,7 @@ public class DefaultTransactionContext implements TransactionContext { ...@@ -89,7 +89,7 @@ public class DefaultTransactionContext implements TransactionContext {
89 Serializer serializer) { 89 Serializer serializer) {
90 // FIXME: Do not create duplicates. 90 // FIXME: Do not create duplicates.
91 DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<K, V>(mapName, 91 DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<K, V>(mapName,
92 - creator.<K, V>newAsyncConsistentMap(mapName, serializer), 92 + DistributedPrimitives.newMeteredMap(creator.<K, V>newAsyncConsistentMap(mapName, serializer)),
93 this, 93 this,
94 serializer); 94 serializer);
95 txParticipants.add(txMap); 95 txParticipants.add(txMap);
......
...@@ -180,8 +180,13 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, Tr ...@@ -180,8 +180,13 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, Tr
180 } 180 }
181 181
182 @Override 182 @Override
183 - public boolean hasPendingUpdates() { 183 + public CompletableFuture<Boolean> prepareAndCommit() {
184 - return updates().size() > 0; 184 + return backingMap.prepareAndCommit(new MapTransaction<>(txContext.transactionId(), updates()));
185 + }
186 +
187 + @Override
188 + public int totalUpdates() {
189 + return updates().size();
185 } 190 }
186 191
187 protected List<MapUpdate<K, V>> updates() { 192 protected List<MapUpdate<K, V>> updates() {
......
...@@ -178,6 +178,11 @@ public class DelegatingAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, ...@@ -178,6 +178,11 @@ public class DelegatingAsyncConsistentMap<K, V> implements AsyncConsistentMap<K,
178 } 178 }
179 179
180 @Override 180 @Override
181 + public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
182 + return delegateMap.prepareAndCommit(transaction);
183 + }
184 +
185 + @Override
181 public String toString() { 186 public String toString() {
182 return MoreObjects.toStringHelper(getClass()) 187 return MoreObjects.toStringHelper(getClass())
183 .add("delegateMap", delegateMap) 188 .add("delegateMap", delegateMap)
......
...@@ -25,13 +25,16 @@ import java.util.function.BiFunction; ...@@ -25,13 +25,16 @@ import java.util.function.BiFunction;
25 import java.util.function.Function; 25 import java.util.function.Function;
26 import java.util.function.Predicate; 26 import java.util.function.Predicate;
27 27
28 +import org.onosproject.store.primitives.TransactionId;
28 import org.onosproject.store.service.AsyncConsistentMap; 29 import org.onosproject.store.service.AsyncConsistentMap;
29 import org.onosproject.store.service.MapEvent; 30 import org.onosproject.store.service.MapEvent;
30 import org.onosproject.store.service.MapEventListener; 31 import org.onosproject.store.service.MapEventListener;
32 +import org.onosproject.store.service.MapTransaction;
31 import org.onosproject.store.service.Versioned; 33 import org.onosproject.store.service.Versioned;
32 34
33 import com.google.common.base.Throwables; 35 import com.google.common.base.Throwables;
34 import com.google.common.collect.Maps; 36 import com.google.common.collect.Maps;
37 +
35 import org.onosproject.utils.MeteringAgent; 38 import org.onosproject.utils.MeteringAgent;
36 39
37 /** 40 /**
...@@ -59,6 +62,10 @@ public class MeteredAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentM ...@@ -59,6 +62,10 @@ public class MeteredAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentM
59 private static final String ENTRY_SET = "entrySet"; 62 private static final String ENTRY_SET = "entrySet";
60 private static final String REPLACE = "replace"; 63 private static final String REPLACE = "replace";
61 private static final String COMPUTE_IF_ABSENT = "computeIfAbsent"; 64 private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
65 + private static final String PREPARE = "prepare";
66 + private static final String COMMIT = "commit";
67 + private static final String ROLLBACK = "rollback";
68 + private static final String PREPARE_AND_COMMIT = "prepareAndCommit";
62 private static final String ADD_LISTENER = "addListener"; 69 private static final String ADD_LISTENER = "addListener";
63 private static final String REMOVE_LISTENER = "removeListener"; 70 private static final String REMOVE_LISTENER = "removeListener";
64 private static final String NOTIFY_LISTENER = "notifyListener"; 71 private static final String NOTIFY_LISTENER = "notifyListener";
...@@ -240,6 +247,34 @@ public class MeteredAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentM ...@@ -240,6 +247,34 @@ public class MeteredAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentM
240 } 247 }
241 } 248 }
242 249
250 + @Override
251 + public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
252 + final MeteringAgent.Context timer = monitor.startTimer(PREPARE);
253 + return super.prepare(transaction)
254 + .whenComplete((r, e) -> timer.stop(e));
255 + }
256 +
257 + @Override
258 + public CompletableFuture<Void> commit(TransactionId transactionId) {
259 + final MeteringAgent.Context timer = monitor.startTimer(COMMIT);
260 + return super.commit(transactionId)
261 + .whenComplete((r, e) -> timer.stop(e));
262 + }
263 +
264 + @Override
265 + public CompletableFuture<Void> rollback(TransactionId transactionId) {
266 + final MeteringAgent.Context timer = monitor.startTimer(ROLLBACK);
267 + return super.rollback(transactionId)
268 + .whenComplete((r, e) -> timer.stop(e));
269 + }
270 +
271 + @Override
272 + public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
273 + final MeteringAgent.Context timer = monitor.startTimer(PREPARE_AND_COMMIT);
274 + return super.prepareAndCommit(transaction)
275 + .whenComplete((r, e) -> timer.stop(e));
276 + }
277 +
243 private class InternalMeteredMapEventListener implements MapEventListener<K, V> { 278 private class InternalMeteredMapEventListener implements MapEventListener<K, V> {
244 279
245 private final MapEventListener<K, V> listener; 280 private final MapEventListener<K, V> listener;
......
...@@ -236,6 +236,24 @@ public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K ...@@ -236,6 +236,24 @@ public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K
236 .toArray(CompletableFuture[]::new)); 236 .toArray(CompletableFuture[]::new));
237 } 237 }
238 238
239 + @Override
240 + public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
241 + Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
242 + transaction.updates().forEach(update -> {
243 + AsyncConsistentMap<K, V> map = getMap(update.key());
244 + updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
245 + });
246 + Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
247 + Maps.transformValues(updatesGroupedByMap,
248 + list -> new MapTransaction<>(transaction.transactionId(), list));
249 +
250 + return Tools.allOf(transactionsByMap.entrySet()
251 + .stream()
252 + .map(e -> e.getKey().prepareAndCommit(e.getValue()))
253 + .collect(Collectors.toList()))
254 + .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
255 + }
256 +
239 /** 257 /**
240 * Returns the map (partition) to which the specified key maps. 258 * Returns the map (partition) to which the specified key maps.
241 * @param key key 259 * @param key key
......
...@@ -108,7 +108,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana ...@@ -108,7 +108,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
108 value -> value == null ? null : serializer.encode(value), 108 value -> value == null ? null : serializer.encode(value),
109 bytes -> serializer.decode(bytes)); 109 bytes -> serializer.decode(bytes));
110 110
111 - return DistributedPrimitives.newCachingMap(transcodedMap); 111 + return transcodedMap;
112 } 112 }
113 113
114 @Override 114 @Override
......
...@@ -44,20 +44,32 @@ public class TransactionCoordinator { ...@@ -44,20 +44,32 @@ public class TransactionCoordinator {
44 */ 44 */
45 CompletableFuture<CommitStatus> commit(TransactionId transactionId, 45 CompletableFuture<CommitStatus> commit(TransactionId transactionId,
46 Set<TransactionParticipant> transactionParticipants) { 46 Set<TransactionParticipant> transactionParticipants) {
47 - if (!transactionParticipants.stream().anyMatch(t -> t.hasPendingUpdates())) { 47 + int totalUpdates = transactionParticipants.stream()
48 - return CompletableFuture.completedFuture(CommitStatus.SUCCESS); 48 + .map(TransactionParticipant::totalUpdates)
49 - } 49 + .reduce(Math::addExact)
50 + .orElse(0);
50 51
51 - CompletableFuture<CommitStatus> status = transactions.put(transactionId, Transaction.State.PREPARING) 52 + if (totalUpdates == 0) {
53 + return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
54 + } else if (totalUpdates == 1) {
55 + return transactionParticipants.stream()
56 + .filter(p -> p.totalUpdates() == 1)
57 + .findFirst()
58 + .get()
59 + .prepareAndCommit()
60 + .thenApply(v -> v ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
61 + } else {
62 + CompletableFuture<CommitStatus> status = transactions.put(transactionId, Transaction.State.PREPARING)
52 .thenCompose(v -> this.doPrepare(transactionParticipants)) 63 .thenCompose(v -> this.doPrepare(transactionParticipants))
53 .thenCompose(result -> result 64 .thenCompose(result -> result
54 - ? transactions.put(transactionId, Transaction.State.COMMITTING) 65 + ? transactions.put(transactionId, Transaction.State.COMMITTING)
55 - .thenCompose(v -> doCommit(transactionParticipants)) 66 + .thenCompose(v -> doCommit(transactionParticipants))
56 - .thenApply(v -> CommitStatus.SUCCESS) 67 + .thenApply(v -> CommitStatus.SUCCESS)
57 - : transactions.put(transactionId, Transaction.State.ROLLINGBACK) 68 + : transactions.put(transactionId, Transaction.State.ROLLINGBACK)
58 - .thenCompose(v -> doRollback(transactionParticipants)) 69 + .thenCompose(v -> doRollback(transactionParticipants))
59 - .thenApply(v -> CommitStatus.FAILURE)); 70 + .thenApply(v -> CommitStatus.FAILURE));
60 - return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v)); 71 + return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
72 + }
61 } 73 }
62 74
63 private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) { 75 private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
......
...@@ -26,7 +26,21 @@ public interface TransactionParticipant { ...@@ -26,7 +26,21 @@ public interface TransactionParticipant {
26 * Returns if this participant has updates that need to be committed. 26 * Returns if this participant has updates that need to be committed.
27 * @return {@code true} if yes; {@code false} otherwise 27 * @return {@code true} if yes; {@code false} otherwise
28 */ 28 */
29 - boolean hasPendingUpdates(); 29 + default boolean hasPendingUpdates() {
30 + return totalUpdates() > 0;
31 + }
32 +
33 + /**
34 + * Returns the number of updates that need to committed for this participant.
35 + * @return update count.
36 + */
37 + int totalUpdates();
38 +
39 + /**
40 + * Executes the prepare and commit steps in a single go.
41 + * @return {@code true} is successful i.e updates are committed; {@code false} otherwise
42 + */
43 + CompletableFuture<Boolean> prepareAndCommit();
30 44
31 /** 45 /**
32 * Executes the prepare phase. 46 * Executes the prepare phase.
......
...@@ -213,6 +213,11 @@ public class TranscodingAsyncConsistentMap<K1, V1, K2, V2> implements AsyncConsi ...@@ -213,6 +213,11 @@ public class TranscodingAsyncConsistentMap<K1, V1, K2, V2> implements AsyncConsi
213 return backingMap.rollback(transactionId); 213 return backingMap.rollback(transactionId);
214 } 214 }
215 215
216 + @Override
217 + public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K1, V1> transaction) {
218 + return backingMap.prepareAndCommit(transaction.map(keyEncoder, valueEncoder));
219 + }
220 +
216 private class InternalBackingMapEventListener implements MapEventListener<K2, V2> { 221 private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
217 222
218 private final MapEventListener<K1, V1> listener; 223 private final MapEventListener<K1, V1> listener;
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
15 */ 15 */
16 package org.onosproject.store.primitives.resources.impl; 16 package org.onosproject.store.primitives.resources.impl;
17 17
18 -import static org.slf4j.LoggerFactory.getLogger;
19 import io.atomix.copycat.client.CopycatClient; 18 import io.atomix.copycat.client.CopycatClient;
20 import io.atomix.resource.AbstractResource; 19 import io.atomix.resource.AbstractResource;
21 import io.atomix.resource.ResourceTypeInfo; 20 import io.atomix.resource.ResourceTypeInfo;
...@@ -44,6 +43,7 @@ import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapComman ...@@ -44,6 +43,7 @@ import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapComman
44 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size; 43 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
45 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit; 44 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
46 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare; 45 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
46 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
47 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback; 47 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
48 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten; 48 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
49 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet; 49 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
...@@ -53,7 +53,6 @@ import org.onosproject.store.service.MapEvent; ...@@ -53,7 +53,6 @@ import org.onosproject.store.service.MapEvent;
53 import org.onosproject.store.service.MapEventListener; 53 import org.onosproject.store.service.MapEventListener;
54 import org.onosproject.store.service.MapTransaction; 54 import org.onosproject.store.service.MapTransaction;
55 import org.onosproject.store.service.Versioned; 55 import org.onosproject.store.service.Versioned;
56 -import org.slf4j.Logger;
57 56
58 import com.google.common.collect.Sets; 57 import com.google.common.collect.Sets;
59 58
...@@ -64,7 +63,6 @@ import com.google.common.collect.Sets; ...@@ -64,7 +63,6 @@ import com.google.common.collect.Sets;
64 public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> 63 public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
65 implements AsyncConsistentMap<String, byte[]> { 64 implements AsyncConsistentMap<String, byte[]> {
66 65
67 - private final Logger log = getLogger(getClass());
68 private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet(); 66 private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
69 67
70 public static final String CHANGE_SUBJECT = "changeEvents"; 68 public static final String CHANGE_SUBJECT = "changeEvents";
...@@ -288,4 +286,9 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -288,4 +286,9 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
288 return submit(new TransactionRollback(transactionId)) 286 return submit(new TransactionRollback(transactionId))
289 .thenApply(v -> null); 287 .thenApply(v -> null);
290 } 288 }
289 +
290 + @Override
291 + public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
292 + return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
293 + }
291 } 294 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -245,6 +245,19 @@ public final class AtomixConsistentMapCommands { ...@@ -245,6 +245,19 @@ public final class AtomixConsistentMapCommands {
245 } 245 }
246 246
247 /** 247 /**
248 + * Map prepareAndCommit command.
249 + */
250 + @SuppressWarnings("serial")
251 + public static class TransactionPrepareAndCommit extends TransactionPrepare {
252 + public TransactionPrepareAndCommit() {
253 + }
254 +
255 + public TransactionPrepareAndCommit(MapTransaction<String, byte[]> mapTransaction) {
256 + super(mapTransaction);
257 + }
258 + }
259 +
260 + /**
248 * Map transaction commit command. 261 * Map transaction commit command.
249 */ 262 */
250 @SuppressWarnings("serial") 263 @SuppressWarnings("serial")
...@@ -489,12 +502,6 @@ public final class AtomixConsistentMapCommands { ...@@ -489,12 +502,6 @@ public final class AtomixConsistentMapCommands {
489 @Override 502 @Override
490 public void readObject(BufferInput<?> buffer, Serializer serializer) { 503 public void readObject(BufferInput<?> buffer, Serializer serializer) {
491 } 504 }
492 -
493 - @Override
494 - public String toString() {
495 - return MoreObjects.toStringHelper(getClass())
496 - .toString();
497 - }
498 } 505 }
499 506
500 /** 507 /**
...@@ -509,12 +516,6 @@ public final class AtomixConsistentMapCommands { ...@@ -509,12 +516,6 @@ public final class AtomixConsistentMapCommands {
509 @Override 516 @Override
510 public void readObject(BufferInput<?> buffer, Serializer serializer) { 517 public void readObject(BufferInput<?> buffer, Serializer serializer) {
511 } 518 }
512 -
513 - @Override
514 - public String toString() {
515 - return MoreObjects.toStringHelper(getClass())
516 - .toString();
517 - }
518 } 519 }
519 520
520 /** 521 /**
...@@ -537,7 +538,8 @@ public final class AtomixConsistentMapCommands { ...@@ -537,7 +538,8 @@ public final class AtomixConsistentMapCommands {
537 registry.register(TransactionPrepare.class, -772); 538 registry.register(TransactionPrepare.class, -772);
538 registry.register(TransactionCommit.class, -773); 539 registry.register(TransactionCommit.class, -773);
539 registry.register(TransactionRollback.class, -774); 540 registry.register(TransactionRollback.class, -774);
540 - registry.register(UpdateAndGet.class, -775); 541 + registry.register(TransactionPrepareAndCommit.class, -775);
542 + registry.register(UpdateAndGet.class, -776);
541 } 543 }
542 } 544 }
543 } 545 }
......
...@@ -52,6 +52,7 @@ import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapComman ...@@ -52,6 +52,7 @@ import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapComman
52 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size; 52 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
53 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit; 53 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
54 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare; 54 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
55 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
55 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback; 56 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
56 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten; 57 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
57 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet; 58 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
...@@ -111,6 +112,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se ...@@ -111,6 +112,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
111 executor.register(TransactionPrepare.class, this::prepare); 112 executor.register(TransactionPrepare.class, this::prepare);
112 executor.register(TransactionCommit.class, this::commit); 113 executor.register(TransactionCommit.class, this::commit);
113 executor.register(TransactionRollback.class, this::rollback); 114 executor.register(TransactionRollback.class, this::rollback);
115 + executor.register(TransactionPrepareAndCommit.class, this::prepareAndCommit);
114 } 116 }
115 117
116 @Override 118 @Override
...@@ -352,6 +354,20 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se ...@@ -352,6 +354,20 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
352 } 354 }
353 355
354 /** 356 /**
357 + * Handles an prepare and commit commit.
358 + *
359 + * @param commit transaction prepare and commit commit
360 + * @return prepare result
361 + */
362 + protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
363 + PrepareResult prepareResult = prepare(commit);
364 + if (prepareResult == PrepareResult.OK) {
365 + commitInternal(commit.operation().transaction().transactionId());
366 + }
367 + return prepareResult;
368 + }
369 +
370 + /**
355 * Handles an prepare commit. 371 * Handles an prepare commit.
356 * 372 *
357 * @param commit transaction prepare commit 373 * @param commit transaction prepare commit
...@@ -399,44 +415,48 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se ...@@ -399,44 +415,48 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
399 protected CommitResult commit(Commit<? extends TransactionCommit> commit) { 415 protected CommitResult commit(Commit<? extends TransactionCommit> commit) {
400 TransactionId transactionId = commit.operation().transactionId(); 416 TransactionId transactionId = commit.operation().transactionId();
401 try { 417 try {
402 - Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions 418 + return commitInternal(transactionId);
403 - .remove(transactionId);
404 - if (prepareCommit == null) {
405 - return CommitResult.UNKNOWN_TRANSACTION_ID;
406 - }
407 - MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
408 - long totalReferencesToCommit = transaction
409 - .updates()
410 - .stream()
411 - .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
412 - .count();
413 - CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
414 - new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
415 - List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
416 - for (MapUpdate<String, byte[]> update : transaction.updates()) {
417 - String key = update.key();
418 - MapEntryValue previousValue = mapEntries.remove(key);
419 - MapEntryValue newValue = null;
420 - checkState(preparedKeys.remove(key), "key is not prepared");
421 - if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
422 - newValue = new TransactionalCommit(key,
423 - versionCounter.incrementAndGet(), completer);
424 - }
425 - eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
426 - if (newValue != null) {
427 - mapEntries.put(key, newValue);
428 - }
429 - if (previousValue != null) {
430 - previousValue.discard();
431 - }
432 - }
433 - publish(eventsToPublish);
434 - return CommitResult.OK;
435 } finally { 419 } finally {
436 commit.close(); 420 commit.close();
437 } 421 }
438 } 422 }
439 423
424 + private CommitResult commitInternal(TransactionId transactionId) {
425 + Commit<? extends TransactionPrepare> prepareCommit = pendingTransactions
426 + .remove(transactionId);
427 + if (prepareCommit == null) {
428 + return CommitResult.UNKNOWN_TRANSACTION_ID;
429 + }
430 + MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
431 + long totalReferencesToCommit = transaction
432 + .updates()
433 + .stream()
434 + .filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
435 + .count();
436 + CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
437 + new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
438 + List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
439 + for (MapUpdate<String, byte[]> update : transaction.updates()) {
440 + String key = update.key();
441 + MapEntryValue previousValue = mapEntries.remove(key);
442 + MapEntryValue newValue = null;
443 + checkState(preparedKeys.remove(key), "key is not prepared");
444 + if (update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH) {
445 + newValue = new TransactionalCommit(key,
446 + versionCounter.incrementAndGet(), completer);
447 + }
448 + eventsToPublish.add(new MapEvent<>("", key, toVersioned(newValue), toVersioned(previousValue)));
449 + if (newValue != null) {
450 + mapEntries.put(key, newValue);
451 + }
452 + if (previousValue != null) {
453 + previousValue.discard();
454 + }
455 + }
456 + publish(eventsToPublish);
457 + return CommitResult.OK;
458 + }
459 +
440 /** 460 /**
441 * Handles an rollback commit (ha!). 461 * Handles an rollback commit (ha!).
442 * 462 *
......