Madan Jampani
Committed by Gerrit Code Review

Refactored primitive builders to consolidate methods into the base DistributedPrimitiveBuilder

Change-Id: I9a24117b41d1feeb5cf460c6adfa484aabcbb8c1
Showing 19 changed files with 181 additions and 393 deletions
......@@ -89,7 +89,8 @@ public class DistributedDhcpStore implements DhcpStore {
freeIPPool = storageService.<Ip4Address>setBuilder()
.withName("onos-dhcp-freeIP")
.withSerializer(Serializer.using(KryoNamespaces.API))
.build();
.build()
.asDistributedSet();
log.info("Started");
}
......
......@@ -56,7 +56,8 @@ public class SetTestAddCommand extends AbstractShellCommand {
set = storageService.<String>setBuilder()
.withName(setName)
.withSerializer(serializer)
.build();
.build()
.asDistributedSet();
// Add a single element to the set
if (values.length == 1) {
......
......@@ -61,7 +61,8 @@ public class SetTestGetCommand extends AbstractShellCommand {
set = storageService.<String>setBuilder()
.withName(setName)
.withSerializer(serializer)
.build();
.build()
.asDistributedSet();
// Print the set size
if (size) {
......
......@@ -64,7 +64,8 @@ public class SetTestRemoveCommand extends AbstractShellCommand {
set = storageService.<String>setBuilder()
.withName(setName)
.withSerializer(serializer)
.build();
.build()
.asDistributedSet();
if (clear) {
set.clear();
......
......@@ -52,7 +52,8 @@ public class ClassifierManager implements ClassifierService {
classifierList = storageService.<DeviceId>setBuilder()
.withName("classifier")
.withSerializer(Serializer.using(KryoNamespaces.API))
.build();
.build()
.asDistributedSet();
log.info("Started");
}
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
package org.onosproject.store.primitives;
import java.lang.reflect.Array;
import java.util.Collection;
......@@ -37,30 +37,14 @@ import org.onosproject.store.service.Synchronous;
*/
public class DefaultDistributedSet<E> extends Synchronous<AsyncDistributedSet<E>> implements DistributedSet<E> {
private static final long OPERATION_TIMEOUT_MILLIS = 5000;
private final long operationTimeoutMillis;
private final AsyncDistributedSet<E> asyncSet;
public DefaultDistributedSet(AsyncDistributedSet<E> asyncSet) {
public DefaultDistributedSet(AsyncDistributedSet<E> asyncSet, long operationTimeoutMillis) {
super(asyncSet);
this.asyncSet = asyncSet;
}
private static <T> T complete(CompletableFuture<T> future) {
try {
return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageException.Interrupted();
} catch (TimeoutException e) {
throw new StorageException.Timeout();
} catch (ExecutionException e) {
if (e.getCause() instanceof StorageException) {
throw (StorageException) e.getCause();
} else {
throw new StorageException(e.getCause());
}
}
this.operationTimeoutMillis = operationTimeoutMillis;
}
@Override
......@@ -149,4 +133,21 @@ public class DefaultDistributedSet<E> extends Synchronous<AsyncDistributedSet<E>
public void removeListener(SetEventListener<E> listener) {
complete(asyncSet.removeListener(listener));
}
private <T> T complete(CompletableFuture<T> future) {
try {
return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageException.Interrupted();
} catch (TimeoutException e) {
throw new StorageException.Timeout();
} catch (ExecutionException e) {
if (e.getCause() instanceof StorageException) {
throw (StorageException) e.getCause();
} else {
throw new StorageException(e.getCause());
}
}
}
}
......
......@@ -24,7 +24,8 @@ import org.onosproject.store.service.Serializer;
*
* @param <T> distributed primitive type
*/
public abstract class DistributedPrimitiveBuilder<T extends DistributedPrimitive> {
public abstract class DistributedPrimitiveBuilder<B extends DistributedPrimitiveBuilder<B, T>,
T extends DistributedPrimitive> {
private DistributedPrimitive.Type type;
private String name;
......@@ -32,6 +33,8 @@ public abstract class DistributedPrimitiveBuilder<T extends DistributedPrimitive
private Serializer serializer;
private boolean partitionsDisabled = false;
private boolean meteringDisabled = false;
private boolean readOnly = false;
private boolean relaxedReadConsistency = false;
public DistributedPrimitiveBuilder(DistributedPrimitive.Type type) {
this.type = type;
......@@ -43,9 +46,9 @@ public abstract class DistributedPrimitiveBuilder<T extends DistributedPrimitive
* @param name primitive name
* @return this builder
*/
public DistributedPrimitiveBuilder<T> withName(String name) {
public B withName(String name) {
this.name = name;
return this;
return (B) this;
}
/**
......@@ -54,9 +57,9 @@ public abstract class DistributedPrimitiveBuilder<T extends DistributedPrimitive
* @param serializer serializer
* @return this builder
*/
public DistributedPrimitiveBuilder<T> withSerializer(Serializer serializer) {
public B withSerializer(Serializer serializer) {
this.serializer = serializer;
return this;
return (B) this;
}
/**
......@@ -65,9 +68,9 @@ public abstract class DistributedPrimitiveBuilder<T extends DistributedPrimitive
* @param applicationId application identifier
* @return this builder
*/
public DistributedPrimitiveBuilder<T> withApplicationId(ApplicationId applicationId) {
public B withApplicationId(ApplicationId applicationId) {
this.applicationId = applicationId;
return this;
return (B) this;
}
/**
......@@ -77,9 +80,9 @@ public abstract class DistributedPrimitiveBuilder<T extends DistributedPrimitive
* @return this builder
*/
@Deprecated
public DistributedPrimitiveBuilder<T> withPartitionsDisabled() {
public B withPartitionsDisabled() {
this.partitionsDisabled = true;
return this;
return (B) this;
}
/**
......@@ -88,9 +91,27 @@ public abstract class DistributedPrimitiveBuilder<T extends DistributedPrimitive
* @return this builder
*/
@Deprecated
public DistributedPrimitiveBuilder<T> withMeteringDisabled() {
public B withMeteringDisabled() {
this.meteringDisabled = true;
return this;
return (B) this;
}
/**
* Disables state changing operations on the returned distributed primitive.
* @return this builder
*/
public B withUpdatesDisabled() {
this.readOnly = true;
return (B) this;
}
/**
* Turns on relaxed consistency for read operations.
* @return this builder
*/
public B withRelaxedReadConsistency() {
this.relaxedReadConsistency = true;
return (B) this;
}
/**
......@@ -112,6 +133,24 @@ public abstract class DistributedPrimitiveBuilder<T extends DistributedPrimitive
}
/**
* Returns if updates are disabled.
*
* @return {@code true} if yes; {@code false} otherwise
*/
public final boolean readOnly() {
return readOnly;
}
/**
* Returns if consistency is relaxed for read operations.
*
* @return {@code true} if yes; {@code false} otherwise
*/
public final boolean relaxedReadConsistency() {
return relaxedReadConsistency;
}
/**
* Returns the serializer.
*
* @return serializer
......
......@@ -19,6 +19,8 @@ import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.onosproject.store.primitives.DefaultDistributedSet;
/**
* A distributed collection designed for holding unique elements.
* <p>
......@@ -122,6 +124,26 @@ public interface AsyncDistributedSet<E> extends DistributedPrimitive {
*/
CompletableFuture<Boolean> removeAll(Collection<? extends E> c);
/**
* Returns a new {@link DistributedSet} that is backed by this instance.
*
* @return new {@code DistributedSet} instance
*/
default DistributedSet<E> asDistributedSet() {
return asDistributedSet(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS);
}
/**
* Returns a new {@link DistributedSet} that is backed by this instance.
*
* @param timeoutMillis timeout duration for the returned DistributedSet operations
* @return new {@code DistributedSet} instance
*/
default DistributedSet<E> asDistributedSet(long timeoutMillis) {
return new DefaultDistributedSet<>(this, timeoutMillis);
}
/**
* Returns the entries as a immutable set. The returned set is a snapshot and will not reflect new changes made to
* this AsyncDistributedSet
......
......@@ -20,7 +20,8 @@ import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
/**
* Builder for AtomicCounter.
*/
public abstract class AtomicCounterBuilder extends DistributedPrimitiveBuilder<AsyncAtomicCounter> {
public abstract class AtomicCounterBuilder
extends DistributedPrimitiveBuilder<AtomicCounterBuilder, AsyncAtomicCounter> {
public AtomicCounterBuilder() {
super(DistributedPrimitive.Type.COUNTER);
}
......
......@@ -22,7 +22,8 @@ import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
*
* @param <V> atomic value type
*/
public abstract class AtomicValueBuilder<V> extends DistributedPrimitiveBuilder<AsyncAtomicValue<V>> {
public abstract class AtomicValueBuilder<V>
extends DistributedPrimitiveBuilder<AtomicValueBuilder<V>, AsyncAtomicValue<V>> {
public AtomicValueBuilder() {
super(DistributedPrimitive.Type.VALUE);
......
......@@ -15,7 +15,7 @@
*/
package org.onosproject.store.service;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
/**
* Builder for {@link ConsistentMap} instances.
......@@ -23,115 +23,32 @@ import org.onosproject.core.ApplicationId;
* @param <K> type for map key
* @param <V> type for map value
*/
public interface ConsistentMapBuilder<K, V> {
public abstract class ConsistentMapBuilder<K, V>
extends DistributedPrimitiveBuilder<ConsistentMapBuilder<K, V>, ConsistentMap<K, V>> {
/**
* Sets the name of the map.
* <p>
* Each map is identified by a unique map name. Different instances with the same name are all backed by the
* same backend state.
* </p>
* <p>
* <b>Note:</b> This is a mandatory parameter.
* </p>
*
* @param name name of the map
* @return this ConsistentMapBuilder
*/
ConsistentMapBuilder<K, V> withName(String name);
/**
* Sets the identifier of the application that owns this map instance.
* <p>
* Note: If {@code purgeOnUninstall} option is enabled, applicationId
* must be specified.
* </p>
*
* @param id applicationId owning the consistent map
* @return this ConsistentMapBuilder
*/
ConsistentMapBuilder<K, V> withApplicationId(ApplicationId id);
/**
* Sets a serializer that can be used to serialize
* both the keys and values inserted into the map. The serializer
* builder should be pre-populated with any classes that will be
* put into the map.
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param serializer serializer
* @return this ConsistentMapBuilder
*/
ConsistentMapBuilder<K, V> withSerializer(Serializer serializer);
/**
* Disables distribution of map entries across multiple database partitions.
* <p>
* When partitioning is disabled, the returned map will have a single partition
* that spans the entire cluster. Furthermore, the changes made to the map are
* ephemeral and do not survive a full cluster restart.
* </p>
* <p>
* Disabling partitions is more appropriate when the returned map is used for
* coordination activities such as leader election and not for long term data persistence.
* </p>
* <p>
* Note: By default partitions are enabled and entries in the map are durable.
* </p>
* @return this ConsistentMapBuilder
*/
ConsistentMapBuilder<K, V> withPartitionsDisabled();
private boolean purgeOnUninstall = false;
/**
* Disables map updates.
* <p>
* Attempt to update the built map will throw {@code UnsupportedOperationException}.
*
* @return this ConsistentMapBuilder
*/
ConsistentMapBuilder<K, V> withUpdatesDisabled();
/**
* Purges map contents when the application owning the map is uninstalled.
* <p>
* When this option is enabled, the caller must provide a applicationId via
* the {@code withAppliationId} builder method.
* <p>
* By default map entries will NOT be purged when owning application is uninstalled.
*
* @return this ConsistentMapBuilder
*/
ConsistentMapBuilder<K, V> withPurgeOnUninstall();
public ConsistentMapBuilder() {
super(DistributedPrimitive.Type.CONSISTENT_MAP);
}
/**
* Instantiates Metering service to gather usage and performance metrics.
* By default, usage data will be stored.
* Clears map contents when the owning application is uninstalled.
*
* @return this ConsistentMapBuilder
* return this builder
*/
ConsistentMapBuilder<K, V> withMeteringDisabled();
public ConsistentMapBuilder<K, V> withPurgeOnUninstall() {
purgeOnUninstall = true;
return this;
}
/**
* Provides weak consistency for map gets.
* <p>
* While this can lead to improved read performance, it can also make the behavior
* heard to reason. Only turn this on if you know what you are doing. By default
* reads are strongly consistent.
*
* @return this ConsistentMapBuilder
*/
ConsistentMapBuilder<K, V> withRelaxedReadConsistency();
/**
* Builds an consistent map based on the configuration options
* supplied to this builder.
*
* @return new consistent map
* @throws java.lang.RuntimeException if a mandatory parameter is missing
* Returns if map entries need to be cleared when owning application is uninstalled.
* @return {@code true} if yes; {@code false} otherwise.
*/
ConsistentMap<K, V> build();
public boolean purgeOnUninstall() {
return purgeOnUninstall;
}
/**
* Builds an async consistent map based on the configuration options
......@@ -140,5 +57,5 @@ public interface ConsistentMapBuilder<K, V> {
* @return new async consistent map
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
AsyncConsistentMap<K, V> buildAsyncMap();
public abstract AsyncConsistentMap<K, V> buildAsyncMap();
}
......
......@@ -15,127 +15,37 @@
*/
package org.onosproject.store.service;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
/**
* Builder for distributed set.
*
* @param <E> type set elements.
*/
public interface DistributedSetBuilder<E> {
public abstract class DistributedSetBuilder<E> extends DistributedPrimitiveBuilder<DistributedSetBuilder<E>,
AsyncDistributedSet<E>> {
/**
* Sets the name of the set.
* <p>
* Each set is identified by a unique name.
* </p>
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param name name of the set
* @return this DistributedSetBuilder
*/
DistributedSetBuilder<E> withName(String name);
private boolean purgeOnUninstall = false;
/**
* Sets the owner applicationId for the set.
* <p>
* Note: If {@code purgeOnUninstall} option is enabled, applicationId
* must be specified.
* </p>
*
* @param id applicationId owning the set
* @return this DistributedSetBuilder
*/
DistributedSetBuilder<E> withApplicationId(ApplicationId id);
public DistributedSetBuilder() {
super(DistributedPrimitive.Type.SET);
}
/**
* Sets a serializer that can be used to serialize
* the elements add to the set. The serializer
* builder should be pre-populated with any classes that will be
* put into the set.
* <p>
* Note: This is a mandatory parameter.
* </p>
* Enables clearing set contents when the owning application is uninstalled.
*
* @param serializer serializer
* @return this DistributedSetBuilder
* return this builder
*/
DistributedSetBuilder<E> withSerializer(Serializer serializer);
public DistributedSetBuilder<E> withPurgeOnUninstall() {
purgeOnUninstall = true;
return this;
}
/**
* Disables set updates.
* <p>
* Attempt to update the built set will throw {@code UnsupportedOperationException}.
*
* @return this DistributedSetBuilder
*/
DistributedSetBuilder<E> withUpdatesDisabled();
/**
* Provides weak consistency for set reads.
* <p>
* While this can lead to improved read performance, it can also make the behavior
* heard to reason. Only turn this on if you know what you are doing. By default
* reads are strongly consistent.
*
* @return this DistributedSetBuilder
*/
DistributedSetBuilder<E> withRelaxedReadConsistency();
/**
* Disables distribution of set entries across multiple database partitions.
* <p>
* When partitioning is disabled, the returned set will have a single partition
* that spans the entire cluster. Furthermore, the changes made to the set are
* ephemeral and do not survive a full cluster restart.
* </p>
* <p>
* Disabling partitions is more appropriate when the returned set is used for
* simple coordination activities and not for long term data persistence.
* </p>
* <p>
* Note: By default partitions are enabled and entries in the set are durable.
* </p>
* @return this DistributedSetBuilder
*/
DistributedSetBuilder<E> withPartitionsDisabled();
/**
* Instantiate Metrics service to gather usage and performance metrics.
* By default usage information is enabled
* @return this DistributedSetBuilder
*/
DistributedSetBuilder<E> withMeteringDisabled();
/**
* Purges set contents when the application owning the set is uninstalled.
* <p>
* When this option is enabled, the caller must provide a applicationId via
* the {@code withAppliationId} builder method.
* <p>
* By default set contents will NOT be purged when owning application is uninstalled.
*
* @return this DistributedSetBuilder
*/
DistributedSetBuilder<E> withPurgeOnUninstall();
/**
* Builds an set based on the configuration options
* supplied to this builder.
*
* @return new set
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
DistributedSet<E> build();
/**
* Builds an {@link AsyncDistributedSet async set} based on the configuration options
* supplied to this builder.
*
* @return new AsyncDistributedSet
* @throws java.lang.RuntimeException if a mandatory parameter is missing
* Returns if set contents need to be cleared when owning application is uninstalled.
* @return {@code true} if yes; {@code false} otherwise.
*/
AsyncDistributedSet<E> buildAsyncSet();
public boolean purgeOnUninstall() {
return purgeOnUninstall;
}
}
......
......@@ -20,7 +20,8 @@ import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
/**
* Abstract base class for a transaction context builder.
*/
public abstract class TransactionContextBuilder extends DistributedPrimitiveBuilder<TransactionContext> {
public abstract class TransactionContextBuilder
extends DistributedPrimitiveBuilder<TransactionContextBuilder, TransactionContext> {
public TransactionContextBuilder() {
super(DistributedPrimitive.Type.TRANSACTION_CONTEXT);
......
......@@ -29,7 +29,6 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.primitives.ConsistentMapBackedJavaMap;
import com.google.common.base.Objects;
......@@ -293,53 +292,11 @@ public final class TestConsistentMap<K, V> extends ConsistentMapAdapter<K, V> {
return new Builder();
}
public static class Builder<K, V> implements ConsistentMapBuilder<K, V> {
String mapName = "map";
@Override
public ConsistentMapBuilder<K, V> withName(String mapName) {
this.mapName = mapName;
return this;
}
@Override
public ConsistentMapBuilder<K, V> withApplicationId(ApplicationId id) {
return this;
}
@Override
public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) {
return this;
}
@Override
public ConsistentMapBuilder<K, V> withPartitionsDisabled() {
return this;
}
@Override
public ConsistentMapBuilder<K, V> withUpdatesDisabled() {
return this;
}
@Override
public ConsistentMapBuilder<K, V> withPurgeOnUninstall() {
return this;
}
@Override
public ConsistentMapBuilder<K, V> withRelaxedReadConsistency() {
return this;
}
@Override
public ConsistentMapBuilder<K, V> withMeteringDisabled() {
return this;
}
public static class Builder<K, V> extends ConsistentMapBuilder<K, V> {
@Override
public ConsistentMap<K, V> build() {
return new TestConsistentMap<>(mapName);
return new TestConsistentMap<>(name());
}
@Override
......
......@@ -237,7 +237,8 @@ public class ECDeviceStore
.withSerializer(Serializer.using(KryoNamespaces.API))
.withPartitionsDisabled()
.withRelaxedReadConsistency()
.build();
.build()
.asDistributedSet();
deviceDescriptions.addListener(deviceUpdateListener);
portDescriptions.addListener(portUpdateListener);
......
......@@ -15,13 +15,10 @@
*/
package org.onosproject.store.primitives.impl;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.Serializer;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
/**
......@@ -30,85 +27,25 @@ import static com.google.common.base.Preconditions.checkState;
* @param <K> type for map key
* @param <V> type for map value
*/
public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K, V> {
public class DefaultConsistentMapBuilder<K, V> extends ConsistentMapBuilder<K, V> {
private Serializer serializer;
private String name;
private ApplicationId applicationId;
private boolean purgeOnUninstall = false;
private boolean partitionsEnabled = true;
private boolean readOnly = false;
private boolean metering = true;
private boolean relaxedReadConsistency = false;
private final DatabaseManager manager;
private static final long DEFAULT_OPERATION_TIMEOUT_MILLIS = 5000L;
public DefaultConsistentMapBuilder(DatabaseManager manager) {
this.manager = manager;
}
@Override
public ConsistentMapBuilder<K, V> withName(String name) {
checkArgument(name != null && !name.isEmpty());
this.name = name;
return this;
}
@Override
public ConsistentMapBuilder<K, V> withApplicationId(ApplicationId id) {
checkArgument(id != null);
this.applicationId = id;
return this;
}
@Override
public ConsistentMapBuilder<K, V> withPurgeOnUninstall() {
purgeOnUninstall = true;
return this;
}
@Override
public ConsistentMapBuilder<K, V> withMeteringDisabled() {
metering = false;
return this;
}
@Override
public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) {
checkArgument(serializer != null);
this.serializer = serializer;
return this;
}
@Override
public ConsistentMapBuilder<K, V> withPartitionsDisabled() {
partitionsEnabled = false;
return this;
}
@Override
public ConsistentMapBuilder<K, V> withUpdatesDisabled() {
readOnly = true;
return this;
}
@Override
public ConsistentMapBuilder<K, V> withRelaxedReadConsistency() {
relaxedReadConsistency = true;
return this;
}
private void validateInputs() {
checkState(name != null, "name must be specified");
checkState(serializer != null, "serializer must be specified");
if (purgeOnUninstall) {
checkState(applicationId != null, "ApplicationId must be specified when purgeOnUninstall is enabled");
checkState(name() != null, "name must be specified");
checkState(serializer() != null, "serializer must be specified");
if (purgeOnUninstall()) {
checkState(applicationId() != null, "ApplicationId must be specified when purgeOnUninstall is enabled");
}
}
@Override
public ConsistentMap<K, V> build() {
return buildAndRegisterMap().asConsistentMap(DEFAULT_OPERATION_TIMEOUT_MILLIS);
return buildAndRegisterMap().asConsistentMap();
}
@Override
......@@ -118,25 +55,25 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() {
validateInputs();
Database database = partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase;
if (relaxedReadConsistency) {
Database database = partitionsDisabled() ? manager.inMemoryDatabase : manager.partitionedDatabase;
if (relaxedReadConsistency()) {
return manager.registerMap(
new AsyncCachingConsistentMap<>(name,
applicationId,
new AsyncCachingConsistentMap<>(name(),
applicationId(),
database,
serializer,
readOnly,
purgeOnUninstall,
metering));
serializer(),
readOnly(),
purgeOnUninstall(),
meteringEnabled()));
} else {
return manager.registerMap(
new DefaultAsyncConsistentMap<>(name,
applicationId,
new DefaultAsyncConsistentMap<>(name(),
applicationId(),
database,
serializer,
readOnly,
purgeOnUninstall,
metering));
serializer(),
readOnly(),
purgeOnUninstall(),
meteringEnabled()));
}
}
}
\ No newline at end of file
......
......@@ -20,7 +20,6 @@ import java.util.function.Supplier;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.DistributedSetBuilder;
......@@ -29,7 +28,7 @@ import org.onosproject.store.service.DistributedSetBuilder;
*
* @param <E> type for set elements
*/
public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E> {
public class DefaultDistributedSetBuilder<E> extends DistributedSetBuilder<E> {
private String name;
private ConsistentMapBuilder<E, Boolean> mapBuilder;
......@@ -90,12 +89,7 @@ public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E>
}
@Override
public DistributedSet<E> build() {
return new DefaultDistributedSet<E>(buildAsyncSet());
}
@Override
public AsyncDistributedSet<E> buildAsyncSet() {
public AsyncDistributedSet<E> build() {
return new DefaultAsyncDistributedSet<E>(mapBuilder.buildAsyncMap(), name, metering);
}
}
......
......@@ -82,14 +82,16 @@ public class DefaultTransactionContext implements TransactionContext {
checkState(isOpen, TX_NOT_OPEN_ERROR);
checkNotNull(mapName);
checkNotNull(serializer);
return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
name,
mapBuilderSupplier.get()
return txMaps.computeIfAbsent(mapName, name -> {
ConsistentMapBuilder mapBuilder = (ConsistentMapBuilder) mapBuilderSupplier.get()
.withName(name)
.withSerializer(serializer)
.buildAsyncMap(),
.withSerializer(serializer);
return new DefaultTransactionalMap<>(
name,
mapBuilder.buildAsyncMap(),
this,
serializer));
serializer);
});
}
@SuppressWarnings("unchecked")
......
......@@ -48,7 +48,7 @@ public class DefaultTransactionContextBuilder extends TransactionContextBuilder
return new DefaultTransactionContext(transactionId, transactionCommitter, () -> {
ConsistentMapBuilder mapBuilder = mapBuilderSupplier.get();
if (partitionsDisabled()) {
mapBuilder = mapBuilder.withPartitionsDisabled();
mapBuilder = (ConsistentMapBuilder) mapBuilder.withPartitionsDisabled();
}
return mapBuilder;
});
......