Madan Jampani
Committed by Gerrit Code Review

Moved client availability check to copycat client

Change-Id: I411eb74c5d39985d85c5feda976a250e77b88ff5
(cherry picked from commit 825a8b14)
...@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit; ...@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException; 26 import java.util.concurrent.TimeoutException;
27 import java.util.function.Predicate; 27 import java.util.function.Predicate;
28 28
29 +import org.onlab.util.Tools;
30 +import org.onosproject.store.service.StorageException;
29 import org.slf4j.Logger; 31 import org.slf4j.Logger;
30 32
31 import com.google.common.base.Throwables; 33 import com.google.common.base.Throwables;
...@@ -38,9 +40,9 @@ import io.atomix.copycat.error.UnknownSessionException; ...@@ -38,9 +40,9 @@ import io.atomix.copycat.error.UnknownSessionException;
38 import io.atomix.copycat.session.ClosedSessionException; 40 import io.atomix.copycat.session.ClosedSessionException;
39 41
40 /** 42 /**
41 - * {@code CopycatClient} that can retry when certain recoverable errors are encoutered. 43 + * Custom {@code CopycatClient} for injecting additional logic that runs before/after operation submission.
42 */ 44 */
43 -public class QueryRetryingCopycatClient extends DelegatingCopycatClient { 45 +public class OnosCopycatClient extends DelegatingCopycatClient {
44 46
45 private final int maxRetries; 47 private final int maxRetries;
46 private final long delayBetweenRetriesMillis; 48 private final long delayBetweenRetriesMillis;
...@@ -55,7 +57,7 @@ public class QueryRetryingCopycatClient extends DelegatingCopycatClient { ...@@ -55,7 +57,7 @@ public class QueryRetryingCopycatClient extends DelegatingCopycatClient {
55 || e instanceof UnknownSessionException 57 || e instanceof UnknownSessionException
56 || e instanceof ClosedSessionException; 58 || e instanceof ClosedSessionException;
57 59
58 - QueryRetryingCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) { 60 + OnosCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
59 super(client); 61 super(client);
60 this.maxRetries = maxRetries; 62 this.maxRetries = maxRetries;
61 this.delayBetweenRetriesMillis = delayBetweenRetriesMillis; 63 this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
...@@ -70,6 +72,9 @@ public class QueryRetryingCopycatClient extends DelegatingCopycatClient { ...@@ -70,6 +72,9 @@ public class QueryRetryingCopycatClient extends DelegatingCopycatClient {
70 72
71 @Override 73 @Override
72 public <T> CompletableFuture<T> submit(Query<T> query) { 74 public <T> CompletableFuture<T> submit(Query<T> query) {
75 + if (state() == State.SUSPENDED || state() == State.CLOSED) {
76 + return Tools.exceptionalFuture(new StorageException.Unavailable());
77 + }
73 CompletableFuture<T> future = new CompletableFuture<>(); 78 CompletableFuture<T> future = new CompletableFuture<>();
74 executor.submit(() -> submit(query, 1, future)); 79 executor.submit(() -> submit(query, 1, future));
75 return future; 80 return future;
......
...@@ -51,7 +51,6 @@ import org.onosproject.store.service.DistributedPrimitive.Status; ...@@ -51,7 +51,6 @@ import org.onosproject.store.service.DistributedPrimitive.Status;
51 import org.onosproject.store.service.DistributedQueue; 51 import org.onosproject.store.service.DistributedQueue;
52 import org.onosproject.store.service.PartitionClientInfo; 52 import org.onosproject.store.service.PartitionClientInfo;
53 import org.onosproject.store.service.Serializer; 53 import org.onosproject.store.service.Serializer;
54 -import org.onosproject.store.service.StorageException;
55 import org.slf4j.Logger; 54 import org.slf4j.Logger;
56 55
57 import com.google.common.base.Supplier; 56 import com.google.common.base.Supplier;
...@@ -120,7 +119,6 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana ...@@ -120,7 +119,6 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
120 119
121 @Override 120 @Override
122 public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) { 121 public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
123 - checkAvailability();
124 AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join(); 122 AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
125 Consumer<State> statusListener = state -> { 123 Consumer<State> statusListener = state -> {
126 atomixConsistentMap.statusChangeListeners() 124 atomixConsistentMap.statusChangeListeners()
...@@ -145,13 +143,11 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana ...@@ -145,13 +143,11 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
145 143
146 @Override 144 @Override
147 public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) { 145 public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
148 - checkAvailability();
149 return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer)); 146 return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer));
150 } 147 }
151 148
152 @Override 149 @Override
153 public AsyncAtomicCounter newAsyncCounter(String name) { 150 public AsyncAtomicCounter newAsyncCounter(String name) {
154 - checkAvailability();
155 DistributedLong distributedLong = client.getLong(name).join(); 151 DistributedLong distributedLong = client.getLong(name).join();
156 return new AtomixCounter(name, distributedLong); 152 return new AtomixCounter(name, distributedLong);
157 } 153 }
...@@ -169,7 +165,6 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana ...@@ -169,7 +165,6 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
169 165
170 @Override 166 @Override
171 public AsyncLeaderElector newAsyncLeaderElector(String name) { 167 public AsyncLeaderElector newAsyncLeaderElector(String name) {
172 - checkAvailability();
173 AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class) 168 AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
174 .thenCompose(AtomixLeaderElector::setupCache) 169 .thenCompose(AtomixLeaderElector::setupCache)
175 .join(); 170 .join();
...@@ -183,13 +178,11 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana ...@@ -183,13 +178,11 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
183 178
184 @Override 179 @Override
185 public Set<String> getAsyncConsistentMapNames() { 180 public Set<String> getAsyncConsistentMapNames() {
186 - checkAvailability();
187 return client.keys(AtomixConsistentMap.class).join(); 181 return client.keys(AtomixConsistentMap.class).join();
188 } 182 }
189 183
190 @Override 184 @Override
191 public Set<String> getAsyncAtomicCounterNames() { 185 public Set<String> getAsyncAtomicCounterNames() {
192 - checkAvailability();
193 return client.keys(DistributedLong.class).join(); 186 return client.keys(DistributedLong.class).join();
194 } 187 }
195 188
...@@ -232,12 +225,6 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana ...@@ -232,12 +225,6 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
232 throw new ResourceManagerException(e); 225 throw new ResourceManagerException(e);
233 } 226 }
234 } 227 }
235 - return new ResourceClient(new QueryRetryingCopycatClient(copycatClient, 2, 100)); 228 + return new ResourceClient(new OnosCopycatClient(copycatClient, 2, 100));
236 - }
237 -
238 - private void checkAvailability() {
239 - if (resourceClient.client().state() == State.SUSPENDED || resourceClient.client().state() == State.CLOSED) {
240 - throw new StorageException.Unavailable();
241 - }
242 } 229 }
243 } 230 }
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
15 */ 15 */
16 package org.onosproject.store.primitives.resources.impl; 16 package org.onosproject.store.primitives.resources.impl;
17 17
18 -import io.atomix.copycat.Operation;
19 import io.atomix.copycat.client.CopycatClient; 18 import io.atomix.copycat.client.CopycatClient;
20 import io.atomix.resource.AbstractResource; 19 import io.atomix.resource.AbstractResource;
21 import io.atomix.resource.ResourceTypeInfo; 20 import io.atomix.resource.ResourceTypeInfo;
...@@ -35,7 +34,6 @@ import java.util.function.Consumer; ...@@ -35,7 +34,6 @@ import java.util.function.Consumer;
35 import java.util.function.Predicate; 34 import java.util.function.Predicate;
36 35
37 import org.onlab.util.Match; 36 import org.onlab.util.Match;
38 -import org.onlab.util.Tools;
39 import org.onosproject.store.primitives.TransactionId; 37 import org.onosproject.store.primitives.TransactionId;
40 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear; 38 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
41 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey; 39 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
...@@ -57,7 +55,6 @@ import org.onosproject.store.service.AsyncConsistentMap; ...@@ -57,7 +55,6 @@ import org.onosproject.store.service.AsyncConsistentMap;
57 import org.onosproject.store.service.MapEvent; 55 import org.onosproject.store.service.MapEvent;
58 import org.onosproject.store.service.MapEventListener; 56 import org.onosproject.store.service.MapEventListener;
59 import org.onosproject.store.service.MapTransaction; 57 import org.onosproject.store.service.MapTransaction;
60 -import org.onosproject.store.service.StorageException;
61 import org.onosproject.store.service.Versioned; 58 import org.onosproject.store.service.Versioned;
62 59
63 import com.google.common.collect.ImmutableSet; 60 import com.google.common.collect.ImmutableSet;
...@@ -100,48 +97,48 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -100,48 +97,48 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
100 97
101 @Override 98 @Override
102 public CompletableFuture<Boolean> isEmpty() { 99 public CompletableFuture<Boolean> isEmpty() {
103 - return submit(new IsEmpty()); 100 + return client.submit(new IsEmpty());
104 } 101 }
105 102
106 @Override 103 @Override
107 public CompletableFuture<Integer> size() { 104 public CompletableFuture<Integer> size() {
108 - return submit(new Size()); 105 + return client.submit(new Size());
109 } 106 }
110 107
111 @Override 108 @Override
112 public CompletableFuture<Boolean> containsKey(String key) { 109 public CompletableFuture<Boolean> containsKey(String key) {
113 - return submit(new ContainsKey(key)); 110 + return client.submit(new ContainsKey(key));
114 } 111 }
115 112
116 @Override 113 @Override
117 public CompletableFuture<Boolean> containsValue(byte[] value) { 114 public CompletableFuture<Boolean> containsValue(byte[] value) {
118 - return submit(new ContainsValue(value)); 115 + return client.submit(new ContainsValue(value));
119 } 116 }
120 117
121 @Override 118 @Override
122 public CompletableFuture<Versioned<byte[]>> get(String key) { 119 public CompletableFuture<Versioned<byte[]>> get(String key) {
123 - return submit(new Get(key)); 120 + return client.submit(new Get(key));
124 } 121 }
125 122
126 @Override 123 @Override
127 public CompletableFuture<Set<String>> keySet() { 124 public CompletableFuture<Set<String>> keySet() {
128 - return submit(new KeySet()); 125 + return client.submit(new KeySet());
129 } 126 }
130 127
131 @Override 128 @Override
132 public CompletableFuture<Collection<Versioned<byte[]>>> values() { 129 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
133 - return submit(new Values()); 130 + return client.submit(new Values());
134 } 131 }
135 132
136 @Override 133 @Override
137 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() { 134 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
138 - return submit(new EntrySet()); 135 + return client.submit(new EntrySet());
139 } 136 }
140 137
141 @Override 138 @Override
142 @SuppressWarnings("unchecked") 139 @SuppressWarnings("unchecked")
143 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) { 140 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
144 - return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY)) 141 + return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
145 .whenComplete((r, e) -> throwIfLocked(r.status())) 142 .whenComplete((r, e) -> throwIfLocked(r.status()))
146 .thenApply(v -> v.oldValue()); 143 .thenApply(v -> v.oldValue());
147 } 144 }
...@@ -149,7 +146,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -149,7 +146,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
149 @Override 146 @Override
150 @SuppressWarnings("unchecked") 147 @SuppressWarnings("unchecked")
151 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) { 148 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
152 - return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY)) 149 + return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
153 .whenComplete((r, e) -> throwIfLocked(r.status())) 150 .whenComplete((r, e) -> throwIfLocked(r.status()))
154 .thenApply(v -> v.newValue()); 151 .thenApply(v -> v.newValue());
155 } 152 }
...@@ -157,14 +154,14 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -157,14 +154,14 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
157 @Override 154 @Override
158 @SuppressWarnings("unchecked") 155 @SuppressWarnings("unchecked")
159 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) { 156 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
160 - return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY)) 157 + return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
161 .whenComplete((r, e) -> throwIfLocked(r.status())) 158 .whenComplete((r, e) -> throwIfLocked(r.status()))
162 .thenApply(v -> v.oldValue()); 159 .thenApply(v -> v.oldValue());
163 } 160 }
164 @Override 161 @Override
165 @SuppressWarnings("unchecked") 162 @SuppressWarnings("unchecked")
166 public CompletableFuture<Versioned<byte[]>> remove(String key) { 163 public CompletableFuture<Versioned<byte[]>> remove(String key) {
167 - return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY)) 164 + return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
168 .whenComplete((r, e) -> throwIfLocked(r.status())) 165 .whenComplete((r, e) -> throwIfLocked(r.status()))
169 .thenApply(v -> v.oldValue()); 166 .thenApply(v -> v.oldValue());
170 } 167 }
...@@ -172,7 +169,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -172,7 +169,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
172 @Override 169 @Override
173 @SuppressWarnings("unchecked") 170 @SuppressWarnings("unchecked")
174 public CompletableFuture<Boolean> remove(String key, byte[] value) { 171 public CompletableFuture<Boolean> remove(String key, byte[] value) {
175 - return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY)) 172 + return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
176 .whenComplete((r, e) -> throwIfLocked(r.status())) 173 .whenComplete((r, e) -> throwIfLocked(r.status()))
177 .thenApply(v -> v.updated()); 174 .thenApply(v -> v.updated());
178 } 175 }
...@@ -180,7 +177,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -180,7 +177,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
180 @Override 177 @Override
181 @SuppressWarnings("unchecked") 178 @SuppressWarnings("unchecked")
182 public CompletableFuture<Boolean> remove(String key, long version) { 179 public CompletableFuture<Boolean> remove(String key, long version) {
183 - return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version))) 180 + return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
184 .whenComplete((r, e) -> throwIfLocked(r.status())) 181 .whenComplete((r, e) -> throwIfLocked(r.status()))
185 .thenApply(v -> v.updated()); 182 .thenApply(v -> v.updated());
186 } 183 }
...@@ -188,7 +185,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -188,7 +185,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
188 @Override 185 @Override
189 @SuppressWarnings("unchecked") 186 @SuppressWarnings("unchecked")
190 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) { 187 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
191 - return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY)) 188 + return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
192 .whenComplete((r, e) -> throwIfLocked(r.status())) 189 .whenComplete((r, e) -> throwIfLocked(r.status()))
193 .thenApply(v -> v.oldValue()); 190 .thenApply(v -> v.oldValue());
194 } 191 }
...@@ -196,7 +193,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -196,7 +193,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
196 @Override 193 @Override
197 @SuppressWarnings("unchecked") 194 @SuppressWarnings("unchecked")
198 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) { 195 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
199 - return submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY)) 196 + return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
200 .whenComplete((r, e) -> throwIfLocked(r.status())) 197 .whenComplete((r, e) -> throwIfLocked(r.status()))
201 .thenApply(v -> v.updated()); 198 .thenApply(v -> v.updated());
202 } 199 }
...@@ -204,14 +201,14 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -204,14 +201,14 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
204 @Override 201 @Override
205 @SuppressWarnings("unchecked") 202 @SuppressWarnings("unchecked")
206 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) { 203 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
207 - return submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion))) 204 + return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
208 .whenComplete((r, e) -> throwIfLocked(r.status())) 205 .whenComplete((r, e) -> throwIfLocked(r.status()))
209 .thenApply(v -> v.updated()); 206 .thenApply(v -> v.updated());
210 } 207 }
211 208
212 @Override 209 @Override
213 public CompletableFuture<Void> clear() { 210 public CompletableFuture<Void> clear() {
214 - return submit(new Clear()) 211 + return client.submit(new Clear())
215 .whenComplete((r, e) -> throwIfLocked(r)) 212 .whenComplete((r, e) -> throwIfLocked(r))
216 .thenApply(v -> null); 213 .thenApply(v -> null);
217 } 214 }
...@@ -242,7 +239,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -242,7 +239,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
242 } 239 }
243 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY; 240 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
244 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version()); 241 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
245 - return submit(new UpdateAndGet(key, 242 + return client.submit(new UpdateAndGet(key,
246 computedValue.get(), 243 computedValue.get(),
247 valueMatch, 244 valueMatch,
248 versionMatch)) 245 versionMatch))
...@@ -255,7 +252,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -255,7 +252,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
255 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener, 252 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
256 Executor executor) { 253 Executor executor) {
257 if (mapEventListeners.isEmpty()) { 254 if (mapEventListeners.isEmpty()) {
258 - return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor)); 255 + return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
259 } else { 256 } else {
260 mapEventListeners.put(listener, executor); 257 mapEventListeners.put(listener, executor);
261 return CompletableFuture.completedFuture(null); 258 return CompletableFuture.completedFuture(null);
...@@ -265,7 +262,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -265,7 +262,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
265 @Override 262 @Override
266 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) { 263 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
267 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) { 264 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
268 - return submit(new Unlisten()).thenApply(v -> null); 265 + return client.submit(new Unlisten()).thenApply(v -> null);
269 } 266 }
270 return CompletableFuture.completedFuture(null); 267 return CompletableFuture.completedFuture(null);
271 } 268 }
...@@ -278,23 +275,23 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -278,23 +275,23 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
278 275
279 @Override 276 @Override
280 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) { 277 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
281 - return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK); 278 + return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
282 } 279 }
283 280
284 @Override 281 @Override
285 public CompletableFuture<Void> commit(TransactionId transactionId) { 282 public CompletableFuture<Void> commit(TransactionId transactionId) {
286 - return submit(new TransactionCommit(transactionId)).thenApply(v -> null); 283 + return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null);
287 } 284 }
288 285
289 @Override 286 @Override
290 public CompletableFuture<Void> rollback(TransactionId transactionId) { 287 public CompletableFuture<Void> rollback(TransactionId transactionId) {
291 - return submit(new TransactionRollback(transactionId)) 288 + return client.submit(new TransactionRollback(transactionId))
292 .thenApply(v -> null); 289 .thenApply(v -> null);
293 } 290 }
294 291
295 @Override 292 @Override
296 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) { 293 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
297 - return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK); 294 + return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
298 } 295 }
299 296
300 @Override 297 @Override
...@@ -311,11 +308,4 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -311,11 +308,4 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
311 public Collection<Consumer<Status>> statusChangeListeners() { 308 public Collection<Consumer<Status>> statusChangeListeners() {
312 return ImmutableSet.copyOf(statusChangeListeners); 309 return ImmutableSet.copyOf(statusChangeListeners);
313 } 310 }
314 -
315 - <T> CompletableFuture<T> submit(Operation<T> command) {
316 - if (client.state() == CopycatClient.State.SUSPENDED || client.state() == CopycatClient.State.CLOSED) {
317 - return Tools.exceptionalFuture(new StorageException.Unavailable());
318 - }
319 - return client.submit(command);
320 - }
321 } 311 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
15 */ 15 */
16 package org.onosproject.store.primitives.resources.impl; 16 package org.onosproject.store.primitives.resources.impl;
17 17
18 -import io.atomix.copycat.Operation;
19 import io.atomix.copycat.client.CopycatClient; 18 import io.atomix.copycat.client.CopycatClient;
20 import io.atomix.resource.AbstractResource; 19 import io.atomix.resource.AbstractResource;
21 import io.atomix.resource.ResourceTypeInfo; 20 import io.atomix.resource.ResourceTypeInfo;
...@@ -28,7 +27,6 @@ import java.util.Set; ...@@ -28,7 +27,6 @@ import java.util.Set;
28 import java.util.concurrent.CompletableFuture; 27 import java.util.concurrent.CompletableFuture;
29 import java.util.function.Consumer; 28 import java.util.function.Consumer;
30 29
31 -import org.onlab.util.Tools;
32 import org.onosproject.cluster.Leadership; 30 import org.onosproject.cluster.Leadership;
33 import org.onosproject.cluster.NodeId; 31 import org.onosproject.cluster.NodeId;
34 import org.onosproject.event.Change; 32 import org.onosproject.event.Change;
...@@ -42,8 +40,6 @@ import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorComman ...@@ -42,8 +40,6 @@ import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorComman
42 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten; 40 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
43 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw; 41 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
44 import org.onosproject.store.service.AsyncLeaderElector; 42 import org.onosproject.store.service.AsyncLeaderElector;
45 -import org.onosproject.store.service.StorageException;
46 -
47 import com.google.common.collect.ImmutableSet; 43 import com.google.common.collect.ImmutableSet;
48 import com.google.common.cache.CacheBuilder; 44 import com.google.common.cache.CacheBuilder;
49 import com.google.common.cache.CacheLoader; 45 import com.google.common.cache.CacheLoader;
...@@ -70,7 +66,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -70,7 +66,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
70 super(client, properties); 66 super(client, properties);
71 cache = CacheBuilder.newBuilder() 67 cache = CacheBuilder.newBuilder()
72 .maximumSize(1000) 68 .maximumSize(1000)
73 - .build(CacheLoader.from(topic -> submit(new GetLeadership(topic)))); 69 + .build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic))));
74 70
75 cacheUpdater = change -> { 71 cacheUpdater = change -> {
76 Leadership leadership = change.newValue(); 72 Leadership leadership = change.newValue();
...@@ -113,27 +109,27 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -113,27 +109,27 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
113 109
114 @Override 110 @Override
115 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) { 111 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
116 - return submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic)); 112 + return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
117 } 113 }
118 114
119 @Override 115 @Override
120 public CompletableFuture<Void> withdraw(String topic) { 116 public CompletableFuture<Void> withdraw(String topic) {
121 - return submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic)); 117 + return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
122 } 118 }
123 119
124 @Override 120 @Override
125 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) { 121 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
126 - return submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic)); 122 + return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
127 } 123 }
128 124
129 @Override 125 @Override
130 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) { 126 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
131 - return submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic)); 127 + return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
132 } 128 }
133 129
134 @Override 130 @Override
135 public CompletableFuture<Void> evict(NodeId nodeId) { 131 public CompletableFuture<Void> evict(NodeId nodeId) {
136 - return submit(new AtomixLeaderElectorCommands.Evict(nodeId)); 132 + return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId));
137 } 133 }
138 134
139 @Override 135 @Override
...@@ -148,17 +144,17 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -148,17 +144,17 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
148 144
149 @Override 145 @Override
150 public CompletableFuture<Map<String, Leadership>> getLeaderships() { 146 public CompletableFuture<Map<String, Leadership>> getLeaderships() {
151 - return submit(new GetAllLeaderships()); 147 + return client.submit(new GetAllLeaderships());
152 } 148 }
153 149
154 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) { 150 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
155 - return submit(new GetElectedTopics(nodeId)); 151 + return client.submit(new GetElectedTopics(nodeId));
156 } 152 }
157 153
158 @Override 154 @Override
159 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) { 155 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
160 if (leadershipChangeListeners.isEmpty()) { 156 if (leadershipChangeListeners.isEmpty()) {
161 - return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer)); 157 + return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
162 } else { 158 } else {
163 leadershipChangeListeners.add(consumer); 159 leadershipChangeListeners.add(consumer);
164 return CompletableFuture.completedFuture(null); 160 return CompletableFuture.completedFuture(null);
...@@ -168,7 +164,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -168,7 +164,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
168 @Override 164 @Override
169 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) { 165 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
170 if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) { 166 if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
171 - return submit(new Unlisten()).thenApply(v -> null); 167 + return client.submit(new Unlisten()).thenApply(v -> null);
172 } 168 }
173 return CompletableFuture.completedFuture(null); 169 return CompletableFuture.completedFuture(null);
174 } 170 }
...@@ -187,11 +183,4 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -187,11 +183,4 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
187 public Collection<Consumer<Status>> statusChangeListeners() { 183 public Collection<Consumer<Status>> statusChangeListeners() {
188 return ImmutableSet.copyOf(statusChangeListeners); 184 return ImmutableSet.copyOf(statusChangeListeners);
189 } 185 }
190 -
191 - <T> CompletableFuture<T> submit(Operation<T> command) {
192 - if (client.state() == CopycatClient.State.SUSPENDED || client.state() == CopycatClient.State.CLOSED) {
193 - return Tools.exceptionalFuture(new StorageException.Unavailable());
194 - }
195 - return client.submit(command);
196 - }
197 } 186 }
......