Madan Jampani
Committed by Gerrit Code Review

Adds abstract distributed primitive builder + Refactored AtomicCounter and Atomi…

…cValue builder to make use of it.

Change-Id: I56cef62673fabc54ca29634c27e4ff1f41ba6a88
Showing 17 changed files with 278 additions and 244 deletions
......@@ -67,11 +67,11 @@ public class CounterTestIncrementCommand extends AbstractShellCommand {
atomicCounter = storageService.atomicCounterBuilder()
.withName(counter)
.withPartitionsDisabled()
.buildAsyncCounter();
.build();
} else {
atomicCounter = storageService.atomicCounterBuilder()
.withName(counter)
.buildAsyncCounter();
.build();
}
CompletableFuture<Long> result;
if (delta != null) {
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
package org.onosproject.store.primitives;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AtomicCounter;
......@@ -26,20 +26,17 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Default implementation for a distributed AtomicCounter backed by
* partitioned Raft DB.
* <p>
* The initial value will be zero.
* Default implementation for a {@code AtomicCounter} backed by a {@link AsyncAtomicCounter}.
*/
public class DefaultAtomicCounter extends Synchronous<AsyncAtomicCounter> implements AtomicCounter {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final AsyncAtomicCounter asyncCounter;
private final long operationTimeoutMillis;
public DefaultAtomicCounter(AsyncAtomicCounter asyncCounter) {
public DefaultAtomicCounter(AsyncAtomicCounter asyncCounter, long operationTimeoutMillis) {
super(asyncCounter);
this.asyncCounter = asyncCounter;
this.operationTimeoutMillis = operationTimeoutMillis;
}
@Override
......@@ -77,9 +74,9 @@ public class DefaultAtomicCounter extends Synchronous<AsyncAtomicCounter> implem
return complete(asyncCounter.get());
}
private static <T> T complete(CompletableFuture<T> future) {
private <T> T complete(CompletableFuture<T> future) {
try {
return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageException.Interrupted();
......
......@@ -13,10 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
package org.onosproject.store.primitives;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AtomicValue;
......@@ -24,21 +26,20 @@ import org.onosproject.store.service.AtomicValueEventListener;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.Synchronous;
import com.google.common.util.concurrent.Futures;
/**
* Default implementation of {@link AtomicValue}.
* Default implementation for a {@code AtomicValue} backed by a {@link AsyncAtomicValue}.
*
* @param <V> value type
*/
public class DefaultAtomicValue<V> extends Synchronous<AsyncAtomicValue<V>> implements AtomicValue<V> {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final AsyncAtomicValue<V> asyncValue;
private final long operationTimeoutMillis;
public DefaultAtomicValue(AsyncAtomicValue<V> asyncValue) {
public DefaultAtomicValue(AsyncAtomicValue<V> asyncValue, long operationTimeoutMillis) {
super(asyncValue);
this.asyncValue = asyncValue;
this.operationTimeoutMillis = operationTimeoutMillis;
}
@Override
......@@ -71,7 +72,16 @@ public class DefaultAtomicValue<V> extends Synchronous<AsyncAtomicValue<V>> impl
complete(asyncValue.removeListener(listener));
}
private static <V> V complete(CompletableFuture<V> future) {
return Futures.getChecked(future, StorageException.class, OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
private <T> T complete(CompletableFuture<T> future) {
try {
return future.get(operationTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageException.Interrupted();
} catch (TimeoutException e) {
throw new StorageException.Timeout();
} catch (ExecutionException e) {
throw new StorageException(e.getCause());
}
}
}
\ No newline at end of file
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.Serializer;
/**
* Abstract builder for distributed primitives.
*
* @param <T> distributed primitive type
*/
public abstract class DistributedPrimitiveBuilder<T extends DistributedPrimitive> {
private DistributedPrimitive.Type type;
private String name;
private ApplicationId applicationId;
private Serializer serializer;
private boolean partitionsDisabled = false;
private boolean meteringDisabled = false;
public DistributedPrimitiveBuilder(DistributedPrimitive.Type type) {
this.type = type;
}
/**
* Sets the primitive name.
*
* @param name primitive name
* @return this builder
*/
public DistributedPrimitiveBuilder<T> withName(String name) {
this.name = name;
return this;
}
/**
* Sets the serializer to use for transcoding info held in the primitive.
*
* @param serializer serializer
* @return this builder
*/
public DistributedPrimitiveBuilder<T> withSerializer(Serializer serializer) {
this.serializer = serializer;
return this;
}
/**
* Sets the application id that owns this primitive.
*
* @param applicationId application identifier
* @return this builder
*/
public DistributedPrimitiveBuilder<T> withApplicationId(ApplicationId applicationId) {
this.applicationId = applicationId;
return this;
}
/**
* Creates this primitive on a special partition that comprises of all members in the cluster.
* @deprecated usage of this method is discouraged for most common scenarios. Eventually it will be replaced
* with a better alternative that does not exposes low level details. Until then avoid using this method.
* @return this builder
*/
@Deprecated
public DistributedPrimitiveBuilder<T> withPartitionsDisabled() {
this.partitionsDisabled = true;
return this;
}
/**
* Disables recording usage stats for this primitive.
* @deprecated usage of this method is discouraged for most common scenarios.
* @return this builder
*/
@Deprecated
public DistributedPrimitiveBuilder<T> withMeteringDisabled() {
this.meteringDisabled = true;
return this;
}
/**
* Returns if metering is enabled.
*
* @return {@code true} if yes; {@code false} otherwise
*/
public final boolean meteringEnabled() {
return !meteringDisabled;
}
/**
* Returns if partitions are disabled.
*
* @return {@code true} if yes; {@code false} otherwise
*/
public final boolean partitionsDisabled() {
return partitionsDisabled;
}
/**
* Returns the serializer.
*
* @return serializer
*/
public final Serializer serializer() {
return serializer;
}
/**
* Returns the application identifier.
*
* @return application id
*/
public final ApplicationId applicationId() {
return applicationId;
}
/**
* Returns the name of the primitive.
*
* @return primitive name
*/
public final String name() {
return name;
}
/**
* Returns the primitive type.
*
* @return primitive type
*/
public final DistributedPrimitive.Type type() {
return type;
}
/**
* Constructs an instance of the distributed primitive.
* @return distributed primitive
*/
public abstract T build();
}
......@@ -17,6 +17,8 @@ package org.onosproject.store.service;
import java.util.concurrent.CompletableFuture;
import org.onosproject.store.primitives.DefaultAtomicCounter;
/**
* An async atomic counter dispenses monotonically increasing values.
*/
......@@ -81,4 +83,23 @@ public interface AsyncAtomicCounter extends DistributedPrimitive {
* @return true if the update occurred and the expected value was equal to the current value, false otherwise
*/
CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue);
/**
* Returns a new {@link AtomicCounter} that is backed by this instance.
*
* @param timeoutMillis timeout duration for the returned ConsistentMap operations
* @return new {@code ConsistentMap} instance
*/
default AtomicCounter asAtomicCounter(long timeoutMillis) {
return new DefaultAtomicCounter(this, timeoutMillis);
}
/**
* Returns a new {@link AtomicCounter} that is backed by this instance and with a default operation timeout.
*
* @return new {@code ConsistentMap} instance
*/
default AtomicCounter asAtomicCounter() {
return new DefaultAtomicCounter(this, DEFAULT_OPERTATION_TIMEOUT_MILLIS);
}
}
......
......@@ -17,6 +17,8 @@ package org.onosproject.store.service;
import java.util.concurrent.CompletableFuture;
import org.onosproject.store.primitives.DefaultAtomicValue;
/**
* Distributed version of java.util.concurrent.atomic.AtomicReference.
* <p>
......@@ -80,4 +82,23 @@ public interface AsyncAtomicValue<V> extends DistributedPrimitive {
* @return CompletableFuture that will be completed when the operation finishes
*/
CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener);
/**
* Returns a new {@link AtomicValue} that is backed by this instance.
*
* @param timeoutMillis timeout duration for the returned ConsistentMap operations
* @return new {@code AtomicValue} instance
*/
default AtomicValue<V> asAtomicValue(long timeoutMillis) {
return new DefaultAtomicValue<>(this, timeoutMillis);
}
/**
* Returns a new {@link AtomicValue} that is backed by this instance and with a default operation timeout.
*
* @return new {@code AtomicValue} instance
*/
default AtomicValue<V> asAtomicValue() {
return new DefaultAtomicValue<>(this, DEFAULT_OPERTATION_TIMEOUT_MILLIS);
}
}
......
......@@ -15,61 +15,13 @@
*/
package org.onosproject.store.service;
import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
/**
* Builder for AtomicCounter.
*/
public interface AtomicCounterBuilder {
/**
* Sets the name for the atomic counter.
* <p>
* Each atomic counter is identified by a unique name.
* </p>
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param name name of the atomic counter
* @return this AtomicCounterBuilder
*/
AtomicCounterBuilder withName(String name);
/**
* Creates this counter on the partition that spans the entire cluster.
* <p>
* When partitioning is disabled, the counter state will be
* ephemeral and does not survive a full cluster restart.
* </p>
* <p>
* Note: By default partitions are enabled.
* </p>
* @return this AtomicCounterBuilder
*/
AtomicCounterBuilder withPartitionsDisabled();
/**
* Instantiates Metering service to gather usage and performance metrics.
* By default, usage data will be stored.
*
* @return this AtomicCounterBuilder
*/
AtomicCounterBuilder withMeteringDisabled();
/**
* Builds a AtomicCounter based on the configuration options
* supplied to this builder.
*
* @return new AtomicCounter
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
AtomicCounter build();
/**
* Builds a AsyncAtomicCounter based on the configuration options
* supplied to this builder.
*
* @return new AsyncAtomicCounter
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
AsyncAtomicCounter buildAsyncCounter();
public abstract class AtomicCounterBuilder extends DistributedPrimitiveBuilder<AsyncAtomicCounter> {
public AtomicCounterBuilder() {
super(DistributedPrimitive.Type.COUNTER);
}
}
\ No newline at end of file
......
......@@ -15,65 +15,16 @@
*/
package org.onosproject.store.service;
import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
/**
* Builder for constructing new AtomicValue instances.
*
* @param <V> atomic value type
*/
public interface AtomicValueBuilder<V> {
/**
* Sets the name for the atomic value.
* <p>
* Each atomic value is identified by a unique name.
* </p>
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param name name of the atomic value
* @return this AtomicValueBuilder for method chaining
*/
AtomicValueBuilder<V> withName(String name);
public abstract class AtomicValueBuilder<V> extends DistributedPrimitiveBuilder<AsyncAtomicValue<V>> {
/**
* Sets a serializer that can be used to serialize the value.
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param serializer serializer
* @return this AtomicValueBuilder for method chaining
*/
AtomicValueBuilder<V> withSerializer(Serializer serializer);
/**
* Creates this atomic value on the partition that spans the entire cluster.
* <p>
* When partitioning is disabled, the value state will be
* ephemeral and does not survive a full cluster restart.
* </p>
* <p>
* Note: By default partitions are enabled.
* </p>
* @return this AtomicValueBuilder for method chaining
*/
AtomicValueBuilder<V> withPartitionsDisabled();
/**
* Builds a AsyncAtomicValue based on the configuration options
* supplied to this builder.
*
* @return new AsyncAtomicValue
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
AsyncAtomicValue<V> buildAsyncValue();
/**
* Builds a AtomicValue based on the configuration options
* supplied to this builder.
*
* @return new AtomicValue
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
AtomicValue<V> build();
public AtomicValueBuilder() {
super(DistributedPrimitive.Type.VALUE);
}
}
......
......@@ -64,6 +64,8 @@ public interface DistributedPrimitive {
LEADER_ELECTOR
}
static final long DEFAULT_OPERTATION_TIMEOUT_MILLIS = 5000L;
/**
* Returns the name of this primitive.
* @return name
......
......@@ -15,12 +15,13 @@
*/
package org.onosproject.store.service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
/**
* Test implementation of atomic counter.
*/
public final class TestAtomicCounter implements AtomicCounter {
public final class TestAtomicCounter implements AsyncAtomicCounter {
final AtomicLong value;
@Override
......@@ -28,77 +29,53 @@ public final class TestAtomicCounter implements AtomicCounter {
return null;
}
@Override
public Type type() {
return Type.COUNTER;
}
private TestAtomicCounter() {
value = new AtomicLong();
}
@Override
public long incrementAndGet() {
return value.incrementAndGet();
public CompletableFuture<Long> incrementAndGet() {
return CompletableFuture.completedFuture(value.incrementAndGet());
}
@Override
public long getAndIncrement() {
return value.getAndIncrement();
public CompletableFuture<Long> getAndIncrement() {
return CompletableFuture.completedFuture(value.getAndIncrement());
}
@Override
public long getAndAdd(long delta) {
return value.getAndAdd(delta);
public CompletableFuture<Long> getAndAdd(long delta) {
return CompletableFuture.completedFuture(value.getAndAdd(delta));
}
@Override
public long addAndGet(long delta) {
return value.addAndGet(delta);
public CompletableFuture<Long> addAndGet(long delta) {
return CompletableFuture.completedFuture(value.addAndGet(delta));
}
@Override
public void set(long value) {
public CompletableFuture<Void> set(long value) {
this.value.set(value);
return CompletableFuture.completedFuture(null);
}
@Override
public boolean compareAndSet(long expectedValue, long updateValue) {
return value.compareAndSet(expectedValue, updateValue);
public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
return CompletableFuture.completedFuture(value.compareAndSet(expectedValue, updateValue));
}
@Override
public long get() {
return value.get();
public CompletableFuture<Long> get() {
return CompletableFuture.completedFuture(value.get());
}
public static AtomicCounterBuilder builder() {
return new Builder();
}
public static class Builder implements AtomicCounterBuilder {
@Override
public AtomicCounterBuilder withName(String name) {
return this;
}
@Override
public AtomicCounterBuilder withPartitionsDisabled() {
return this;
}
@Override
public AtomicCounterBuilder withMeteringDisabled() {
return this;
}
@Override
public AsyncAtomicCounter buildAsyncCounter() {
throw new UnsupportedOperationException("Async Counter is not supported");
}
public static class Builder extends AtomicCounterBuilder {
@Override
public AtomicCounter build() {
public AsyncAtomicCounter build() {
return new TestAtomicCounter();
}
}
......
......@@ -69,7 +69,8 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
public void activate() {
appIdCounter = storageService.atomicCounterBuilder()
.withName("onos-app-id-counter")
.build();
.build()
.asAtomicCounter();
registeredIds = storageService.<String, ApplicationId>consistentMapBuilder()
.withName("onos-app-ids")
......
......@@ -69,7 +69,8 @@ public class ConsistentIdBlockStore implements IdBlockStore {
.computeIfAbsent(topic,
name -> storageService.atomicCounterBuilder()
.withName(name)
.build());
.build()
.asAtomicCounter());
Long blockBase = Tools.retryable(counter::getAndAdd,
StorageException.class,
MAX_TRIES,
......
......@@ -50,7 +50,8 @@ public class LogicalClockManager implements LogicalClockService {
atomicCounter = storageService.atomicCounterBuilder()
.withName(SYSTEM_LOGICAL_CLOCK_COUNTER_NAME)
.withPartitionsDisabled()
.build();
.build()
.asAtomicCounter();
log.info("Started");
}
......
......@@ -68,7 +68,8 @@ public class DistributedFlowObjectiveStore
nextIds = storageService.atomicCounterBuilder()
.withName("next-objective-counter")
.build();
.build()
.asAtomicCounter();
log.info("Started");
}
......
......@@ -16,22 +16,17 @@
package org.onosproject.store.primitives.impl;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.AtomicCounterBuilder;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Default implementation of AtomicCounterBuilder.
*/
public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
public class DefaultAtomicCounterBuilder extends AtomicCounterBuilder {
private String name;
private boolean partitionsEnabled = true;
private final Database partitionedDatabase;
private final Database inMemoryDatabase;
private boolean metering = true;
public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
this.inMemoryDatabase = inMemoryDatabase;
......@@ -39,37 +34,8 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
}
@Override
public AtomicCounterBuilder withName(String name) {
checkArgument(name != null && !name.isEmpty());
this.name = name;
return this;
}
@Override
public AtomicCounterBuilder withPartitionsDisabled() {
partitionsEnabled = false;
return this;
}
@Override
public AtomicCounter build() {
return new DefaultAtomicCounter(buildAsyncCounter());
}
@Override
public AsyncAtomicCounter buildAsyncCounter() {
validateInputs();
Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
return new DefaultAsyncAtomicCounter(name, database, metering);
}
@Override
public AtomicCounterBuilder withMeteringDisabled() {
metering = false;
return this;
}
private void validateInputs() {
checkState(name != null, "name must be specified");
public AsyncAtomicCounter build() {
Database database = partitionsDisabled() ? inMemoryDatabase : partitionedDatabase;
return new DefaultAsyncAtomicCounter(checkNotNull(name()), database, meteringEnabled());
}
}
......
......@@ -18,20 +18,18 @@ package org.onosproject.store.primitives.impl;
import java.util.function.Supplier;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AtomicValue;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.Serializer;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Default implementation of AtomicValueBuilder.
*
* @param <V> value type
*/
public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
public class DefaultAtomicValueBuilder<V> extends AtomicValueBuilder<V> {
private String name;
private Serializer serializer;
private ConsistentMapBuilder<String, byte[]> mapBuilder;
public DefaultAtomicValueBuilder(Supplier<ConsistentMapBuilder<String, byte[]>> mapBuilderSupplier) {
......@@ -39,30 +37,9 @@ public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
}
@Override
public AtomicValueBuilder<V> withName(String name) {
this.name = name;
return this;
}
@Override
public AtomicValueBuilder<V> withSerializer(Serializer serializer) {
mapBuilder.withSerializer(serializer);
return this;
}
@Override
public AtomicValueBuilder<V> withPartitionsDisabled() {
mapBuilder.withPartitionsDisabled();
return this;
}
@Override
public AsyncAtomicValue<V> buildAsyncValue() {
return new DefaultAsyncAtomicValue<>(name, serializer, mapBuilder.buildAsyncMap());
}
@Override
public AtomicValue<V> build() {
return new DefaultAtomicValue<>(buildAsyncValue());
public AsyncAtomicValue<V> build() {
return new DefaultAsyncAtomicValue<>(checkNotNull(name()),
checkNotNull(serializer()),
mapBuilder.buildAsyncMap());
}
}
......
......@@ -183,7 +183,8 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
private AtomicCounter allocateCounter(DeviceId deviceId) {
return storageService.atomicCounterBuilder()
.withName(String.format(METERCOUNTERIDENTIFIER, deviceId))
.build();
.build()
.asAtomicCounter();
}
private class InternalMeterProviderService
......