Madan Jampani
Committed by Gerrit Code Review

Added RetryingFunction for simplified retry support.

Moved retry logic out of primitives (AtomicCounter) to the caller site.

Change-Id: I319d61f153f98d421baf32a1b5cd69d20dc63427
...@@ -15,8 +15,6 @@ ...@@ -15,8 +15,6 @@
15 */ 15 */
16 package org.onosproject.store.service; 16 package org.onosproject.store.service;
17 17
18 -import java.util.concurrent.ScheduledExecutorService;
19 -
20 /** 18 /**
21 * Builder for AtomicCounter. 19 * Builder for AtomicCounter.
22 */ 20 */
...@@ -50,15 +48,6 @@ public interface AtomicCounterBuilder { ...@@ -50,15 +48,6 @@ public interface AtomicCounterBuilder {
50 AtomicCounterBuilder withPartitionsDisabled(); 48 AtomicCounterBuilder withPartitionsDisabled();
51 49
52 /** 50 /**
53 - * Enables retries when counter operations fail.
54 - * <p>
55 - * Note: Use with caution. By default retries are disabled.
56 - * </p>
57 - * @return this AtomicCounterBuilder
58 - */
59 - AtomicCounterBuilder withRetryOnFailure();
60 -
61 - /**
62 * Instantiates Metering service to gather usage and performance metrics. 51 * Instantiates Metering service to gather usage and performance metrics.
63 * By default, usage data will be stored. 52 * By default, usage data will be stored.
64 * 53 *
...@@ -67,16 +56,6 @@ public interface AtomicCounterBuilder { ...@@ -67,16 +56,6 @@ public interface AtomicCounterBuilder {
67 AtomicCounterBuilder withMeteringDisabled(); 56 AtomicCounterBuilder withMeteringDisabled();
68 57
69 /** 58 /**
70 - * Sets the executor service to use for retrying failed operations.
71 - * <p>
72 - * Note: Must be set when retries are enabled
73 - * </p>
74 - * @param executor executor service
75 - * @return this AtomicCounterBuilder
76 - */
77 - AtomicCounterBuilder withRetryExecutor(ScheduledExecutorService executor);
78 -
79 - /**
80 * Builds a AtomicCounter based on the configuration options 59 * Builds a AtomicCounter based on the configuration options
81 * supplied to this builder. 60 * supplied to this builder.
82 * 61 *
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
15 */ 15 */
16 package org.onosproject.store.service; 16 package org.onosproject.store.service;
17 17
18 -import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.atomic.AtomicLong; 18 import java.util.concurrent.atomic.AtomicLong;
20 19
21 /** 20 /**
...@@ -69,21 +68,11 @@ public final class TestAtomicCounter implements AtomicCounter { ...@@ -69,21 +68,11 @@ public final class TestAtomicCounter implements AtomicCounter {
69 } 68 }
70 69
71 @Override 70 @Override
72 - public AtomicCounterBuilder withRetryOnFailure() {
73 - return this;
74 - }
75 -
76 - @Override
77 public AtomicCounterBuilder withMeteringDisabled() { 71 public AtomicCounterBuilder withMeteringDisabled() {
78 return this; 72 return this;
79 } 73 }
80 74
81 @Override 75 @Override
82 - public AtomicCounterBuilder withRetryExecutor(ScheduledExecutorService executor) {
83 - return this;
84 - }
85 -
86 - @Override
87 public AsyncAtomicCounter buildAsyncCounter() { 76 public AsyncAtomicCounter buildAsyncCounter() {
88 throw new UnsupportedOperationException("Async Counter is not supported"); 77 throw new UnsupportedOperationException("Async Counter is not supported");
89 } 78 }
......
...@@ -16,15 +16,9 @@ ...@@ -16,15 +16,9 @@
16 package org.onosproject.store.consistent.impl; 16 package org.onosproject.store.consistent.impl;
17 17
18 import org.onosproject.store.service.AsyncAtomicCounter; 18 import org.onosproject.store.service.AsyncAtomicCounter;
19 -import org.slf4j.Logger;
20 19
21 import java.util.concurrent.CompletableFuture; 20 import java.util.concurrent.CompletableFuture;
22 -import java.util.concurrent.ScheduledExecutorService;
23 -import java.util.concurrent.TimeUnit;
24 -import java.util.function.BiFunction;
25 -
26 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkNotNull;
27 -import static org.slf4j.LoggerFactory.getLogger;
28 22
29 /** 23 /**
30 * Default implementation for a distributed AsyncAtomicCounter backed by 24 * Default implementation for a distributed AsyncAtomicCounter backed by
...@@ -36,11 +30,6 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { ...@@ -36,11 +30,6 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
36 30
37 private final String name; 31 private final String name;
38 private final Database database; 32 private final Database database;
39 - private final boolean retryOnFailure;
40 - private final ScheduledExecutorService retryExecutor;
41 - // TODO: configure delay via builder
42 - private static final int DELAY_BETWEEN_RETRY_SEC = 1;
43 - private final Logger log = getLogger(getClass());
44 private final MeteringAgent monitor; 33 private final MeteringAgent monitor;
45 34
46 private static final String PRIMITIVE_NAME = "atomicCounter"; 35 private static final String PRIMITIVE_NAME = "atomicCounter";
...@@ -52,13 +41,9 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { ...@@ -52,13 +41,9 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
52 41
53 public DefaultAsyncAtomicCounter(String name, 42 public DefaultAsyncAtomicCounter(String name,
54 Database database, 43 Database database,
55 - boolean retryOnException, 44 + boolean meteringEnabled) {
56 - boolean meteringEnabled,
57 - ScheduledExecutorService retryExecutor) {
58 this.name = checkNotNull(name); 45 this.name = checkNotNull(name);
59 this.database = checkNotNull(database); 46 this.database = checkNotNull(database);
60 - this.retryOnFailure = retryOnException;
61 - this.retryExecutor = retryExecutor;
62 this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled); 47 this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
63 } 48 }
64 49
...@@ -86,77 +71,14 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { ...@@ -86,77 +71,14 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
86 @Override 71 @Override
87 public CompletableFuture<Long> getAndAdd(long delta) { 72 public CompletableFuture<Long> getAndAdd(long delta) {
88 final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD); 73 final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD);
89 - CompletableFuture<Long> result = database.counterGetAndAdd(name, delta); 74 + return database.counterGetAndAdd(name, delta)
90 - if (!retryOnFailure) {
91 - return result
92 .whenComplete((r, e) -> timer.stop()); 75 .whenComplete((r, e) -> timer.stop());
93 } 76 }
94 77
95 - CompletableFuture<Long> future = new CompletableFuture<>();
96 - return result.whenComplete((r, e) -> {
97 - timer.stop();
98 - // TODO : Account for retries
99 - if (e != null) {
100 - log.warn("getAndAdd failed due to {}. Will retry", e.getMessage());
101 - retryExecutor.schedule(new RetryTask(database::counterGetAndAdd, delta, future),
102 - DELAY_BETWEEN_RETRY_SEC,
103 - TimeUnit.SECONDS);
104 - } else {
105 - future.complete(r);
106 - }
107 - }).thenCompose(v -> future);
108 - }
109 -
110 @Override 78 @Override
111 public CompletableFuture<Long> addAndGet(long delta) { 79 public CompletableFuture<Long> addAndGet(long delta) {
112 final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET); 80 final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET);
113 - CompletableFuture<Long> result = database.counterAddAndGet(name, delta); 81 + return database.counterAddAndGet(name, delta)
114 - if (!retryOnFailure) {
115 - return result
116 .whenComplete((r, e) -> timer.stop()); 82 .whenComplete((r, e) -> timer.stop());
117 } 83 }
118 -
119 - CompletableFuture<Long> future = new CompletableFuture<>();
120 - return result.whenComplete((r, e) -> {
121 - timer.stop();
122 - // TODO : Account for retries
123 - if (e != null) {
124 - log.warn("addAndGet failed due to {}. Will retry", e.getMessage());
125 - retryExecutor.schedule(new RetryTask(database::counterAddAndGet, delta, future),
126 - DELAY_BETWEEN_RETRY_SEC,
127 - TimeUnit.SECONDS);
128 - } else {
129 - future.complete(r);
130 - }
131 - }).thenCompose(v -> future);
132 - }
133 -
134 - private class RetryTask implements Runnable {
135 -
136 - private final BiFunction<String, Long, CompletableFuture<Long>> function;
137 - private final Long delta;
138 - private final CompletableFuture<Long> result;
139 -
140 - public RetryTask(BiFunction<String, Long, CompletableFuture<Long>> function,
141 - Long delta,
142 - CompletableFuture<Long> result) {
143 - this.function = function;
144 - this.delta = delta;
145 - this.result = result;
146 - }
147 -
148 - @Override
149 - public void run() {
150 - function.apply(name, delta).whenComplete((r, e) -> {
151 - if (e == null) {
152 - result.complete(r);
153 - } else {
154 - log.warn("{} retry failed due to {}. Will try again...", function, e.getMessage());
155 - // TODO: Exponential backoff
156 - // TODO: limit retries
157 - retryExecutor.schedule(this, DELAY_BETWEEN_RETRY_SEC, TimeUnit.SECONDS);
158 - }
159 - });
160 - }
161 - }
162 } 84 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -21,7 +21,6 @@ import org.onosproject.store.service.StorageException; ...@@ -21,7 +21,6 @@ import org.onosproject.store.service.StorageException;
21 21
22 import java.util.concurrent.CompletableFuture; 22 import java.util.concurrent.CompletableFuture;
23 import java.util.concurrent.ExecutionException; 23 import java.util.concurrent.ExecutionException;
24 -import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.TimeUnit; 24 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException; 25 import java.util.concurrent.TimeoutException;
27 26
...@@ -39,10 +38,8 @@ public class DefaultAtomicCounter implements AtomicCounter { ...@@ -39,10 +38,8 @@ public class DefaultAtomicCounter implements AtomicCounter {
39 38
40 public DefaultAtomicCounter(String name, 39 public DefaultAtomicCounter(String name,
41 Database database, 40 Database database,
42 - boolean retryOnException, 41 + boolean meteringEnabled) {
43 - boolean meteringEnabled, 42 + asyncCounter = new DefaultAsyncAtomicCounter(name, database, meteringEnabled);
44 - ScheduledExecutorService retryExecutor) {
45 - asyncCounter = new DefaultAsyncAtomicCounter(name, database, retryOnException, meteringEnabled, retryExecutor);
46 } 43 }
47 44
48 @Override 45 @Override
......
...@@ -19,9 +19,8 @@ import org.onosproject.store.service.AsyncAtomicCounter; ...@@ -19,9 +19,8 @@ import org.onosproject.store.service.AsyncAtomicCounter;
19 import org.onosproject.store.service.AtomicCounter; 19 import org.onosproject.store.service.AtomicCounter;
20 import org.onosproject.store.service.AtomicCounterBuilder; 20 import org.onosproject.store.service.AtomicCounterBuilder;
21 21
22 -import java.util.concurrent.ScheduledExecutorService;
23 -
24 import static com.google.common.base.Preconditions.checkArgument; 22 import static com.google.common.base.Preconditions.checkArgument;
23 +import static com.google.common.base.Preconditions.checkState;
25 24
26 /** 25 /**
27 * Default implementation of AtomicCounterBuilder. 26 * Default implementation of AtomicCounterBuilder.
...@@ -32,9 +31,7 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder { ...@@ -32,9 +31,7 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
32 private boolean partitionsEnabled = true; 31 private boolean partitionsEnabled = true;
33 private final Database partitionedDatabase; 32 private final Database partitionedDatabase;
34 private final Database inMemoryDatabase; 33 private final Database inMemoryDatabase;
35 - private boolean retryOnFailure = false;
36 private boolean metering = true; 34 private boolean metering = true;
37 - private ScheduledExecutorService retryExecutor = null;
38 35
39 public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) { 36 public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
40 this.inMemoryDatabase = inMemoryDatabase; 37 this.inMemoryDatabase = inMemoryDatabase;
...@@ -58,20 +55,14 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder { ...@@ -58,20 +55,14 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
58 public AtomicCounter build() { 55 public AtomicCounter build() {
59 validateInputs(); 56 validateInputs();
60 Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase; 57 Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
61 - return new DefaultAtomicCounter(name, database, retryOnFailure, metering, retryExecutor); 58 + return new DefaultAtomicCounter(name, database, metering);
62 } 59 }
63 60
64 @Override 61 @Override
65 public AsyncAtomicCounter buildAsyncCounter() { 62 public AsyncAtomicCounter buildAsyncCounter() {
66 validateInputs(); 63 validateInputs();
67 Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase; 64 Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
68 - return new DefaultAsyncAtomicCounter(name, database, retryOnFailure, metering, retryExecutor); 65 + return new DefaultAsyncAtomicCounter(name, database, metering);
69 - }
70 -
71 - @Override
72 - public AtomicCounterBuilder withRetryOnFailure() {
73 - retryOnFailure = true;
74 - return this;
75 } 66 }
76 67
77 @Override 68 @Override
...@@ -80,17 +71,7 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder { ...@@ -80,17 +71,7 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
80 return this; 71 return this;
81 } 72 }
82 73
83 - @Override
84 - public AtomicCounterBuilder withRetryExecutor(ScheduledExecutorService executor) {
85 - this.retryExecutor = executor;
86 - return this;
87 - }
88 -
89 private void validateInputs() { 74 private void validateInputs() {
90 - if (retryOnFailure) { 75 + checkState(name != null, "name must be specified");
91 - if (retryExecutor == null) {
92 - throw new IllegalArgumentException("RetryExecutor must be specified when retries are enabled");
93 - }
94 - }
95 } 76 }
96 } 77 }
......
...@@ -15,12 +15,10 @@ ...@@ -15,12 +15,10 @@
15 */ 15 */
16 package org.onosproject.store.core.impl; 16 package org.onosproject.store.core.impl;
17 17
18 -import static org.onlab.util.Tools.groupedThreads;
19 import static org.slf4j.LoggerFactory.getLogger; 18 import static org.slf4j.LoggerFactory.getLogger;
20 19
21 import java.util.Map; 20 import java.util.Map;
22 import java.util.Set; 21 import java.util.Set;
23 -import java.util.concurrent.Executors;
24 import java.util.concurrent.ScheduledExecutorService; 22 import java.util.concurrent.ScheduledExecutorService;
25 23
26 import org.apache.felix.scr.annotations.Activate; 24 import org.apache.felix.scr.annotations.Activate;
...@@ -30,20 +28,21 @@ import org.apache.felix.scr.annotations.Reference; ...@@ -30,20 +28,21 @@ import org.apache.felix.scr.annotations.Reference;
30 import org.apache.felix.scr.annotations.ReferenceCardinality; 28 import org.apache.felix.scr.annotations.ReferenceCardinality;
31 import org.apache.felix.scr.annotations.Service; 29 import org.apache.felix.scr.annotations.Service;
32 import org.onlab.util.KryoNamespace; 30 import org.onlab.util.KryoNamespace;
31 +import org.onlab.util.Tools;
33 import org.onosproject.core.ApplicationId; 32 import org.onosproject.core.ApplicationId;
34 import org.onosproject.core.ApplicationIdStore; 33 import org.onosproject.core.ApplicationIdStore;
35 import org.onosproject.core.DefaultApplicationId; 34 import org.onosproject.core.DefaultApplicationId;
36 import org.onosproject.store.serializers.KryoNamespaces; 35 import org.onosproject.store.serializers.KryoNamespaces;
37 -import org.onosproject.store.service.AsyncAtomicCounter; 36 +import org.onosproject.store.service.AtomicCounter;
38 import org.onosproject.store.service.ConsistentMap; 37 import org.onosproject.store.service.ConsistentMap;
39 import org.onosproject.store.service.Serializer; 38 import org.onosproject.store.service.Serializer;
39 +import org.onosproject.store.service.StorageException;
40 import org.onosproject.store.service.StorageService; 40 import org.onosproject.store.service.StorageService;
41 import org.onosproject.store.service.Versioned; 41 import org.onosproject.store.service.Versioned;
42 import org.slf4j.Logger; 42 import org.slf4j.Logger;
43 43
44 import com.google.common.collect.ImmutableSet; 44 import com.google.common.collect.ImmutableSet;
45 import com.google.common.collect.Maps; 45 import com.google.common.collect.Maps;
46 -import com.google.common.util.concurrent.Futures;
47 46
48 /** 47 /**
49 * ApplicationIdStore implementation on top of {@code AtomicCounter} 48 * ApplicationIdStore implementation on top of {@code AtomicCounter}
...@@ -58,7 +57,7 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore { ...@@ -58,7 +57,7 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 57 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 protected StorageService storageService; 58 protected StorageService storageService;
60 59
61 - private AsyncAtomicCounter appIdCounter; 60 + private AtomicCounter appIdCounter;
62 private ConsistentMap<String, ApplicationId> registeredIds; 61 private ConsistentMap<String, ApplicationId> registeredIds;
63 private Map<String, ApplicationId> nameToAppIdCache = Maps.newConcurrentMap(); 62 private Map<String, ApplicationId> nameToAppIdCache = Maps.newConcurrentMap();
64 private Map<Short, ApplicationId> idToAppIdCache = Maps.newConcurrentMap(); 63 private Map<Short, ApplicationId> idToAppIdCache = Maps.newConcurrentMap();
...@@ -71,13 +70,10 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore { ...@@ -71,13 +70,10 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
71 70
72 @Activate 71 @Activate
73 public void activate() { 72 public void activate() {
74 - executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/store/appId", "retry-handler"));
75 appIdCounter = storageService.atomicCounterBuilder() 73 appIdCounter = storageService.atomicCounterBuilder()
76 .withName("onos-app-id-counter") 74 .withName("onos-app-id-counter")
77 .withPartitionsDisabled() 75 .withPartitionsDisabled()
78 - .withRetryOnFailure() 76 + .build();
79 - .withRetryExecutor(executor)
80 - .buildAsyncCounter();
81 77
82 registeredIds = storageService.<String, ApplicationId>consistentMapBuilder() 78 registeredIds = storageService.<String, ApplicationId>consistentMapBuilder()
83 .withName("onos-app-ids") 79 .withName("onos-app-ids")
...@@ -128,7 +124,9 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore { ...@@ -128,7 +124,9 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
128 ApplicationId appId = nameToAppIdCache.computeIfAbsent(name, key -> { 124 ApplicationId appId = nameToAppIdCache.computeIfAbsent(name, key -> {
129 Versioned<ApplicationId> existingAppId = registeredIds.get(name); 125 Versioned<ApplicationId> existingAppId = registeredIds.get(name);
130 if (existingAppId == null) { 126 if (existingAppId == null) {
131 - int id = Futures.getUnchecked(appIdCounter.incrementAndGet()).intValue(); 127 + int id = Tools.retryable(appIdCounter::incrementAndGet, StorageException.class, 1, 2000)
128 + .get()
129 + .intValue();
132 DefaultApplicationId newAppId = new DefaultApplicationId(id, name); 130 DefaultApplicationId newAppId = new DefaultApplicationId(id, name);
133 existingAppId = registeredIds.putIfAbsent(name, newAppId); 131 existingAppId = registeredIds.putIfAbsent(name, newAppId);
134 if (existingAppId != null) { 132 if (existingAppId != null) {
......
1 package org.onosproject.store.core.impl; 1 package org.onosproject.store.core.impl;
2 2
3 import com.google.common.collect.Maps; 3 import com.google.common.collect.Maps;
4 +
4 import org.apache.felix.scr.annotations.Activate; 5 import org.apache.felix.scr.annotations.Activate;
5 import org.apache.felix.scr.annotations.Component; 6 import org.apache.felix.scr.annotations.Component;
6 import org.apache.felix.scr.annotations.Deactivate; 7 import org.apache.felix.scr.annotations.Deactivate;
7 import org.apache.felix.scr.annotations.Reference; 8 import org.apache.felix.scr.annotations.Reference;
8 import org.apache.felix.scr.annotations.ReferenceCardinality; 9 import org.apache.felix.scr.annotations.ReferenceCardinality;
9 import org.apache.felix.scr.annotations.Service; 10 import org.apache.felix.scr.annotations.Service;
11 +import org.onlab.util.Tools;
10 import org.onosproject.core.IdBlock; 12 import org.onosproject.core.IdBlock;
11 import org.onosproject.core.IdBlockStore; 13 import org.onosproject.core.IdBlockStore;
12 import org.onosproject.store.service.AtomicCounter; 14 import org.onosproject.store.service.AtomicCounter;
...@@ -16,7 +18,6 @@ import org.slf4j.Logger; ...@@ -16,7 +18,6 @@ import org.slf4j.Logger;
16 18
17 import java.util.Map; 19 import java.util.Map;
18 20
19 -import static org.onlab.util.Tools.randomDelay;
20 import static org.slf4j.LoggerFactory.getLogger; 21 import static org.slf4j.LoggerFactory.getLogger;
21 22
22 /** 23 /**
...@@ -54,19 +55,10 @@ public class ConsistentIdBlockStore implements IdBlockStore { ...@@ -54,19 +55,10 @@ public class ConsistentIdBlockStore implements IdBlockStore {
54 name -> storageService.atomicCounterBuilder() 55 name -> storageService.atomicCounterBuilder()
55 .withName(name) 56 .withName(name)
56 .build()); 57 .build());
57 - Throwable exc = null; 58 + Long blockBase = Tools.retryable(counter::getAndAdd,
58 - for (int i = 0; i < MAX_TRIES; i++) { 59 + StorageException.class,
59 - try { 60 + MAX_TRIES,
60 - Long blockBase = counter.getAndAdd(DEFAULT_BLOCK_SIZE); 61 + RETRY_DELAY_MS).apply(DEFAULT_BLOCK_SIZE);
61 return new IdBlock(blockBase, DEFAULT_BLOCK_SIZE); 62 return new IdBlock(blockBase, DEFAULT_BLOCK_SIZE);
62 - } catch (StorageException e) {
63 - log.warn("Unable to allocate ID block due to {}; retrying...",
64 - e.getMessage());
65 - exc = e;
66 - randomDelay(RETRY_DELAY_MS); // FIXME: This is a deliberate hack; fix in Drake
67 - }
68 } 63 }
69 - throw new IllegalStateException("Unable to allocate ID block", exc);
70 - }
71 -
72 } 64 }
......
...@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.node.ShortNode; ...@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.node.ShortNode;
26 import com.fasterxml.jackson.databind.node.TextNode; 26 import com.fasterxml.jackson.databind.node.TextNode;
27 import com.google.common.collect.ImmutableSet; 27 import com.google.common.collect.ImmutableSet;
28 import com.google.common.collect.Maps; 28 import com.google.common.collect.Maps;
29 +
29 import org.apache.felix.scr.annotations.Activate; 30 import org.apache.felix.scr.annotations.Activate;
30 import org.apache.felix.scr.annotations.Component; 31 import org.apache.felix.scr.annotations.Component;
31 import org.apache.felix.scr.annotations.Deactivate; 32 import org.apache.felix.scr.annotations.Deactivate;
...@@ -165,14 +166,9 @@ public class DistributedNetworkConfigStore ...@@ -165,14 +166,9 @@ public class DistributedNetworkConfigStore
165 166
166 @Override 167 @Override
167 public <S, T extends Config<S>> T getConfig(S subject, Class<T> configClass) { 168 public <S, T extends Config<S>> T getConfig(S subject, Class<T> configClass) {
168 - // FIXME: There has to be a better way to absorb the timeout exceptions! 169 + // TODO: need to identify and address the root cause for timeouts.
169 - Versioned<ObjectNode> json = null; 170 + Versioned<ObjectNode> json = Tools.retryable(configs::get, ConsistentMapException.class, 1, MAX_BACKOFF)
170 - try { 171 + .apply(key(subject, configClass));
171 - json = configs.get(key(subject, configClass));
172 - } catch (ConsistentMapException e) {
173 - Tools.randomDelay(MAX_BACKOFF);
174 - json = configs.get(key(subject, configClass));
175 - }
176 return json != null ? createConfig(subject, configClass, json.value()) : null; 172 return json != null ? createConfig(subject, configClass, json.value()) : null;
177 } 173 }
178 174
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onlab.util;
17 +
18 +import java.util.function.Function;
19 +
20 +import com.google.common.base.Throwables;
21 +
22 +/**
23 + * Function that retries execution on failure.
24 + *
25 + * @param <U> input type
26 + * @param <V> output type
27 + */
28 +public class RetryingFunction<U, V> implements Function<U, V> {
29 +
30 + private final Function<U, V> baseFunction;
31 + private final Class<? extends Throwable> exceptionClass;
32 + private final int maxRetries;
33 + private final int maxDelayBetweenRetries;
34 +
35 + public RetryingFunction(Function<U, V> baseFunction,
36 + Class<? extends Throwable> exceptionClass,
37 + int maxRetries,
38 + int maxDelayBetweenRetries) {
39 + this.baseFunction = baseFunction;
40 + this.exceptionClass = exceptionClass;
41 + this.maxRetries = maxRetries;
42 + this.maxDelayBetweenRetries = maxDelayBetweenRetries;
43 + }
44 +
45 + @Override
46 + public V apply(U input) {
47 + int retryAttempts = 0;
48 + while (true) {
49 + try {
50 + return baseFunction.apply(input);
51 + } catch (Throwable t) {
52 + if (!exceptionClass.isAssignableFrom(t.getClass()) || retryAttempts == maxRetries) {
53 + Throwables.propagate(t);
54 + }
55 + Tools.randomDelay(maxDelayBetweenRetries);
56 + retryAttempts++;
57 + }
58 + }
59 + }
60 +}
...@@ -46,6 +46,8 @@ import java.util.concurrent.Future; ...@@ -46,6 +46,8 @@ import java.util.concurrent.Future;
46 import java.util.concurrent.ThreadFactory; 46 import java.util.concurrent.ThreadFactory;
47 import java.util.concurrent.TimeUnit; 47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException; 48 import java.util.concurrent.TimeoutException;
49 +import java.util.function.Function;
50 +import java.util.function.Supplier;
49 import java.util.stream.Stream; 51 import java.util.stream.Stream;
50 import java.util.stream.StreamSupport; 52 import java.util.stream.StreamSupport;
51 53
...@@ -227,6 +229,41 @@ public abstract class Tools { ...@@ -227,6 +229,41 @@ public abstract class Tools {
227 } 229 }
228 230
229 /** 231 /**
232 + * Returns a function that retries execution on failure.
233 + * @param base base function
234 + * @param exceptionClass type of exception for which to retry
235 + * @param maxRetries max number of retries before giving up
236 + * @param maxDelayBetweenRetries max delay between successive retries. The actual delay is randomly picked from
237 + * the interval (0, maxDelayBetweenRetries]
238 + * @return function
239 + */
240 + public static <U, V> Function<U, V> retryable(Function<U, V> base,
241 + Class<? extends Throwable> exceptionClass,
242 + int maxRetries,
243 + int maxDelayBetweenRetries) {
244 + return new RetryingFunction<>(base, exceptionClass, maxRetries, maxDelayBetweenRetries);
245 + }
246 +
247 + /**
248 + * Returns a Supplier that retries execution on failure.
249 + * @param base base supplier
250 + * @param exceptionClass type of exception for which to retry
251 + * @param maxRetries max number of retries before giving up
252 + * @param maxDelayBetweenRetries max delay between successive retries. The actual delay is randomly picked from
253 + * the interval (0, maxDelayBetweenRetries]
254 + * @return supplier
255 + */
256 + public static <V> Supplier<V> retryable(Supplier<V> base,
257 + Class<? extends Throwable> exceptionClass,
258 + int maxRetries,
259 + int maxDelayBetweenRetries) {
260 + return () -> new RetryingFunction<>(v -> base.get(),
261 + exceptionClass,
262 + maxRetries,
263 + maxDelayBetweenRetries).apply(null);
264 + }
265 +
266 + /**
230 * Suspends the current thread for a random number of millis between 0 and 267 * Suspends the current thread for a random number of millis between 0 and
231 * the indicated limit. 268 * the indicated limit.
232 * 269 *
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onlab.util;
17 +
18 +import org.junit.After;
19 +import org.junit.Before;
20 +import org.junit.Test;
21 +
22 +/**
23 + * Unit tests for RetryingFunction.
24 + *
25 + */
26 +public class RetryingFunctionTest {
27 +
28 + private int round;
29 +
30 + @Before
31 + public void setUp() {
32 + round = 1;
33 + }
34 +
35 + @After
36 + public void tearDown() {
37 + round = 0;
38 + }
39 +
40 + @Test(expected = RetryableException.class)
41 + public void testNoRetries() {
42 + new RetryingFunction<>(this::succeedAfterOneFailure, RetryableException.class, 0, 10).apply(null);
43 + }
44 +
45 + @Test
46 + public void testSuccessAfterOneRetry() {
47 + new RetryingFunction<>(this::succeedAfterOneFailure, RetryableException.class, 1, 10).apply(null);
48 + }
49 +
50 + @Test(expected = RetryableException.class)
51 + public void testFailureAfterOneRetry() {
52 + new RetryingFunction<>(this::succeedAfterTwoFailures, RetryableException.class, 1, 10).apply(null);
53 + }
54 +
55 + @Test
56 + public void testFailureAfterTwoRetries() {
57 + new RetryingFunction<>(this::succeedAfterTwoFailures, RetryableException.class, 2, 10).apply(null);
58 + }
59 +
60 + @Test(expected = NonRetryableException.class)
61 + public void testFailureWithNonRetryableFailure() {
62 + new RetryingFunction<>(this::failCompletely, RetryableException.class, 2, 10).apply(null);
63 + }
64 +
65 + private String succeedAfterOneFailure(String input) {
66 + if (round++ <= 1) {
67 + throw new RetryableException();
68 + } else {
69 + return "pass";
70 + }
71 + }
72 +
73 + private String succeedAfterTwoFailures(String input) {
74 + if (round++ <= 2) {
75 + throw new RetryableException();
76 + } else {
77 + return "pass";
78 + }
79 + }
80 +
81 + private String failCompletely(String input) {
82 + if (round++ <= 1) {
83 + throw new NonRetryableException();
84 + } else {
85 + return "pass";
86 + }
87 + }
88 +
89 + private class RetryableException extends RuntimeException {
90 + }
91 +
92 + private class NonRetryableException extends RuntimeException {
93 + }
94 +}