Madan Jampani

Added an async version for AtomicValue and misc javadoc improvements

Change-Id: Idc401964a726d221c01ecda0cc42c4a92551113f
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.service;
17 +
18 +import java.util.concurrent.CompletableFuture;
19 +
20 +/**
21 + * Distributed version of java.util.concurrent.atomic.AtomicReference.
22 + * <p>
23 + * All methods of this interface return a {@link CompletableFuture future} immediately
24 + * after a successful invocation. The operation itself is executed asynchronous and
25 + * the returned future will be {@link CompletableFuture#complete completed} when the
26 + * operation finishes.
27 + *
28 + * @param <V> value type
29 + */
30 +public interface AsyncAtomicValue<V> {
31 +
32 + /**
33 + * Atomically sets the value to the given updated value if the current value is equal to the expected value.
34 + * <p>
35 + * IMPORTANT: Equality is based on the equality of the serialized {code byte[]} representations.
36 + * <p>
37 + * @param expect the expected value
38 + * @param update the new value
39 + * @return CompletableFuture that will be completed with {@code true} if update was successful. Otherwise future
40 + * will be completed with a value of {@code false}
41 + */
42 + CompletableFuture<Boolean> compareAndSet(V expect, V update);
43 +
44 + /**
45 + * Gets the current value.
46 + * @return CompletableFuture that will be completed with the value
47 + */
48 + CompletableFuture<V> get();
49 +
50 + /**
51 + * Atomically sets to the given value and returns the old value.
52 + * @param value the new value
53 + * @return CompletableFuture that will be completed with the previous value
54 + */
55 + CompletableFuture<V> getAndSet(V value);
56 +
57 + /**
58 + * Sets to the given value.
59 + * @param value value to set
60 + * @return CompletableFuture that will be completed when the operation finishes
61 + */
62 + CompletableFuture<Void> set(V value);
63 +
64 + /**
65 + * Registers the specified listener to be notified whenever the atomic value is updated.
66 + * @param listener listener to notify about events
67 + * @return CompletableFuture that will be completed when the operation finishes
68 + */
69 + CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener);
70 +
71 + /**
72 + * Unregisters the specified listener such that it will no longer
73 + * receive atomic value update notifications.
74 + * @param listener listener to unregister
75 + * @return CompletableFuture that will be completed when the operation finishes
76 + */
77 + CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener);
78 +}
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
17 package org.onosproject.store.service; 17 package org.onosproject.store.service;
18 18
19 import java.util.Collection; 19 import java.util.Collection;
20 +import java.util.Objects;
20 import java.util.Map.Entry; 21 import java.util.Map.Entry;
21 import java.util.Set; 22 import java.util.Set;
22 import java.util.concurrent.CompletableFuture; 23 import java.util.concurrent.CompletableFuture;
...@@ -42,7 +43,11 @@ import java.util.function.Predicate; ...@@ -42,7 +43,11 @@ import java.util.function.Predicate;
42 * </p><p> 43 * </p><p>
43 * This map does not allow null values. All methods can throw a ConsistentMapException 44 * This map does not allow null values. All methods can throw a ConsistentMapException
44 * (which extends RuntimeException) to indicate failures. 45 * (which extends RuntimeException) to indicate failures.
45 - * 46 + * <p>
47 + * All methods of this interface return a {@link CompletableFuture future} immediately
48 + * after a successful invocation. The operation itself is executed asynchronous and
49 + * the returned future will be {@link CompletableFuture#complete completed} when the
50 + * operation finishes.
46 */ 51 */
47 public interface AsyncConsistentMap<K, V> { 52 public interface AsyncConsistentMap<K, V> {
48 53
...@@ -58,7 +63,9 @@ public interface AsyncConsistentMap<K, V> { ...@@ -58,7 +63,9 @@ public interface AsyncConsistentMap<K, V> {
58 * 63 *
59 * @return a future whose value will be true if map has no entries, false otherwise. 64 * @return a future whose value will be true if map has no entries, false otherwise.
60 */ 65 */
61 - CompletableFuture<Boolean> isEmpty(); 66 + default CompletableFuture<Boolean> isEmpty() {
67 + return size().thenApply(s -> s == 0);
68 + }
62 69
63 /** 70 /**
64 * Returns true if this map contains a mapping for the specified key. 71 * Returns true if this map contains a mapping for the specified key.
...@@ -97,8 +104,10 @@ public interface AsyncConsistentMap<K, V> { ...@@ -97,8 +104,10 @@ public interface AsyncConsistentMap<K, V> {
97 * @return the current (existing or computed) value associated with the specified key, 104 * @return the current (existing or computed) value associated with the specified key,
98 * or null if the computed value is null 105 * or null if the computed value is null
99 */ 106 */
100 - CompletableFuture<Versioned<V>> computeIfAbsent(K key, 107 + default CompletableFuture<Versioned<V>> computeIfAbsent(K key,
101 - Function<? super K, ? extends V> mappingFunction); 108 + Function<? super K, ? extends V> mappingFunction) {
109 + return computeIf(key, Objects::isNull, (k, v) -> mappingFunction.apply(k));
110 + }
102 111
103 /** 112 /**
104 * If the value for the specified key is present and non-null, attempts to compute a new 113 * If the value for the specified key is present and non-null, attempts to compute a new
...@@ -110,8 +119,10 @@ public interface AsyncConsistentMap<K, V> { ...@@ -110,8 +119,10 @@ public interface AsyncConsistentMap<K, V> {
110 * @param remappingFunction the function to compute a value 119 * @param remappingFunction the function to compute a value
111 * @return the new value associated with the specified key, or null if computed value is null 120 * @return the new value associated with the specified key, or null if computed value is null
112 */ 121 */
113 - CompletableFuture<Versioned<V>> computeIfPresent(K key, 122 + default CompletableFuture<Versioned<V>> computeIfPresent(K key,
114 - BiFunction<? super K, ? super V, ? extends V> remappingFunction); 123 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
124 + return computeIf(key, Objects::nonNull, remappingFunction);
125 + }
115 126
116 /** 127 /**
117 * Attempts to compute a mapping for the specified key and its current mapped value (or 128 * Attempts to compute a mapping for the specified key and its current mapped value (or
...@@ -123,8 +134,10 @@ public interface AsyncConsistentMap<K, V> { ...@@ -123,8 +134,10 @@ public interface AsyncConsistentMap<K, V> {
123 * @param remappingFunction the function to compute a value 134 * @param remappingFunction the function to compute a value
124 * @return the new value associated with the specified key, or null if computed value is null 135 * @return the new value associated with the specified key, or null if computed value is null
125 */ 136 */
126 - CompletableFuture<Versioned<V>> compute(K key, 137 + default CompletableFuture<Versioned<V>> compute(K key,
127 - BiFunction<? super K, ? super V, ? extends V> remappingFunction); 138 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
139 + return computeIf(key, v -> true, remappingFunction);
140 + }
128 141
129 /** 142 /**
130 * If the value for the specified key satisfies a condition, attempts to compute a new 143 * If the value for the specified key satisfies a condition, attempts to compute a new
...@@ -280,14 +293,16 @@ public interface AsyncConsistentMap<K, V> { ...@@ -280,14 +293,16 @@ public interface AsyncConsistentMap<K, V> {
280 * Registers the specified listener to be notified whenever the map is updated. 293 * Registers the specified listener to be notified whenever the map is updated.
281 * 294 *
282 * @param listener listener to notify about map events 295 * @param listener listener to notify about map events
296 + * @return future that will be completed when the operation finishes
283 */ 297 */
284 - void addListener(MapEventListener<K, V> listener); 298 + CompletableFuture<Void> addListener(MapEventListener<K, V> listener);
285 299
286 /** 300 /**
287 * Unregisters the specified listener such that it will no longer 301 * Unregisters the specified listener such that it will no longer
288 * receive map change notifications. 302 * receive map change notifications.
289 * 303 *
290 * @param listener listener to unregister 304 * @param listener listener to unregister
305 + * @return future that will be completed when the operation finishes
291 */ 306 */
292 - void removeListener(MapEventListener<K, V> listener); 307 + CompletableFuture<Void> removeListener(MapEventListener<K, V> listener);
293 } 308 }
......
...@@ -68,6 +68,15 @@ public interface AtomicValueBuilder<V> { ...@@ -68,6 +68,15 @@ public interface AtomicValueBuilder<V> {
68 AtomicValueBuilder<V> withMeteringDisabled(); 68 AtomicValueBuilder<V> withMeteringDisabled();
69 69
70 /** 70 /**
71 + * Builds a AsyncAtomicValue based on the configuration options
72 + * supplied to this builder.
73 + *
74 + * @return new AsyncAtomicValue
75 + * @throws java.lang.RuntimeException if a mandatory parameter is missing
76 + */
77 + AsyncAtomicValue<V> buildAsyncValue();
78 +
79 + /**
71 * Builds a AtomicValue based on the configuration options 80 * Builds a AtomicValue based on the configuration options
72 * supplied to this builder. 81 * supplied to this builder.
73 * 82 *
......
...@@ -298,4 +298,4 @@ public interface ConsistentMap<K, V> { ...@@ -298,4 +298,4 @@ public interface ConsistentMap<K, V> {
298 * @return java.util.Map 298 * @return java.util.Map
299 */ 299 */
300 Map<K, V> asJavaMap(); 300 Map<K, V> asJavaMap();
301 -}
...\ No newline at end of file ...\ No newline at end of file
301 +}
......
...@@ -18,7 +18,7 @@ package org.onosproject.store.service; ...@@ -18,7 +18,7 @@ package org.onosproject.store.service;
18 import org.onosproject.core.ApplicationId; 18 import org.onosproject.core.ApplicationId;
19 19
20 /** 20 /**
21 - * Builder for consistent maps. 21 + * Builder for {@link ConsistentMap} instances.
22 * 22 *
23 * @param <K> type for map key 23 * @param <K> type for map key
24 * @param <V> type for map value 24 * @param <V> type for map value
...@@ -28,19 +28,20 @@ public interface ConsistentMapBuilder<K, V> { ...@@ -28,19 +28,20 @@ public interface ConsistentMapBuilder<K, V> {
28 /** 28 /**
29 * Sets the name of the map. 29 * Sets the name of the map.
30 * <p> 30 * <p>
31 - * Each consistent map is identified by a unique map name. 31 + * Each map is identified by a unique map name. Different instances with the same name are all backed by the
32 + * same backend state.
32 * </p> 33 * </p>
33 * <p> 34 * <p>
34 - * Note: This is a mandatory parameter. 35 + * <b>Note:</b> This is a mandatory parameter.
35 * </p> 36 * </p>
36 * 37 *
37 - * @param name name of the consistent map 38 + * @param name name of the map
38 * @return this ConsistentMapBuilder 39 * @return this ConsistentMapBuilder
39 */ 40 */
40 ConsistentMapBuilder<K, V> withName(String name); 41 ConsistentMapBuilder<K, V> withName(String name);
41 42
42 /** 43 /**
43 - * Sets the owner applicationId for the map. 44 + * Sets the identifier of the application that owns this map instance.
44 * <p> 45 * <p>
45 * Note: If {@code purgeOnUninstall} option is enabled, applicationId 46 * Note: If {@code purgeOnUninstall} option is enabled, applicationId
46 * must be specified. 47 * must be specified.
......
...@@ -111,6 +111,16 @@ public class Versioned<V> { ...@@ -111,6 +111,16 @@ public class Versioned<V> {
111 return versioned == null ? defaultValue : versioned.value(); 111 return versioned == null ? defaultValue : versioned.value();
112 } 112 }
113 113
114 + /**
115 + * Returns the value of the specified Versioned object if non-null or else returns null.
116 + * @param versioned versioned object
117 + * @param <U> type of the versioned value
118 + * @return versioned value or null if versioned object is null
119 + */
120 + public static <U> U valueOrNull(Versioned<U> versioned) {
121 + return valueOrElse(versioned, null);
122 + }
123 +
114 @Override 124 @Override
115 public int hashCode() { 125 public int hashCode() {
116 return Objects.hashCode(value, version, creationTime); 126 return Objects.hashCode(value, version, creationTime);
......
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.consistent.impl;
17 +
18 +import org.onosproject.store.service.AsyncAtomicValue;
19 +import org.onosproject.store.service.AsyncConsistentMap;
20 +import org.onosproject.store.service.AtomicValueEvent;
21 +import org.onosproject.store.service.AtomicValueEventListener;
22 +import org.onosproject.store.service.MapEvent;
23 +import org.onosproject.store.service.MapEventListener;
24 +import org.onosproject.store.service.Versioned;
25 +
26 +import java.util.Set;
27 +import java.util.concurrent.CompletableFuture;
28 +import java.util.concurrent.CopyOnWriteArraySet;
29 +
30 +/**
31 + * Default implementation of {@link AsyncAtomicValue}.
32 + *
33 + * @param <V> value type
34 + */
35 +public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
36 +
37 + private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
38 + private final AsyncConsistentMap<String, V> valueMap;
39 + private final String name;
40 + private final MapEventListener<String, V> mapEventListener = new InternalMapEventListener();
41 + private final MeteringAgent monitor;
42 +
43 + private static final String COMPONENT_NAME = "atomicValue";
44 + private static final String GET = "get";
45 + private static final String GET_AND_SET = "getAndSet";
46 + private static final String SET = "set";
47 + private static final String COMPARE_AND_SET = "compareAndSet";
48 +
49 + public DefaultAsyncAtomicValue(AsyncConsistentMap<String, V> valueMap,
50 + String name,
51 + boolean meteringEnabled) {
52 + this.valueMap = valueMap;
53 + this.name = name;
54 + this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
55 + }
56 +
57 + @Override
58 + public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
59 + final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
60 + CompletableFuture<Boolean> response;
61 + if (expect == null) {
62 + if (update == null) {
63 + response = CompletableFuture.completedFuture(true);
64 + }
65 + response = valueMap.putIfAbsent(name, update).thenApply(v -> v == null);
66 + } else {
67 + response = update == null
68 + ? valueMap.remove(name, expect)
69 + : valueMap.replace(name, expect, update);
70 + }
71 + return response.whenComplete((r, e) -> newTimer.stop(null));
72 + }
73 +
74 + @Override
75 + public CompletableFuture<V> get() {
76 + final MeteringAgent.Context newTimer = monitor.startTimer(GET);
77 + return valueMap.get(name)
78 + .thenApply(Versioned::valueOrNull)
79 + .whenComplete((r, e) -> newTimer.stop(null));
80 + }
81 +
82 + @Override
83 + public CompletableFuture<V> getAndSet(V value) {
84 + final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
85 + CompletableFuture<Versioned<V>> previousValue = value == null ?
86 + valueMap.remove(name) : valueMap.put(name, value);
87 + return previousValue.thenApply(Versioned::valueOrNull)
88 + .whenComplete((r, e) -> newTimer.stop(null));
89 + }
90 +
91 + @Override
92 + public CompletableFuture<Void> set(V value) {
93 + final MeteringAgent.Context newTimer = monitor.startTimer(SET);
94 + CompletableFuture<Void> previousValue = value == null ?
95 + valueMap.remove(name).thenApply(v -> null) : valueMap.put(name, value).thenApply(v -> null);
96 + return previousValue.whenComplete((r, e) -> newTimer.stop(null));
97 + }
98 +
99 + @Override
100 + public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
101 + synchronized (listeners) {
102 + if (listeners.add(listener)) {
103 + if (listeners.size() == 1) {
104 + return valueMap.addListener(mapEventListener);
105 + }
106 + }
107 + }
108 + return CompletableFuture.completedFuture(null);
109 + }
110 +
111 + @Override
112 + public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
113 + synchronized (listeners) {
114 + if (listeners.remove(listener)) {
115 + if (listeners.size() == 0) {
116 + return valueMap.removeListener(mapEventListener);
117 + }
118 + }
119 + }
120 + return CompletableFuture.completedFuture(null);
121 + }
122 +
123 + private class InternalMapEventListener implements MapEventListener<String, V> {
124 +
125 + @Override
126 + public void event(MapEvent<String, V> mapEvent) {
127 + V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : mapEvent.value().value();
128 + AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
129 + listeners.forEach(l -> l.event(atomicValueEvent));
130 + }
131 + }
132 +}
...@@ -477,13 +477,15 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V ...@@ -477,13 +477,15 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
477 } 477 }
478 478
479 @Override 479 @Override
480 - public void addListener(MapEventListener<K, V> listener) { 480 + public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
481 listeners.add(listener); 481 listeners.add(listener);
482 + return CompletableFuture.completedFuture(null);
482 } 483 }
483 484
484 @Override 485 @Override
485 - public void removeListener(MapEventListener<K, V> listener) { 486 + public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
486 listeners.remove(listener); 487 listeners.remove(listener);
488 + return CompletableFuture.completedFuture(null);
487 } 489 }
488 490
489 protected void notifyListeners(MapEvent<K, V> event) { 491 protected void notifyListeners(MapEvent<K, V> event) {
...@@ -498,5 +500,4 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V ...@@ -498,5 +500,4 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
498 } 500 }
499 }); 501 });
500 } 502 }
501 -
502 } 503 }
......
...@@ -15,124 +15,60 @@ ...@@ -15,124 +15,60 @@
15 */ 15 */
16 package org.onosproject.store.consistent.impl; 16 package org.onosproject.store.consistent.impl;
17 17
18 +import java.util.concurrent.CompletableFuture;
19 +import java.util.concurrent.TimeUnit;
20 +import org.onosproject.store.service.AsyncAtomicValue;
18 import org.onosproject.store.service.AtomicValue; 21 import org.onosproject.store.service.AtomicValue;
19 -import org.onosproject.store.service.AtomicValueEvent;
20 import org.onosproject.store.service.AtomicValueEventListener; 22 import org.onosproject.store.service.AtomicValueEventListener;
21 -import org.onosproject.store.service.ConsistentMap; 23 +import org.onosproject.store.service.StorageException;
22 -import org.onosproject.store.service.MapEvent;
23 -import org.onosproject.store.service.MapEventListener;
24 -import org.onosproject.store.service.Serializer;
25 -import org.onosproject.store.service.Versioned;
26 24
27 -import java.util.Set; 25 +import com.google.common.util.concurrent.Futures;
28 -import java.util.concurrent.CopyOnWriteArraySet;
29 26
30 /** 27 /**
31 - * Default implementation of AtomicValue. 28 + * Default implementation of {@link AtomicValue}.
32 * 29 *
33 * @param <V> value type 30 * @param <V> value type
34 */ 31 */
35 public class DefaultAtomicValue<V> implements AtomicValue<V> { 32 public class DefaultAtomicValue<V> implements AtomicValue<V> {
36 33
37 - private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>(); 34 + private static final int OPERATION_TIMEOUT_MILLIS = 5000;
38 - private final ConsistentMap<String, byte[]> valueMap; 35 + private final AsyncAtomicValue<V> asyncValue;
39 - private final String name;
40 - private final Serializer serializer;
41 - private final MapEventListener<String, byte[]> mapEventListener = new InternalMapEventListener();
42 - private final MeteringAgent monitor;
43 36
44 - private static final String COMPONENT_NAME = "atomicValue"; 37 + public DefaultAtomicValue(AsyncAtomicValue<V> asyncValue) {
45 - private static final String GET = "get"; 38 + this.asyncValue = asyncValue;
46 - private static final String GET_AND_SET = "getAndSet";
47 - private static final String COMPARE_AND_SET = "compareAndSet";
48 -
49 - public DefaultAtomicValue(ConsistentMap<String, byte[]> valueMap,
50 - String name,
51 - boolean meteringEnabled,
52 - Serializer serializer) {
53 - this.valueMap = valueMap;
54 - this.name = name;
55 - this.serializer = serializer;
56 - this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
57 } 39 }
58 40
59 @Override 41 @Override
60 public boolean compareAndSet(V expect, V update) { 42 public boolean compareAndSet(V expect, V update) {
61 - final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET); 43 + return complete(asyncValue.compareAndSet(expect, update));
62 - try {
63 - if (expect == null) {
64 - if (update == null) {
65 - return true;
66 - }
67 - return valueMap.putIfAbsent(name, serializer.encode(update)) == null;
68 - } else {
69 - if (update == null) {
70 - return valueMap.remove(name, serializer.encode(expect));
71 - }
72 - return valueMap.replace(name, serializer.encode(expect), serializer.encode(update));
73 - }
74 - } finally {
75 - newTimer.stop(null);
76 - }
77 } 44 }
78 45
79 @Override 46 @Override
80 public V get() { 47 public V get() {
81 - final MeteringAgent.Context newTimer = monitor.startTimer(GET); 48 + return complete(asyncValue.get());
82 - try {
83 - Versioned<byte[]> rawValue = valueMap.get(name);
84 - return rawValue == null ? null : serializer.decode(rawValue.value());
85 - } finally {
86 - newTimer.stop(null);
87 - }
88 } 49 }
89 50
90 @Override 51 @Override
91 public V getAndSet(V value) { 52 public V getAndSet(V value) {
92 - final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET); 53 + return complete(asyncValue.getAndSet(value));
93 - try {
94 - Versioned<byte[]> previousValue = value == null ?
95 - valueMap.remove(name) : valueMap.put(name, serializer.encode(value));
96 - return previousValue == null ? null : serializer.decode(previousValue.value());
97 - } finally {
98 - newTimer.stop(null);
99 - }
100 } 54 }
101 55
102 @Override 56 @Override
103 public void set(V value) { 57 public void set(V value) {
104 - getAndSet(value); 58 + complete(asyncValue.set(value));
105 } 59 }
106 60
107 @Override 61 @Override
108 public void addListener(AtomicValueEventListener<V> listener) { 62 public void addListener(AtomicValueEventListener<V> listener) {
109 - synchronized (listeners) { 63 + complete(asyncValue.addListener(listener));
110 - if (listeners.add(listener)) {
111 - if (listeners.size() == 1) {
112 - valueMap.addListener(mapEventListener);
113 - }
114 - }
115 - }
116 } 64 }
117 65
118 @Override 66 @Override
119 public void removeListener(AtomicValueEventListener<V> listener) { 67 public void removeListener(AtomicValueEventListener<V> listener) {
120 - synchronized (listeners) { 68 + complete(asyncValue.removeListener(listener));
121 - if (listeners.remove(listener)) {
122 - if (listeners.size() == 0) {
123 - valueMap.removeListener(mapEventListener);
124 - }
125 - }
126 - }
127 } 69 }
128 70
129 - private class InternalMapEventListener implements MapEventListener<String, byte[]> { 71 + private static <V> V complete(CompletableFuture<V> future) {
130 - 72 + return Futures.getChecked(future, StorageException.class, OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
131 - @Override
132 - public void event(MapEvent<String, byte[]> mapEvent) {
133 - V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : serializer.decode(mapEvent.value().value());
134 - AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
135 - listeners.forEach(l -> l.event(atomicValueEvent));
136 - }
137 } 73 }
138 -} 74 +}
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.store.consistent.impl; 16 package org.onosproject.store.consistent.impl;
17 17
18 import org.onosproject.store.serializers.KryoNamespaces; 18 import org.onosproject.store.serializers.KryoNamespaces;
19 +import org.onosproject.store.service.AsyncAtomicValue;
19 import org.onosproject.store.service.AtomicValue; 20 import org.onosproject.store.service.AtomicValue;
20 import org.onosproject.store.service.AtomicValueBuilder; 21 import org.onosproject.store.service.AtomicValueBuilder;
21 import org.onosproject.store.service.ConsistentMapBuilder; 22 import org.onosproject.store.service.ConsistentMapBuilder;
...@@ -28,13 +29,12 @@ import org.onosproject.store.service.Serializer; ...@@ -28,13 +29,12 @@ import org.onosproject.store.service.Serializer;
28 */ 29 */
29 public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> { 30 public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
30 31
31 - private Serializer serializer;
32 private String name; 32 private String name;
33 - private ConsistentMapBuilder<String, byte[]> mapBuilder; 33 + private ConsistentMapBuilder<String, V> mapBuilder;
34 private boolean metering = true; 34 private boolean metering = true;
35 35
36 public DefaultAtomicValueBuilder(DatabaseManager manager) { 36 public DefaultAtomicValueBuilder(DatabaseManager manager) {
37 - mapBuilder = manager.<String, byte[]>consistentMapBuilder() 37 + mapBuilder = manager.<String, V>consistentMapBuilder()
38 .withName("onos-atomic-values") 38 .withName("onos-atomic-values")
39 .withMeteringDisabled() 39 .withMeteringDisabled()
40 .withSerializer(Serializer.using(KryoNamespaces.BASIC)); 40 .withSerializer(Serializer.using(KryoNamespaces.BASIC));
...@@ -48,7 +48,7 @@ public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> { ...@@ -48,7 +48,7 @@ public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
48 48
49 @Override 49 @Override
50 public AtomicValueBuilder<V> withSerializer(Serializer serializer) { 50 public AtomicValueBuilder<V> withSerializer(Serializer serializer) {
51 - this.serializer = serializer; 51 + mapBuilder.withSerializer(serializer);
52 return this; 52 return this;
53 } 53 }
54 54
...@@ -65,7 +65,12 @@ public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> { ...@@ -65,7 +65,12 @@ public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
65 } 65 }
66 66
67 @Override 67 @Override
68 + public AsyncAtomicValue<V> buildAsyncValue() {
69 + return new DefaultAsyncAtomicValue<>(mapBuilder.buildAsyncMap(), name, metering);
70 + }
71 +
72 + @Override
68 public AtomicValue<V> build() { 73 public AtomicValue<V> build() {
69 - return new DefaultAtomicValue<>(mapBuilder.build(), name, metering, serializer); 74 + return new DefaultAtomicValue<>(buildAsyncValue());
70 } 75 }
71 } 76 }
...\ No newline at end of file ...\ No newline at end of file
......