Committed by
Gerrit Code Review
Support for a distributed counter
Change-Id: I346e9baa28556fac13e53771021f5f6fbcd75ac9
Showing
21 changed files
with
577 additions
and
29 deletions
1 | +/* | ||
2 | + * Copyright 2015 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onosproject.cli.net; | ||
17 | + | ||
18 | +import java.util.Map; | ||
19 | + | ||
20 | +import org.apache.karaf.shell.commands.Command; | ||
21 | +import org.onosproject.cli.AbstractShellCommand; | ||
22 | +import org.onosproject.store.service.StorageAdminService; | ||
23 | + | ||
24 | +import com.fasterxml.jackson.databind.JsonNode; | ||
25 | +import com.fasterxml.jackson.databind.ObjectMapper; | ||
26 | +import com.fasterxml.jackson.databind.node.ArrayNode; | ||
27 | +import com.fasterxml.jackson.databind.node.ObjectNode; | ||
28 | + | ||
29 | +/** | ||
30 | + * Command to list the various counters in the system. | ||
31 | + */ | ||
32 | +@Command(scope = "onos", name = "counters", | ||
33 | + description = "Lists information about atomic counters in the system") | ||
34 | +public class CountersListCommand extends AbstractShellCommand { | ||
35 | + | ||
36 | + private static final String FMT = "name=%s next_value=%d"; | ||
37 | + | ||
38 | + /** | ||
39 | + * Displays counters as text. | ||
40 | + * | ||
41 | + * @param mapInfo map descriptions | ||
42 | + */ | ||
43 | + private void displayCounters(Map<String, Long> counters) { | ||
44 | + counters.forEach((name, nextValue) -> print(FMT, name, nextValue)); | ||
45 | + } | ||
46 | + | ||
47 | + /** | ||
48 | + * Converts info for counters into a JSON object. | ||
49 | + * | ||
50 | + * @param counters counter info | ||
51 | + */ | ||
52 | + private JsonNode json(Map<String, Long> counters) { | ||
53 | + ObjectMapper mapper = new ObjectMapper(); | ||
54 | + ArrayNode jsonCounters = mapper.createArrayNode(); | ||
55 | + | ||
56 | + // Create a JSON node for each counter | ||
57 | + counters.forEach((name, value) -> { | ||
58 | + ObjectNode jsonCounter = mapper.createObjectNode(); | ||
59 | + jsonCounter.put("name", name) | ||
60 | + .put("value", value); | ||
61 | + jsonCounters.add(jsonCounter); | ||
62 | + }); | ||
63 | + | ||
64 | + return jsonCounters; | ||
65 | + } | ||
66 | + | ||
67 | + @Override | ||
68 | + protected void execute() { | ||
69 | + StorageAdminService storageAdminService = get(StorageAdminService.class); | ||
70 | + Map<String, Long> counters = storageAdminService.getCounters(); | ||
71 | + if (outputJson()) { | ||
72 | + print("%s", json(counters)); | ||
73 | + } else { | ||
74 | + displayCounters(counters); | ||
75 | + } | ||
76 | + } | ||
77 | +} |
... | @@ -233,6 +233,9 @@ | ... | @@ -233,6 +233,9 @@ |
233 | <action class="org.onosproject.cli.net.MapsListCommand"/> | 233 | <action class="org.onosproject.cli.net.MapsListCommand"/> |
234 | </command> | 234 | </command> |
235 | <command> | 235 | <command> |
236 | + <action class="org.onosproject.cli.net.CountersListCommand"/> | ||
237 | + </command> | ||
238 | + <command> | ||
236 | <action class="org.onosproject.cli.net.TransactionsCommand"/> | 239 | <action class="org.onosproject.cli.net.TransactionsCommand"/> |
237 | </command> | 240 | </command> |
238 | <command> | 241 | <command> | ... | ... |
1 | +/* | ||
2 | + * Copyright 2015 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onosproject.store.service; | ||
17 | + | ||
18 | +import java.util.concurrent.CompletableFuture; | ||
19 | + | ||
20 | +/** | ||
21 | + * An async atomic counter dispenses monotonically increasing values. | ||
22 | + */ | ||
23 | +public interface AsyncAtomicCounter { | ||
24 | + | ||
25 | + /** | ||
26 | + * Atomically increment by one the current value. | ||
27 | + * | ||
28 | + * @return updated value | ||
29 | + */ | ||
30 | + CompletableFuture<Long> incrementAndGet(); | ||
31 | + | ||
32 | + /** | ||
33 | + * Returns the current value of the counter without modifying it. | ||
34 | + * | ||
35 | + * @return current value | ||
36 | + */ | ||
37 | + CompletableFuture<Long> get(); | ||
38 | +} |
1 | +/* | ||
2 | + * Copyright 2015 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onosproject.store.service; | ||
17 | + | ||
18 | +/** | ||
19 | + * An atomic counter dispenses monotonically increasing values. | ||
20 | + */ | ||
21 | +public interface AtomicCounter { | ||
22 | + | ||
23 | + /** | ||
24 | + * Atomically increment by one the current value. | ||
25 | + * | ||
26 | + * @return updated value | ||
27 | + */ | ||
28 | + long incrementAndGet(); | ||
29 | + | ||
30 | + /** | ||
31 | + * Returns the current value of the counter without modifying it. | ||
32 | + * | ||
33 | + * @return current value | ||
34 | + */ | ||
35 | + long get(); | ||
36 | +} |
1 | +package org.onosproject.store.service; | ||
2 | + | ||
3 | +/** | ||
4 | + * Builder for AtomicCounter. | ||
5 | + */ | ||
6 | +public interface AtomicCounterBuilder { | ||
7 | + | ||
8 | + /** | ||
9 | + * Sets the name for the atomic counter. | ||
10 | + * <p> | ||
11 | + * Each atomic counter is identified by a unique name. | ||
12 | + * </p> | ||
13 | + * <p> | ||
14 | + * Note: This is a mandatory parameter. | ||
15 | + * </p> | ||
16 | + * | ||
17 | + * @param name name of the atomic counter | ||
18 | + * @return this AtomicCounterBuilder | ||
19 | + */ | ||
20 | + public AtomicCounterBuilder withName(String name); | ||
21 | + | ||
22 | + /** | ||
23 | + * Creates this counter on the partition that spans the entire cluster. | ||
24 | + * <p> | ||
25 | + * When partitioning is disabled, the counter state will be | ||
26 | + * ephemeral and does not survive a full cluster restart. | ||
27 | + * </p> | ||
28 | + * <p> | ||
29 | + * Note: By default partitions are enabled. | ||
30 | + * </p> | ||
31 | + * @return this AtomicCounterBuilder | ||
32 | + */ | ||
33 | + public AtomicCounterBuilder withPartitionsDisabled(); | ||
34 | + | ||
35 | + /** | ||
36 | + * Builds a AtomicCounter based on the configuration options | ||
37 | + * supplied to this builder. | ||
38 | + * | ||
39 | + * @return new AtomicCounter | ||
40 | + * @throws java.lang.RuntimeException if a mandatory parameter is missing | ||
41 | + */ | ||
42 | + public AtomicCounter build(); | ||
43 | + | ||
44 | + /** | ||
45 | + * Builds a AsyncAtomicCounter based on the configuration options | ||
46 | + * supplied to this builder. | ||
47 | + * | ||
48 | + * @return new AsyncAtomicCounter | ||
49 | + * @throws java.lang.RuntimeException if a mandatory parameter is missing | ||
50 | + */ | ||
51 | + public AsyncAtomicCounter buildAsyncCounter(); | ||
52 | +} |
... | @@ -20,7 +20,7 @@ package org.onosproject.store.service; | ... | @@ -20,7 +20,7 @@ package org.onosproject.store.service; |
20 | * Top level exception for ConsistentMap failures. | 20 | * Top level exception for ConsistentMap failures. |
21 | */ | 21 | */ |
22 | @SuppressWarnings("serial") | 22 | @SuppressWarnings("serial") |
23 | -public class ConsistentMapException extends RuntimeException { | 23 | +public class ConsistentMapException extends StorageException { |
24 | public ConsistentMapException() { | 24 | public ConsistentMapException() { |
25 | } | 25 | } |
26 | 26 | ... | ... |
... | @@ -16,6 +16,8 @@ | ... | @@ -16,6 +16,8 @@ |
16 | 16 | ||
17 | package org.onosproject.store.service; | 17 | package org.onosproject.store.service; |
18 | 18 | ||
19 | +import org.onlab.util.KryoNamespace; | ||
20 | + | ||
19 | /** | 21 | /** |
20 | * Interface for serialization for store artifacts. | 22 | * Interface for serialization for store artifacts. |
21 | */ | 23 | */ |
... | @@ -35,4 +37,24 @@ public interface Serializer { | ... | @@ -35,4 +37,24 @@ public interface Serializer { |
35 | * @param <T> decoded type | 37 | * @param <T> decoded type |
36 | */ | 38 | */ |
37 | <T> T decode(byte[] bytes); | 39 | <T> T decode(byte[] bytes); |
40 | + | ||
41 | + /** | ||
42 | + * Creates a new Serializer instance from a KryoNamespace. | ||
43 | + * | ||
44 | + * @param kryo kryo namespace | ||
45 | + * @return Serializer instance | ||
46 | + */ | ||
47 | + public static Serializer using(KryoNamespace kryo) { | ||
48 | + return new Serializer() { | ||
49 | + @Override | ||
50 | + public <T> byte[] encode(T object) { | ||
51 | + return kryo.serialize(object); | ||
52 | + } | ||
53 | + | ||
54 | + @Override | ||
55 | + public <T> T decode(byte[] bytes) { | ||
56 | + return kryo.deserialize(bytes); | ||
57 | + } | ||
58 | + }; | ||
59 | + } | ||
38 | } | 60 | } |
... | \ No newline at end of file | ... | \ No newline at end of file | ... | ... |
... | @@ -17,6 +17,7 @@ package org.onosproject.store.service; | ... | @@ -17,6 +17,7 @@ package org.onosproject.store.service; |
17 | 17 | ||
18 | import java.util.Collection; | 18 | import java.util.Collection; |
19 | import java.util.List; | 19 | import java.util.List; |
20 | +import java.util.Map; | ||
20 | 21 | ||
21 | /** | 22 | /** |
22 | * Service for administering storage instances. | 23 | * Service for administering storage instances. |
... | @@ -38,6 +39,13 @@ public interface StorageAdminService { | ... | @@ -38,6 +39,13 @@ public interface StorageAdminService { |
38 | List<MapInfo> getMapInfo(); | 39 | List<MapInfo> getMapInfo(); |
39 | 40 | ||
40 | /** | 41 | /** |
42 | + * Returns information about all the atomic counters in the system. | ||
43 | + * | ||
44 | + * @return mapping from counter name to that counter's next value | ||
45 | + */ | ||
46 | + Map<String, Long> getCounters(); | ||
47 | + | ||
48 | + /** | ||
41 | * Returns all the transactions in the system. | 49 | * Returns all the transactions in the system. |
42 | * | 50 | * |
43 | * @return collection of transactions | 51 | * @return collection of transactions | ... | ... |
1 | +/* | ||
2 | + * Copyright 2015 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | + | ||
17 | +package org.onosproject.store.service; | ||
18 | + | ||
19 | +/** | ||
20 | + * Top level exception for Store failures. | ||
21 | + */ | ||
22 | +@SuppressWarnings("serial") | ||
23 | +public class StorageException extends RuntimeException { | ||
24 | + public StorageException() { | ||
25 | + } | ||
26 | + | ||
27 | + public StorageException(Throwable t) { | ||
28 | + super(t); | ||
29 | + } | ||
30 | + | ||
31 | + /** | ||
32 | + * Store operation timeout. | ||
33 | + */ | ||
34 | + public static class Timeout extends StorageException { | ||
35 | + } | ||
36 | + | ||
37 | + /** | ||
38 | + * Store update conflicts with an in flight transaction. | ||
39 | + */ | ||
40 | + public static class ConcurrentModification extends StorageException { | ||
41 | + } | ||
42 | + | ||
43 | + /** | ||
44 | + * Store operation interrupted. | ||
45 | + */ | ||
46 | + public static class Interrupted extends StorageException { | ||
47 | + } | ||
48 | +} |
... | @@ -16,6 +16,7 @@ | ... | @@ -16,6 +16,7 @@ |
16 | 16 | ||
17 | package org.onosproject.store.service; | 17 | package org.onosproject.store.service; |
18 | 18 | ||
19 | + | ||
19 | /** | 20 | /** |
20 | * Storage service. | 21 | * Storage service. |
21 | * <p> | 22 | * <p> |
... | @@ -55,6 +56,13 @@ public interface StorageService { | ... | @@ -55,6 +56,13 @@ public interface StorageService { |
55 | <E> SetBuilder<E> setBuilder(); | 56 | <E> SetBuilder<E> setBuilder(); |
56 | 57 | ||
57 | /** | 58 | /** |
59 | + * Creates a new AtomicCounterBuilder. | ||
60 | + * | ||
61 | + * @return atomic counter builder | ||
62 | + */ | ||
63 | + AtomicCounterBuilder atomicCounterBuilder(); | ||
64 | + | ||
65 | + /** | ||
58 | * Creates a new transaction context. | 66 | * Creates a new transaction context. |
59 | * | 67 | * |
60 | * @return transaction context | 68 | * @return transaction context | ... | ... |
... | @@ -18,6 +18,7 @@ package org.onosproject.store.consistent.impl; | ... | @@ -18,6 +18,7 @@ package org.onosproject.store.consistent.impl; |
18 | 18 | ||
19 | import com.google.common.collect.ImmutableSet; | 19 | import com.google.common.collect.ImmutableSet; |
20 | import com.google.common.collect.Lists; | 20 | import com.google.common.collect.Lists; |
21 | +import com.google.common.collect.Maps; | ||
21 | import com.google.common.collect.Sets; | 22 | import com.google.common.collect.Sets; |
22 | 23 | ||
23 | import net.kuujo.copycat.CopycatConfig; | 24 | import net.kuujo.copycat.CopycatConfig; |
... | @@ -47,6 +48,7 @@ import org.onosproject.store.cluster.impl.DistributedClusterStore; | ... | @@ -47,6 +48,7 @@ import org.onosproject.store.cluster.impl.DistributedClusterStore; |
47 | import org.onosproject.store.cluster.impl.NodeInfo; | 48 | import org.onosproject.store.cluster.impl.NodeInfo; |
48 | import org.onosproject.store.cluster.messaging.ClusterCommunicationService; | 49 | import org.onosproject.store.cluster.messaging.ClusterCommunicationService; |
49 | import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl; | 50 | import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl; |
51 | +import org.onosproject.store.service.AtomicCounterBuilder; | ||
50 | import org.onosproject.store.service.ConsistentMapBuilder; | 52 | import org.onosproject.store.service.ConsistentMapBuilder; |
51 | import org.onosproject.store.service.ConsistentMapException; | 53 | import org.onosproject.store.service.ConsistentMapException; |
52 | import org.onosproject.store.service.EventuallyConsistentMapBuilder; | 54 | import org.onosproject.store.service.EventuallyConsistentMapBuilder; |
... | @@ -324,6 +326,11 @@ public class DatabaseManager implements StorageService, StorageAdminService { | ... | @@ -324,6 +326,11 @@ public class DatabaseManager implements StorageService, StorageAdminService { |
324 | } | 326 | } |
325 | 327 | ||
326 | @Override | 328 | @Override |
329 | + public AtomicCounterBuilder atomicCounterBuilder() { | ||
330 | + return new DefaultAtomicCounterBuilder(inMemoryDatabase, partitionedDatabase); | ||
331 | + } | ||
332 | + | ||
333 | + @Override | ||
327 | public List<MapInfo> getMapInfo() { | 334 | public List<MapInfo> getMapInfo() { |
328 | List<MapInfo> maps = Lists.newArrayList(); | 335 | List<MapInfo> maps = Lists.newArrayList(); |
329 | maps.addAll(getMapInfo(inMemoryDatabase)); | 336 | maps.addAll(getMapInfo(inMemoryDatabase)); |
... | @@ -339,6 +346,15 @@ public class DatabaseManager implements StorageService, StorageAdminService { | ... | @@ -339,6 +346,15 @@ public class DatabaseManager implements StorageService, StorageAdminService { |
339 | .collect(Collectors.toList()); | 346 | .collect(Collectors.toList()); |
340 | } | 347 | } |
341 | 348 | ||
349 | + | ||
350 | + @Override | ||
351 | + public Map<String, Long> getCounters() { | ||
352 | + Map<String, Long> counters = Maps.newHashMap(); | ||
353 | + counters.putAll(complete(inMemoryDatabase.counters())); | ||
354 | + counters.putAll(complete(partitionedDatabase.counters())); | ||
355 | + return counters; | ||
356 | + } | ||
357 | + | ||
342 | @Override | 358 | @Override |
343 | public Collection<Transaction> getTransactions() { | 359 | public Collection<Transaction> getTransactions() { |
344 | return complete(transactionManager.getTransactions()); | 360 | return complete(transactionManager.getTransactions()); | ... | ... |
... | @@ -36,6 +36,12 @@ public interface DatabaseProxy<K, V> { | ... | @@ -36,6 +36,12 @@ public interface DatabaseProxy<K, V> { |
36 | CompletableFuture<Set<String>> tableNames(); | 36 | CompletableFuture<Set<String>> tableNames(); |
37 | 37 | ||
38 | /** | 38 | /** |
39 | + * Returns a mapping from counter name to next value. | ||
40 | + * @return A completable future to be completed with the result once complete. | ||
41 | + */ | ||
42 | + CompletableFuture<Map<String, Long>> counters(); | ||
43 | + | ||
44 | + /** | ||
39 | * Gets the table size. | 45 | * Gets the table size. |
40 | * | 46 | * |
41 | * @param tableName table name | 47 | * @param tableName table name |
... | @@ -182,6 +188,23 @@ public interface DatabaseProxy<K, V> { | ... | @@ -182,6 +188,23 @@ public interface DatabaseProxy<K, V> { |
182 | CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue); | 188 | CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue); |
183 | 189 | ||
184 | /** | 190 | /** |
191 | + * Returns the next value for the specified atomic counter after | ||
192 | + * incrementing the current value by one. | ||
193 | + * | ||
194 | + * @param counterName counter name | ||
195 | + * @return next value for the specified counter | ||
196 | + */ | ||
197 | + CompletableFuture<Long> nextValue(String counterName); | ||
198 | + | ||
199 | + /** | ||
200 | + * Returns the current value for the specified atomic counter. | ||
201 | + * | ||
202 | + * @param counterName counter name | ||
203 | + * @return current value for the specified counter | ||
204 | + */ | ||
205 | + CompletableFuture<Long> currentValue(String counterName); | ||
206 | + | ||
207 | + /** | ||
185 | * Prepare and commit the specified transaction. | 208 | * Prepare and commit the specified transaction. |
186 | * | 209 | * |
187 | * @param transaction transaction to commit (after preparation) | 210 | * @param transaction transaction to commit (after preparation) | ... | ... |
... | @@ -17,6 +17,7 @@ | ... | @@ -17,6 +17,7 @@ |
17 | package org.onosproject.store.consistent.impl; | 17 | package org.onosproject.store.consistent.impl; |
18 | 18 | ||
19 | import java.util.Collection; | 19 | import java.util.Collection; |
20 | +import java.util.Map; | ||
20 | import java.util.Map.Entry; | 21 | import java.util.Map.Entry; |
21 | import java.util.Set; | 22 | import java.util.Set; |
22 | 23 | ||
... | @@ -46,6 +47,9 @@ public interface DatabaseState<K, V> { | ... | @@ -46,6 +47,9 @@ public interface DatabaseState<K, V> { |
46 | Set<String> tableNames(); | 47 | Set<String> tableNames(); |
47 | 48 | ||
48 | @Query | 49 | @Query |
50 | + Map<String, Long> counters(); | ||
51 | + | ||
52 | + @Query | ||
49 | int size(String tableName); | 53 | int size(String tableName); |
50 | 54 | ||
51 | @Query | 55 | @Query |
... | @@ -94,6 +98,12 @@ public interface DatabaseState<K, V> { | ... | @@ -94,6 +98,12 @@ public interface DatabaseState<K, V> { |
94 | Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue); | 98 | Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue); |
95 | 99 | ||
96 | @Command | 100 | @Command |
101 | + Long nextValue(String counterName); | ||
102 | + | ||
103 | + @Query | ||
104 | + Long currentValue(String counterName); | ||
105 | + | ||
106 | + @Command | ||
97 | boolean prepareAndCommit(Transaction transaction); | 107 | boolean prepareAndCommit(Transaction transaction); |
98 | 108 | ||
99 | @Command | 109 | @Command | ... | ... |
core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncAtomicCounter.java
0 → 100644
1 | +/* | ||
2 | + * Copyright 2015 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onosproject.store.consistent.impl; | ||
17 | + | ||
18 | +import java.util.concurrent.CompletableFuture; | ||
19 | +import org.onosproject.store.service.AsyncAtomicCounter; | ||
20 | +import static com.google.common.base.Preconditions.*; | ||
21 | + | ||
22 | +/** | ||
23 | + * Default implementation for a distributed AsyncAtomicCounter backed by | ||
24 | + * partitioned Raft DB. | ||
25 | + * <p> | ||
26 | + * The initial value will be zero. | ||
27 | + */ | ||
28 | +public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { | ||
29 | + | ||
30 | + private final String name; | ||
31 | + private final Database database; | ||
32 | + | ||
33 | + public DefaultAsyncAtomicCounter(String name, Database database) { | ||
34 | + this.name = checkNotNull(name); | ||
35 | + this.database = checkNotNull(database); | ||
36 | + } | ||
37 | + | ||
38 | + @Override | ||
39 | + public CompletableFuture<Long> incrementAndGet() { | ||
40 | + return database.nextValue(name); | ||
41 | + } | ||
42 | + | ||
43 | + @Override | ||
44 | + public CompletableFuture<Long> get() { | ||
45 | + return database.currentValue(name); | ||
46 | + } | ||
47 | +} |
core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounter.java
0 → 100644
1 | +/* | ||
2 | + * Copyright 2015 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onosproject.store.consistent.impl; | ||
17 | + | ||
18 | +import java.util.concurrent.CompletableFuture; | ||
19 | +import java.util.concurrent.ExecutionException; | ||
20 | +import java.util.concurrent.TimeUnit; | ||
21 | +import java.util.concurrent.TimeoutException; | ||
22 | + | ||
23 | +import org.onosproject.store.service.AsyncAtomicCounter; | ||
24 | +import org.onosproject.store.service.AtomicCounter; | ||
25 | +import org.onosproject.store.service.StorageException; | ||
26 | + | ||
27 | +/** | ||
28 | + * Default implementation for a distributed AtomicCounter backed by | ||
29 | + * partitioned Raft DB. | ||
30 | + * <p> | ||
31 | + * The initial value will be zero. | ||
32 | + */ | ||
33 | +public class DefaultAtomicCounter implements AtomicCounter { | ||
34 | + | ||
35 | + private static final int OPERATION_TIMEOUT_MILLIS = 5000; | ||
36 | + | ||
37 | + private final AsyncAtomicCounter asyncCounter; | ||
38 | + | ||
39 | + public DefaultAtomicCounter(String name, Database database) { | ||
40 | + asyncCounter = new DefaultAsyncAtomicCounter(name, database); | ||
41 | + } | ||
42 | + | ||
43 | + @Override | ||
44 | + public long incrementAndGet() { | ||
45 | + return complete(asyncCounter.incrementAndGet()); | ||
46 | + } | ||
47 | + | ||
48 | + @Override | ||
49 | + public long get() { | ||
50 | + return complete(asyncCounter.get()); | ||
51 | + } | ||
52 | + | ||
53 | + private static <T> T complete(CompletableFuture<T> future) { | ||
54 | + try { | ||
55 | + return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); | ||
56 | + } catch (InterruptedException e) { | ||
57 | + Thread.currentThread().interrupt(); | ||
58 | + throw new StorageException.Interrupted(); | ||
59 | + } catch (TimeoutException e) { | ||
60 | + throw new StorageException.Timeout(); | ||
61 | + } catch (ExecutionException e) { | ||
62 | + throw new StorageException(e.getCause()); | ||
63 | + } | ||
64 | + } | ||
65 | +} |
core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAtomicCounterBuilder.java
0 → 100644
1 | +package org.onosproject.store.consistent.impl; | ||
2 | + | ||
3 | +import org.onosproject.store.service.AsyncAtomicCounter; | ||
4 | +import org.onosproject.store.service.AtomicCounter; | ||
5 | +import org.onosproject.store.service.AtomicCounterBuilder; | ||
6 | + | ||
7 | +import static com.google.common.base.Preconditions.checkArgument; | ||
8 | + | ||
9 | +/** | ||
10 | + * Default implementation of AtomicCounterBuilder. | ||
11 | + */ | ||
12 | +public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder { | ||
13 | + | ||
14 | + private String name; | ||
15 | + private boolean partitionsEnabled = true; | ||
16 | + private final Database partitionedDatabase; | ||
17 | + private final Database inMemoryDatabase; | ||
18 | + | ||
19 | + public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) { | ||
20 | + this.inMemoryDatabase = inMemoryDatabase; | ||
21 | + this.partitionedDatabase = partitionedDatabase; | ||
22 | + } | ||
23 | + | ||
24 | + @Override | ||
25 | + public AtomicCounterBuilder withName(String name) { | ||
26 | + checkArgument(name != null && !name.isEmpty()); | ||
27 | + this.name = name; | ||
28 | + return this; | ||
29 | + } | ||
30 | + | ||
31 | + @Override | ||
32 | + public AtomicCounterBuilder withPartitionsDisabled() { | ||
33 | + partitionsEnabled = false; | ||
34 | + return this; | ||
35 | + } | ||
36 | + | ||
37 | + @Override | ||
38 | + public AtomicCounter build() { | ||
39 | + Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase; | ||
40 | + return new DefaultAtomicCounter(name, database); | ||
41 | + } | ||
42 | + | ||
43 | + @Override | ||
44 | + public AsyncAtomicCounter buildAsyncCounter() { | ||
45 | + Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase; | ||
46 | + return new DefaultAsyncAtomicCounter(name, database); | ||
47 | + } | ||
48 | +} |
... | @@ -65,6 +65,11 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab | ... | @@ -65,6 +65,11 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab |
65 | } | 65 | } |
66 | 66 | ||
67 | @Override | 67 | @Override |
68 | + public CompletableFuture<Map<String, Long>> counters() { | ||
69 | + return checkOpen(() -> proxy.counters()); | ||
70 | + } | ||
71 | + | ||
72 | + @Override | ||
68 | public CompletableFuture<Integer> size(String tableName) { | 73 | public CompletableFuture<Integer> size(String tableName) { |
69 | return checkOpen(() -> proxy.size(tableName)); | 74 | return checkOpen(() -> proxy.size(tableName)); |
70 | } | 75 | } |
... | @@ -145,6 +150,16 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab | ... | @@ -145,6 +150,16 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab |
145 | } | 150 | } |
146 | 151 | ||
147 | @Override | 152 | @Override |
153 | + public CompletableFuture<Long> nextValue(String counterName) { | ||
154 | + return checkOpen(() -> proxy.nextValue(counterName)); | ||
155 | + } | ||
156 | + | ||
157 | + @Override | ||
158 | + public CompletableFuture<Long> currentValue(String counterName) { | ||
159 | + return checkOpen(() -> proxy.currentValue(counterName)); | ||
160 | + } | ||
161 | + | ||
162 | + @Override | ||
148 | public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) { | 163 | public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) { |
149 | return checkOpen(() -> proxy.prepareAndCommit(transaction)); | 164 | return checkOpen(() -> proxy.prepareAndCommit(transaction)); |
150 | } | 165 | } | ... | ... |
... | @@ -21,6 +21,7 @@ import java.util.Collection; | ... | @@ -21,6 +21,7 @@ import java.util.Collection; |
21 | import java.util.HashSet; | 21 | import java.util.HashSet; |
22 | import java.util.Map; | 22 | import java.util.Map; |
23 | import java.util.Map.Entry; | 23 | import java.util.Map.Entry; |
24 | +import java.util.concurrent.atomic.AtomicLong; | ||
24 | import java.util.stream.Collectors; | 25 | import java.util.stream.Collectors; |
25 | import java.util.Set; | 26 | import java.util.Set; |
26 | 27 | ||
... | @@ -43,6 +44,7 @@ import net.kuujo.copycat.state.StateContext; | ... | @@ -43,6 +44,7 @@ import net.kuujo.copycat.state.StateContext; |
43 | */ | 44 | */ |
44 | public class DefaultDatabaseState implements DatabaseState<String, byte[]> { | 45 | public class DefaultDatabaseState implements DatabaseState<String, byte[]> { |
45 | private Long nextVersion; | 46 | private Long nextVersion; |
47 | + private Map<String, AtomicLong> counters; | ||
46 | private Map<String, Map<String, Versioned<byte[]>>> tables; | 48 | private Map<String, Map<String, Versioned<byte[]>>> tables; |
47 | 49 | ||
48 | /** | 50 | /** |
... | @@ -60,6 +62,11 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { | ... | @@ -60,6 +62,11 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { |
60 | @Initializer | 62 | @Initializer |
61 | @Override | 63 | @Override |
62 | public void init(StateContext<DatabaseState<String, byte[]>> context) { | 64 | public void init(StateContext<DatabaseState<String, byte[]>> context) { |
65 | + counters = context.get("counters"); | ||
66 | + if (counters == null) { | ||
67 | + counters = Maps.newConcurrentMap(); | ||
68 | + context.put("counters", counters); | ||
69 | + } | ||
63 | tables = context.get("tables"); | 70 | tables = context.get("tables"); |
64 | if (tables == null) { | 71 | if (tables == null) { |
65 | tables = Maps.newConcurrentMap(); | 72 | tables = Maps.newConcurrentMap(); |
... | @@ -83,6 +90,13 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { | ... | @@ -83,6 +90,13 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { |
83 | } | 90 | } |
84 | 91 | ||
85 | @Override | 92 | @Override |
93 | + public Map<String, Long> counters() { | ||
94 | + Map<String, Long> counterMap = Maps.newHashMap(); | ||
95 | + counters.forEach((k, v) -> counterMap.put(k, v.get())); | ||
96 | + return counterMap; | ||
97 | + } | ||
98 | + | ||
99 | + @Override | ||
86 | public int size(String tableName) { | 100 | public int size(String tableName) { |
87 | return getTableMap(tableName).size(); | 101 | return getTableMap(tableName).size(); |
88 | } | 102 | } |
... | @@ -212,6 +226,16 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { | ... | @@ -212,6 +226,16 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { |
212 | } | 226 | } |
213 | 227 | ||
214 | @Override | 228 | @Override |
229 | + public Long nextValue(String counterName) { | ||
230 | + return getCounter(counterName).incrementAndGet(); | ||
231 | + } | ||
232 | + | ||
233 | + @Override | ||
234 | + public Long currentValue(String counterName) { | ||
235 | + return getCounter(counterName).get(); | ||
236 | + } | ||
237 | + | ||
238 | + @Override | ||
215 | public boolean prepareAndCommit(Transaction transaction) { | 239 | public boolean prepareAndCommit(Transaction transaction) { |
216 | if (prepare(transaction)) { | 240 | if (prepare(transaction)) { |
217 | return commit(transaction); | 241 | return commit(transaction); |
... | @@ -255,6 +279,10 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { | ... | @@ -255,6 +279,10 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { |
255 | return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap()); | 279 | return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap()); |
256 | } | 280 | } |
257 | 281 | ||
282 | + private AtomicLong getCounter(String counterName) { | ||
283 | + return counters.computeIfAbsent(counterName, name -> new AtomicLong(0)); | ||
284 | + } | ||
285 | + | ||
258 | private boolean isUpdatePossible(DatabaseUpdate update) { | 286 | private boolean isUpdatePossible(DatabaseUpdate update) { |
259 | Versioned<byte[]> existingEntry = get(update.tableName(), update.key()); | 287 | Versioned<byte[]> existingEntry = get(update.tableName(), update.key()); |
260 | switch (update.type()) { | 288 | switch (update.type()) { | ... | ... |
... | @@ -103,18 +103,7 @@ public class DistributedLeadershipManager implements LeadershipService { | ... | @@ -103,18 +103,7 @@ public class DistributedLeadershipManager implements LeadershipService { |
103 | public void activate() { | 103 | public void activate() { |
104 | lockMap = storageService.<String, NodeId>consistentMapBuilder() | 104 | lockMap = storageService.<String, NodeId>consistentMapBuilder() |
105 | .withName("onos-leader-locks") | 105 | .withName("onos-leader-locks") |
106 | - .withSerializer(new Serializer() { | 106 | + .withSerializer(Serializer.using(new KryoNamespace.Builder().register(KryoNamespaces.API).build())) |
107 | - KryoNamespace kryo = new KryoNamespace.Builder().register(KryoNamespaces.API).build(); | ||
108 | - @Override | ||
109 | - public <T> byte[] encode(T object) { | ||
110 | - return kryo.serialize(object); | ||
111 | - } | ||
112 | - | ||
113 | - @Override | ||
114 | - public <T> T decode(byte[] bytes) { | ||
115 | - return kryo.deserialize(bytes); | ||
116 | - } | ||
117 | - }) | ||
118 | .withPartitionsDisabled().build(); | 107 | .withPartitionsDisabled().build(); |
119 | 108 | ||
120 | localNodeId = clusterService.getLocalNode().id(); | 109 | localNodeId = clusterService.getLocalNode().id(); | ... | ... |
... | @@ -90,6 +90,21 @@ public class PartitionedDatabase implements Database { | ... | @@ -90,6 +90,21 @@ public class PartitionedDatabase implements Database { |
90 | } | 90 | } |
91 | 91 | ||
92 | @Override | 92 | @Override |
93 | + public CompletableFuture<Map<String, Long>> counters() { | ||
94 | + checkState(isOpen.get(), DB_NOT_OPEN); | ||
95 | + Map<String, Long> counters = Maps.newConcurrentMap(); | ||
96 | + return CompletableFuture.allOf(partitions | ||
97 | + .stream() | ||
98 | + .map(db -> db.counters() | ||
99 | + .thenApply(m -> { | ||
100 | + counters.putAll(m); | ||
101 | + return null; | ||
102 | + })) | ||
103 | + .toArray(CompletableFuture[]::new)) | ||
104 | + .thenApply(v -> counters); | ||
105 | + } | ||
106 | + | ||
107 | + @Override | ||
93 | public CompletableFuture<Integer> size(String tableName) { | 108 | public CompletableFuture<Integer> size(String tableName) { |
94 | checkState(isOpen.get(), DB_NOT_OPEN); | 109 | checkState(isOpen.get(), DB_NOT_OPEN); |
95 | AtomicInteger totalSize = new AtomicInteger(0); | 110 | AtomicInteger totalSize = new AtomicInteger(0); |
... | @@ -219,6 +234,18 @@ public class PartitionedDatabase implements Database { | ... | @@ -219,6 +234,18 @@ public class PartitionedDatabase implements Database { |
219 | } | 234 | } |
220 | 235 | ||
221 | @Override | 236 | @Override |
237 | + public CompletableFuture<Long> nextValue(String counterName) { | ||
238 | + checkState(isOpen.get(), DB_NOT_OPEN); | ||
239 | + return partitioner.getPartition(counterName, counterName).nextValue(counterName); | ||
240 | + } | ||
241 | + | ||
242 | + @Override | ||
243 | + public CompletableFuture<Long> currentValue(String counterName) { | ||
244 | + checkState(isOpen.get(), DB_NOT_OPEN); | ||
245 | + return partitioner.getPartition(counterName, counterName).currentValue(counterName); | ||
246 | + } | ||
247 | + | ||
248 | + @Override | ||
222 | public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) { | 249 | public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) { |
223 | Map<Database, Transaction> subTransactions = createSubTransactions(transaction); | 250 | Map<Database, Transaction> subTransactions = createSubTransactions(transaction); |
224 | if (subTransactions.isEmpty()) { | 251 | if (subTransactions.isEmpty()) { | ... | ... |
... | @@ -36,12 +36,7 @@ import org.onosproject.store.service.Transaction.State; | ... | @@ -36,12 +36,7 @@ import org.onosproject.store.service.Transaction.State; |
36 | */ | 36 | */ |
37 | public class TransactionManager { | 37 | public class TransactionManager { |
38 | 38 | ||
39 | - private final Database database; | 39 | + private static final KryoNamespace KRYO_NAMESPACE = KryoNamespace.newBuilder() |
40 | - private final AsyncConsistentMap<Long, Transaction> transactions; | ||
41 | - | ||
42 | - private final Serializer serializer = new Serializer() { | ||
43 | - | ||
44 | - private KryoNamespace kryo = KryoNamespace.newBuilder() | ||
45 | .register(KryoNamespaces.BASIC) | 40 | .register(KryoNamespaces.BASIC) |
46 | .nextId(KryoNamespace.FLOATING_ID) | 41 | .nextId(KryoNamespace.FLOATING_ID) |
47 | .register(Versioned.class) | 42 | .register(Versioned.class) |
... | @@ -53,16 +48,9 @@ public class TransactionManager { | ... | @@ -53,16 +48,9 @@ public class TransactionManager { |
53 | .register(ImmutablePair.class) | 48 | .register(ImmutablePair.class) |
54 | .build(); | 49 | .build(); |
55 | 50 | ||
56 | - @Override | 51 | + private final Serializer serializer = Serializer.using(KRYO_NAMESPACE); |
57 | - public <T> byte[] encode(T object) { | 52 | + private final Database database; |
58 | - return kryo.serialize(object); | 53 | + private final AsyncConsistentMap<Long, Transaction> transactions; |
59 | - } | ||
60 | - | ||
61 | - @Override | ||
62 | - public <T> T decode(byte[] bytes) { | ||
63 | - return kryo.deserialize(bytes); | ||
64 | - } | ||
65 | - }; | ||
66 | 54 | ||
67 | /** | 55 | /** |
68 | * Constructs a new TransactionManager for the specified database instance. | 56 | * Constructs a new TransactionManager for the specified database instance. | ... | ... |
-
Please register or login to post a comment