Madan Jampani
Committed by Gerrit Code Review

Refactored code to consolidate functionality in Database* classes.

Renamed few methods and variables to align with local convention and also to match the description of functionality.

Change-Id: Ib17e73079534c76f76bcb01f14b6496e62275dbd
Showing 21 changed files with 394 additions and 724 deletions
...@@ -18,7 +18,6 @@ package org.onosproject.store.service; ...@@ -18,7 +18,6 @@ package org.onosproject.store.service;
18 18
19 import java.util.Collection; 19 import java.util.Collection;
20 import java.util.Map.Entry; 20 import java.util.Map.Entry;
21 -import java.util.Optional;
22 import java.util.Set; 21 import java.util.Set;
23 import java.util.concurrent.CompletableFuture; 22 import java.util.concurrent.CompletableFuture;
24 import java.util.function.BiFunction; 23 import java.util.function.BiFunction;
...@@ -166,17 +165,6 @@ public interface AsyncConsistentMap<K, V> { ...@@ -166,17 +165,6 @@ public interface AsyncConsistentMap<K, V> {
166 CompletableFuture<Versioned<V>> putAndGet(K key, V value); 165 CompletableFuture<Versioned<V>> putAndGet(K key, V value);
167 166
168 /** 167 /**
169 - * Associates the specified value with the specified key in this map (optional operation).
170 - * If the map previously contained a mapping for the key, the old value is replaced by the
171 - * specified value.
172 - *
173 - * @param key key with which the specified value is to be associated
174 - * @param value value to be associated with the specified key
175 - * @return optional updated value. Will be empty if update did not happen
176 - */
177 - CompletableFuture<Optional<Versioned<V>>> putIfAbsentAndGet(K key, V value);
178 -
179 - /**
180 * Removes the mapping for a key from this map if it is present (optional operation). 168 * Removes the mapping for a key from this map if it is present (optional operation).
181 * 169 *
182 * @param key key whose value is to be removed from the map 170 * @param key key whose value is to be removed from the map
...@@ -279,17 +267,6 @@ public interface AsyncConsistentMap<K, V> { ...@@ -279,17 +267,6 @@ public interface AsyncConsistentMap<K, V> {
279 CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue); 267 CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue);
280 268
281 /** 269 /**
282 - * Replaces the entry for the specified key only if it is currently mapped to the
283 - * specified version.
284 - *
285 - * @param key key key with which the specified value is associated
286 - * @param oldVersion version expected to be associated with the specified key
287 - * @param newValue value to be associated with the specified key
288 - * @return optional updated value. Will be empty if update did not happen.
289 - */
290 - CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue);
291 -
292 - /**
293 * Registers the specified listener to be notified whenever the map is updated. 270 * Registers the specified listener to be notified whenever the map is updated.
294 * 271 *
295 * @param listener listener to notify about map events 272 * @param listener listener to notify about map events
......
...@@ -18,7 +18,6 @@ package org.onosproject.store.service; ...@@ -18,7 +18,6 @@ package org.onosproject.store.service;
18 18
19 import java.util.Collection; 19 import java.util.Collection;
20 import java.util.Map.Entry; 20 import java.util.Map.Entry;
21 -import java.util.Optional;
22 import java.util.Set; 21 import java.util.Set;
23 import java.util.function.BiFunction; 22 import java.util.function.BiFunction;
24 import java.util.function.Function; 23 import java.util.function.Function;
...@@ -168,17 +167,6 @@ public interface ConsistentMap<K, V> { ...@@ -168,17 +167,6 @@ public interface ConsistentMap<K, V> {
168 Versioned<V> putAndGet(K key, V value); 167 Versioned<V> putAndGet(K key, V value);
169 168
170 /** 169 /**
171 - * Associates the specified value with the specified key in this map (optional operation).
172 - * If the map previously contained a mapping for the key, the old value is replaced by the
173 - * specified value.
174 - *
175 - * @param key key with which the specified value is to be associated
176 - * @param value value to be associated with the specified key
177 - * @return optional updated value. Will be empty if update did not happen
178 - */
179 - Optional<Versioned<V>> putIfAbsentAndGet(K key, V value);
180 -
181 - /**
182 * Removes the mapping for a key from this map if it is present (optional operation). 170 * Removes the mapping for a key from this map if it is present (optional operation).
183 * 171 *
184 * @param key key whose value is to be removed from the map 172 * @param key key whose value is to be removed from the map
...@@ -280,17 +268,6 @@ public interface ConsistentMap<K, V> { ...@@ -280,17 +268,6 @@ public interface ConsistentMap<K, V> {
280 boolean replace(K key, long oldVersion, V newValue); 268 boolean replace(K key, long oldVersion, V newValue);
281 269
282 /** 270 /**
283 - * Replaces the entry for the specified key only if it is currently mapped to the
284 - * specified version.
285 - *
286 - * @param key key key with which the specified value is associated
287 - * @param oldVersion version expected to be associated with the specified key
288 - * @param newValue value to be associated with the specified key
289 - * @return optional new value. Will be empty if replace did not happen
290 - */
291 - Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue);
292 -
293 - /**
294 * Registers the specified listener to be notified whenever the map is updated. 271 * Registers the specified listener to be notified whenever the map is updated.
295 * 272 *
296 * @param listener listener to notify about map events 273 * @param listener listener to notify about map events
......
...@@ -68,7 +68,7 @@ public final class DatabaseUpdate { ...@@ -68,7 +68,7 @@ public final class DatabaseUpdate {
68 } 68 }
69 69
70 private Type type; 70 private Type type;
71 - private String tableName; 71 + private String mapName;
72 private String key; 72 private String key;
73 private byte[] value; 73 private byte[] value;
74 private byte[] currentValue; 74 private byte[] currentValue;
...@@ -83,11 +83,11 @@ public final class DatabaseUpdate { ...@@ -83,11 +83,11 @@ public final class DatabaseUpdate {
83 } 83 }
84 84
85 /** 85 /**
86 - * Returns the tableName being updated. 86 + * Returns the name of map being updated.
87 - * @return table name. 87 + * @return map name.
88 */ 88 */
89 - public String tableName() { 89 + public String mapName() {
90 - return tableName; 90 + return mapName;
91 } 91 }
92 92
93 /** 93 /**
...@@ -126,7 +126,7 @@ public final class DatabaseUpdate { ...@@ -126,7 +126,7 @@ public final class DatabaseUpdate {
126 public String toString() { 126 public String toString() {
127 return MoreObjects.toStringHelper(this) 127 return MoreObjects.toStringHelper(this)
128 .add("type", type) 128 .add("type", type)
129 - .add("tableName", tableName) 129 + .add("mapName", mapName)
130 .add("key", key) 130 .add("key", key)
131 .add("value", value) 131 .add("value", value)
132 .add("currentValue", currentValue) 132 .add("currentValue", currentValue)
...@@ -161,8 +161,8 @@ public final class DatabaseUpdate { ...@@ -161,8 +161,8 @@ public final class DatabaseUpdate {
161 return this; 161 return this;
162 } 162 }
163 163
164 - public Builder withTableName(String tableName) { 164 + public Builder withMapName(String mapName) {
165 - update.tableName = checkNotNull(tableName, "tableName cannot be null"); 165 + update.mapName = checkNotNull(mapName, "mapName cannot be null");
166 return this; 166 return this;
167 } 167 }
168 168
...@@ -189,7 +189,7 @@ public final class DatabaseUpdate { ...@@ -189,7 +189,7 @@ public final class DatabaseUpdate {
189 189
190 private void validateInputs() { 190 private void validateInputs() {
191 checkNotNull(update.type, "type must be specified"); 191 checkNotNull(update.type, "type must be specified");
192 - checkNotNull(update.tableName, "table name must be specified"); 192 + checkNotNull(update.mapName, "map name must be specified");
193 checkNotNull(update.key, "key must be specified"); 193 checkNotNull(update.key, "key must be specified");
194 switch (update.type) { 194 switch (update.type) {
195 case PUT: 195 case PUT:
......
...@@ -33,7 +33,7 @@ public class DatabaseUpdateTest extends TestCase { ...@@ -33,7 +33,7 @@ public class DatabaseUpdateTest extends TestCase {
33 .withValue("2".getBytes()) 33 .withValue("2".getBytes())
34 .withCurrentVersion(3) 34 .withCurrentVersion(3)
35 .withKey("4") 35 .withKey("4")
36 - .withTableName("5") 36 + .withMapName("5")
37 .withType(DatabaseUpdate.Type.PUT) 37 .withType(DatabaseUpdate.Type.PUT)
38 .build(); 38 .build();
39 39
...@@ -42,7 +42,7 @@ public class DatabaseUpdateTest extends TestCase { ...@@ -42,7 +42,7 @@ public class DatabaseUpdateTest extends TestCase {
42 .withValue("2".getBytes()) 42 .withValue("2".getBytes())
43 .withCurrentVersion(3) 43 .withCurrentVersion(3)
44 .withKey("4") 44 .withKey("4")
45 - .withTableName("5") 45 + .withMapName("5")
46 .withType(DatabaseUpdate.Type.REMOVE) 46 .withType(DatabaseUpdate.Type.REMOVE)
47 .build(); 47 .build();
48 48
...@@ -51,7 +51,7 @@ public class DatabaseUpdateTest extends TestCase { ...@@ -51,7 +51,7 @@ public class DatabaseUpdateTest extends TestCase {
51 .withValue("2".getBytes()) 51 .withValue("2".getBytes())
52 .withCurrentVersion(3) 52 .withCurrentVersion(3)
53 .withKey("4") 53 .withKey("4")
54 - .withTableName("5") 54 + .withMapName("5")
55 .withType(DatabaseUpdate.Type.REMOVE_IF_VALUE_MATCH) 55 .withType(DatabaseUpdate.Type.REMOVE_IF_VALUE_MATCH)
56 .build(); 56 .build();
57 57
...@@ -60,7 +60,7 @@ public class DatabaseUpdateTest extends TestCase { ...@@ -60,7 +60,7 @@ public class DatabaseUpdateTest extends TestCase {
60 .withValue("2".getBytes()) 60 .withValue("2".getBytes())
61 .withCurrentVersion(3) 61 .withCurrentVersion(3)
62 .withKey("4") 62 .withKey("4")
63 - .withTableName("5") 63 + .withMapName("5")
64 .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH) 64 .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH)
65 .build(); 65 .build();
66 66
...@@ -69,7 +69,7 @@ public class DatabaseUpdateTest extends TestCase { ...@@ -69,7 +69,7 @@ public class DatabaseUpdateTest extends TestCase {
69 .withValue("2".getBytes()) 69 .withValue("2".getBytes())
70 .withCurrentVersion(3) 70 .withCurrentVersion(3)
71 .withKey("4") 71 .withKey("4")
72 - .withTableName("5") 72 + .withMapName("5")
73 .withType(DatabaseUpdate.Type.PUT_IF_VALUE_MATCH) 73 .withType(DatabaseUpdate.Type.PUT_IF_VALUE_MATCH)
74 .build(); 74 .build();
75 75
...@@ -78,7 +78,7 @@ public class DatabaseUpdateTest extends TestCase { ...@@ -78,7 +78,7 @@ public class DatabaseUpdateTest extends TestCase {
78 .withValue("2".getBytes()) 78 .withValue("2".getBytes())
79 .withCurrentVersion(3) 79 .withCurrentVersion(3)
80 .withKey("4") 80 .withKey("4")
81 - .withTableName("5") 81 + .withMapName("5")
82 .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH) 82 .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH)
83 .build(); 83 .build();
84 84
...@@ -91,7 +91,7 @@ public class DatabaseUpdateTest extends TestCase { ...@@ -91,7 +91,7 @@ public class DatabaseUpdateTest extends TestCase {
91 assertThat(stats1.value(), is("2".getBytes())); 91 assertThat(stats1.value(), is("2".getBytes()));
92 assertThat(stats1.currentVersion(), is(3L)); 92 assertThat(stats1.currentVersion(), is(3L));
93 assertThat(stats1.key(), is("4")); 93 assertThat(stats1.key(), is("4"));
94 - assertThat(stats1.tableName(), is("5")); 94 + assertThat(stats1.mapName(), is("5"));
95 assertThat(stats1.type(), is(DatabaseUpdate.Type.PUT)); 95 assertThat(stats1.type(), is(DatabaseUpdate.Type.PUT));
96 } 96 }
97 97
......
...@@ -391,9 +391,9 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -391,9 +391,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
391 } 391 }
392 392
393 private List<MapInfo> getMapInfo(Database database) { 393 private List<MapInfo> getMapInfo(Database database) {
394 - return complete(database.tableNames()) 394 + return complete(database.maps())
395 .stream() 395 .stream()
396 - .map(name -> new MapInfo(name, complete(database.size(name)))) 396 + .map(name -> new MapInfo(name, complete(database.mapSize(name))))
397 .filter(info -> info.size() > 0) 397 .filter(info -> info.size() > 0)
398 .collect(Collectors.toList()); 398 .collect(Collectors.toList());
399 } 399 }
......
...@@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableList; ...@@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableList;
24 import com.google.common.hash.Hashing; 24 import com.google.common.hash.Hashing;
25 25
26 /** 26 /**
27 - * Partitioner for mapping table entries to individual database partitions. 27 + * Partitioner for mapping map entries to individual database partitions.
28 * <p> 28 * <p>
29 - * By default a md5 hash of the hash key (key or table name) is used to pick a 29 + * By default a md5 hash of the hash key (key or map name) is used to pick a
30 * partition. 30 * partition.
31 */ 31 */
32 public abstract class DatabasePartitioner implements Partitioner<String> { 32 public abstract class DatabasePartitioner implements Partitioner<String> {
......
...@@ -31,11 +31,11 @@ import org.onosproject.store.service.Versioned; ...@@ -31,11 +31,11 @@ import org.onosproject.store.service.Versioned;
31 public interface DatabaseProxy<K, V> { 31 public interface DatabaseProxy<K, V> {
32 32
33 /** 33 /**
34 - * Returns a set of all tables names. 34 + * Returns a set of all map names.
35 * 35 *
36 * @return A completable future to be completed with the result once complete. 36 * @return A completable future to be completed with the result once complete.
37 */ 37 */
38 - CompletableFuture<Set<String>> tableNames(); 38 + CompletableFuture<Set<String>> maps();
39 39
40 /** 40 /**
41 * Returns a mapping from counter name to next value. 41 * Returns a mapping from counter name to next value.
...@@ -45,183 +45,91 @@ public interface DatabaseProxy<K, V> { ...@@ -45,183 +45,91 @@ public interface DatabaseProxy<K, V> {
45 CompletableFuture<Map<String, Long>> counters(); 45 CompletableFuture<Map<String, Long>> counters();
46 46
47 /** 47 /**
48 - * Gets the table size.
49 * 48 *
50 - * @param tableName table name 49 + * @param mapName map name
51 * @return A completable future to be completed with the result once complete. 50 * @return A completable future to be completed with the result once complete.
52 */ 51 */
53 - CompletableFuture<Integer> size(String tableName); 52 + CompletableFuture<Integer> mapSize(String mapName);
54 53
55 /** 54 /**
56 - * Checks whether the table is empty. 55 + * Checks whether the map is empty.
57 * 56 *
58 - * @param tableName table name 57 + * @param mapName map name
59 * @return A completable future to be completed with the result once complete. 58 * @return A completable future to be completed with the result once complete.
60 */ 59 */
61 - CompletableFuture<Boolean> isEmpty(String tableName); 60 + CompletableFuture<Boolean> mapIsEmpty(String mapName);
62 61
63 /** 62 /**
64 - * Checks whether the table contains a key. 63 + * Checks whether the map contains a key.
65 * 64 *
66 - * @param tableName table name 65 + * @param mapName map name
67 - * @param key The key to check. 66 + * @param key key to check.
68 * @return A completable future to be completed with the result once complete. 67 * @return A completable future to be completed with the result once complete.
69 */ 68 */
70 - CompletableFuture<Boolean> containsKey(String tableName, K key); 69 + CompletableFuture<Boolean> mapContainsKey(String mapName, K key);
71 70
72 /** 71 /**
73 - * Checks whether the table contains a value. 72 + * Checks whether the map contains a value.
74 * 73 *
75 - * @param tableName table name 74 + * @param mapName map name
76 * @param value The value to check. 75 * @param value The value to check.
77 * @return A completable future to be completed with the result once complete. 76 * @return A completable future to be completed with the result once complete.
78 */ 77 */
79 - CompletableFuture<Boolean> containsValue(String tableName, V value); 78 + CompletableFuture<Boolean> mapContainsValue(String mapName, V value);
80 79
81 /** 80 /**
82 - * Gets a value from the table. 81 + * Gets a value from the map.
83 * 82 *
84 - * @param tableName table name 83 + * @param mapName map name
85 * @param key The key to get. 84 * @param key The key to get.
86 * @return A completable future to be completed with the result once complete. 85 * @return A completable future to be completed with the result once complete.
87 */ 86 */
88 - CompletableFuture<Versioned<V>> get(String tableName, K key); 87 + CompletableFuture<Versioned<V>> mapGet(String mapName, K key);
89 88
90 /** 89 /**
91 - * Puts a value in the table. 90 + * Updates the map.
92 * 91 *
93 - * @param tableName table name 92 + * @param mapName map name
94 - * @param key The key to set. 93 + * @param key The key to set
95 - * @param value The value to set. 94 + * @param valueMatch match for checking existing value
96 - * @return A completable future to be completed with the result once complete. 95 + * @param versionMatch match for checking existing version
97 - */ 96 + * @param value new value
98 - CompletableFuture<Result<Versioned<V>>> put(String tableName, K key, V value); 97 + * @return A completable future to be completed with the result once complete
99 -
100 - /**
101 - * Puts a value in the table.
102 - *
103 - * @param tableName table name
104 - * @param key The key to set.
105 - * @param value The value to set.
106 - * @return A completable future to be completed with the result once complete.
107 - */
108 - CompletableFuture<Result<UpdateResult<Versioned<V>>>> putAndGet(String tableName, K key, V value);
109 -
110 - /**
111 - * Puts a value in the table.
112 - *
113 - * @param tableName table name
114 - * @param key The key to set.
115 - * @param value The value to set.
116 - * @return A completable future to be completed with the result once complete.
117 - */
118 - CompletableFuture<Result<UpdateResult<Versioned<V>>>> putIfAbsentAndGet(String tableName, K key, V value);
119 -
120 - /**
121 - * Removes a value from the table.
122 - *
123 - * @param tableName table name
124 - * @param key The key to remove.
125 - * @return A completable future to be completed with the result once complete.
126 - */
127 - CompletableFuture<Result<Versioned<V>>> remove(String tableName, K key);
128 -
129 - /**
130 - * Clears the table.
131 - *
132 - * @param tableName table name
133 - * @return A completable future to be completed with the result once complete.
134 - */
135 - CompletableFuture<Result<Void>> clear(String tableName);
136 -
137 - /**
138 - * Gets a set of keys in the table.
139 - *
140 - * @param tableName table name
141 - * @return A completable future to be completed with the result once complete.
142 - */
143 - CompletableFuture<Set<K>> keySet(String tableName);
144 -
145 - /**
146 - * Gets a collection of values in the table.
147 - *
148 - * @param tableName table name
149 - * @return A completable future to be completed with the result once complete.
150 - */
151 - CompletableFuture<Collection<Versioned<V>>> values(String tableName);
152 -
153 - /**
154 - * Gets a set of entries in the table.
155 - *
156 - * @param tableName table name
157 - * @return A completable future to be completed with the result once complete.
158 - */
159 - CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet(String tableName);
160 -
161 - /**
162 - * Puts a value in the table if the given key does not exist.
163 - *
164 - * @param tableName table name
165 - * @param key The key to set.
166 - * @param value The value to set if the given key does not exist.
167 - * @return A completable future to be completed with the result once complete.
168 - */
169 - CompletableFuture<Result<Versioned<V>>> putIfAbsent(String tableName, K key, V value);
170 -
171 - /**
172 - * Removes a key and if the existing value for that key matches the specified value.
173 - *
174 - * @param tableName table name
175 - * @param key The key to remove.
176 - * @param value The value to remove.
177 - * @return A completable future to be completed with the result once complete.
178 */ 98 */
179 - CompletableFuture<Result<Boolean>> remove(String tableName, K key, V value); 99 + CompletableFuture<Result<UpdateResult<K, V>>> mapUpdate(
100 + String mapName, K key, Match<V> valueMatch, Match<Long> versionMatch, V value);
180 101
181 /** 102 /**
182 - * Removes a key and if the existing version for that key matches the specified version. 103 + * Clears the map.
183 * 104 *
184 - * @param tableName table name 105 + * @param mapName map name
185 - * @param key The key to remove.
186 - * @param version The expected version.
187 * @return A completable future to be completed with the result once complete. 106 * @return A completable future to be completed with the result once complete.
188 */ 107 */
189 - CompletableFuture<Result<Boolean>> remove(String tableName, K key, long version); 108 + CompletableFuture<Result<Void>> mapClear(String mapName);
190 109
191 /** 110 /**
192 - * Replaces the entry for the specified key only if currently mapped to the specified value. 111 + * Gets a set of keys in the map.
193 * 112 *
194 - * @param tableName table name 113 + * @param mapName map name
195 - * @param key The key to replace.
196 - * @param oldValue The value to replace.
197 - * @param newValue The value with which to replace the given key and value.
198 * @return A completable future to be completed with the result once complete. 114 * @return A completable future to be completed with the result once complete.
199 */ 115 */
200 - CompletableFuture<Result<Boolean>> replace(String tableName, K key, V oldValue, V newValue); 116 + CompletableFuture<Set<K>> mapKeySet(String mapName);
201 117
202 /** 118 /**
203 - * Replaces the entry for the specified key only if currently mapped to the specified version. 119 + * Gets a collection of values in the map.
204 * 120 *
205 - * @param tableName table name 121 + * @param mapName map name
206 - * @param key The key to update
207 - * @param oldVersion existing version in the map for this replace to succeed.
208 - * @param newValue The value with which to replace the given key and version.
209 * @return A completable future to be completed with the result once complete. 122 * @return A completable future to be completed with the result once complete.
210 */ 123 */
211 - CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue); 124 + CompletableFuture<Collection<Versioned<V>>> mapValues(String mapName);
212 125
213 /** 126 /**
214 - * Replaces the entry for the specified key only if currently mapped to the specified version. 127 + * Gets a set of entries in the map.
215 * 128 *
216 - * @param tableName table name 129 + * @param mapName map name
217 - * @param key The key to update
218 - * @param oldVersion existing version in the map for this replace to succeed.
219 - * @param newValue The value with which to replace the given key and version.
220 * @return A completable future to be completed with the result once complete. 130 * @return A completable future to be completed with the result once complete.
221 */ 131 */
222 - CompletableFuture<Result<UpdateResult<Versioned<V>>>> replaceAndGet(String tableName, 132 + CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> mapEntrySet(String mapName);
223 - K key, long oldVersion,
224 - V newValue);
225 133
226 /** 134 /**
227 * Atomically add the given value to current value of the specified counter. 135 * Atomically add the given value to current value of the specified counter.
......
...@@ -79,6 +79,7 @@ public class DatabaseSerializer extends SerializerConfig { ...@@ -79,6 +79,7 @@ public class DatabaseSerializer extends SerializerConfig {
79 .register(Result.Status.class) 79 .register(Result.Status.class)
80 .register(DefaultTransaction.class) 80 .register(DefaultTransaction.class)
81 .register(Transaction.State.class) 81 .register(Transaction.State.class)
82 + .register(Match.class)
82 .register(NodeId.class) 83 .register(NodeId.class)
83 .build(); 84 .build();
84 85
......
...@@ -45,67 +45,40 @@ public interface DatabaseState<K, V> { ...@@ -45,67 +45,40 @@ public interface DatabaseState<K, V> {
45 void init(StateContext<DatabaseState<K, V>> context); 45 void init(StateContext<DatabaseState<K, V>> context);
46 46
47 @Query 47 @Query
48 - Set<String> tableNames(); 48 + Set<String> maps();
49 49
50 @Query 50 @Query
51 Map<String, Long> counters(); 51 Map<String, Long> counters();
52 52
53 @Query 53 @Query
54 - int size(String tableName); 54 + int size(String mapName);
55 55
56 @Query 56 @Query
57 - boolean isEmpty(String tableName); 57 + boolean mapIsEmpty(String mapName);
58 58
59 @Query 59 @Query
60 - boolean containsKey(String tableName, K key); 60 + boolean mapContainsKey(String mapName, K key);
61 61
62 @Query 62 @Query
63 - boolean containsValue(String tableName, V value); 63 + boolean mapContainsValue(String mapName, V value);
64 64
65 @Query 65 @Query
66 - Versioned<V> get(String tableName, K key); 66 + Versioned<V> mapGet(String mapName, K key);
67 67
68 @Command 68 @Command
69 - Result<Versioned<V>> put(String tableName, K key, V value); 69 + Result<UpdateResult<K, V>> mapUpdate(String mapName, K key, Match<V> valueMatch, Match<Long> versionMatch, V value);
70 70
71 @Command 71 @Command
72 - Result<UpdateResult<Versioned<V>>> putAndGet(String tableName, K key, V value); 72 + Result<Void> mapClear(String mapName);
73 -
74 - @Command
75 - Result<UpdateResult<Versioned<V>>> putIfAbsentAndGet(String tableName, K key, V value);
76 -
77 - @Command
78 - Result<Versioned<V>> remove(String tableName, K key);
79 -
80 - @Command
81 - Result<Void> clear(String tableName);
82 73
83 @Query 74 @Query
84 - Set<K> keySet(String tableName); 75 + Set<K> mapKeySet(String mapName);
85 76
86 @Query 77 @Query
87 - Collection<Versioned<V>> values(String tableName); 78 + Collection<Versioned<V>> mapValues(String mapName);
88 79
89 @Query 80 @Query
90 - Set<Entry<K, Versioned<V>>> entrySet(String tableName); 81 + Set<Entry<K, Versioned<V>>> mapEntrySet(String mapName);
91 -
92 - @Command
93 - Result<Versioned<V>> putIfAbsent(String tableName, K key, V value);
94 -
95 - @Command
96 - Result<Boolean> remove(String tableName, K key, V value);
97 -
98 - @Command
99 - Result<Boolean> remove(String tableName, K key, long version);
100 -
101 - @Command
102 - Result<Boolean> replace(String tableName, K key, V oldValue, V newValue);
103 -
104 - @Command
105 - Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
106 -
107 - @Command
108 - Result<UpdateResult<Versioned<V>>> replaceAndGet(String tableName, K key, long oldVersion, V newValue);
109 82
110 @Command 83 @Command
111 Long counterAddAndGet(String counterName, long delta); 84 Long counterAddAndGet(String counterName, long delta);
......
...@@ -23,7 +23,6 @@ import java.util.Collection; ...@@ -23,7 +23,6 @@ import java.util.Collection;
23 import java.util.Map; 23 import java.util.Map;
24 import java.util.Map.Entry; 24 import java.util.Map.Entry;
25 import java.util.Objects; 25 import java.util.Objects;
26 -import java.util.Optional;
27 import java.util.concurrent.CompletableFuture; 26 import java.util.concurrent.CompletableFuture;
28 import java.util.concurrent.CopyOnWriteArraySet; 27 import java.util.concurrent.CopyOnWriteArraySet;
29 import java.util.concurrent.atomic.AtomicReference; 28 import java.util.concurrent.atomic.AtomicReference;
...@@ -34,7 +33,6 @@ import java.util.function.Predicate; ...@@ -34,7 +33,6 @@ import java.util.function.Predicate;
34 import java.util.stream.Collectors; 33 import java.util.stream.Collectors;
35 import java.util.Set; 34 import java.util.Set;
36 35
37 -import org.apache.commons.lang3.tuple.Pair;
38 import org.onlab.util.HexString; 36 import org.onlab.util.HexString;
39 import org.onlab.util.Tools; 37 import org.onlab.util.Tools;
40 import org.onosproject.core.ApplicationId; 38 import org.onosproject.core.ApplicationId;
...@@ -49,6 +47,7 @@ import org.slf4j.Logger; ...@@ -49,6 +47,7 @@ import org.slf4j.Logger;
49 import com.google.common.cache.CacheBuilder; 47 import com.google.common.cache.CacheBuilder;
50 import com.google.common.cache.CacheLoader; 48 import com.google.common.cache.CacheLoader;
51 import com.google.common.cache.LoadingCache; 49 import com.google.common.cache.LoadingCache;
50 +import com.google.common.collect.Maps;
52 51
53 /** 52 /**
54 * AsyncConsistentMap implementation that is backed by a Raft consensus 53 * AsyncConsistentMap implementation that is backed by a Raft consensus
...@@ -139,38 +138,39 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -139,38 +138,39 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
139 138
140 @Override 139 @Override
141 public CompletableFuture<Integer> size() { 140 public CompletableFuture<Integer> size() {
142 - return database.size(name); 141 + return database.mapSize(name);
143 } 142 }
144 143
145 @Override 144 @Override
146 public CompletableFuture<Boolean> isEmpty() { 145 public CompletableFuture<Boolean> isEmpty() {
147 - return database.isEmpty(name); 146 + return database.mapIsEmpty(name);
148 } 147 }
149 148
150 @Override 149 @Override
151 public CompletableFuture<Boolean> containsKey(K key) { 150 public CompletableFuture<Boolean> containsKey(K key) {
152 checkNotNull(key, ERROR_NULL_KEY); 151 checkNotNull(key, ERROR_NULL_KEY);
153 - return database.containsKey(name, keyCache.getUnchecked(key)); 152 + return database.mapContainsKey(name, keyCache.getUnchecked(key));
154 } 153 }
155 154
156 @Override 155 @Override
157 public CompletableFuture<Boolean> containsValue(V value) { 156 public CompletableFuture<Boolean> containsValue(V value) {
158 checkNotNull(value, ERROR_NULL_VALUE); 157 checkNotNull(value, ERROR_NULL_VALUE);
159 - return database.containsValue(name, serializer.encode(value)); 158 + return database.mapContainsValue(name, serializer.encode(value));
160 } 159 }
161 160
162 @Override 161 @Override
163 public CompletableFuture<Versioned<V>> get(K key) { 162 public CompletableFuture<Versioned<V>> get(K key) {
164 checkNotNull(key, ERROR_NULL_KEY); 163 checkNotNull(key, ERROR_NULL_KEY);
165 - return database.get(name, keyCache.getUnchecked(key)) 164 + return database.mapGet(name, keyCache.getUnchecked(key))
166 - .thenApply(v -> v != null 165 + .thenApply(v -> v != null ? v.map(serializer::decode) : null);
167 - ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
168 } 166 }
169 167
170 @Override 168 @Override
171 public CompletableFuture<Versioned<V>> computeIfAbsent(K key, 169 public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
172 Function<? super K, ? extends V> mappingFunction) { 170 Function<? super K, ? extends V> mappingFunction) {
173 - return computeIf(key, Objects::isNull, (k, v) -> mappingFunction.apply(k)); 171 + checkNotNull(key, ERROR_NULL_KEY);
172 + checkNotNull(mappingFunction, "Mapping function cannot be null");
173 + return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key)).thenApply(v -> v.newValue());
174 } 174 }
175 175
176 @Override 176 @Override
...@@ -192,7 +192,6 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -192,7 +192,6 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
192 checkNotNull(key, ERROR_NULL_KEY); 192 checkNotNull(key, ERROR_NULL_KEY);
193 checkNotNull(condition, "predicate function cannot be null"); 193 checkNotNull(condition, "predicate function cannot be null");
194 checkNotNull(remappingFunction, "Remapping function cannot be null"); 194 checkNotNull(remappingFunction, "Remapping function cannot be null");
195 - AtomicReference<MapEvent<K, V>> mapEvent = new AtomicReference<>();
196 return get(key).thenCompose(r1 -> { 195 return get(key).thenCompose(r1 -> {
197 V existingValue = r1 == null ? null : r1.value(); 196 V existingValue = r1 == null ? null : r1.value();
198 // if the condition evaluates to false, return existing value. 197 // if the condition evaluates to false, return existing value.
...@@ -207,116 +206,51 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -207,116 +206,51 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
207 } catch (Exception e) { 206 } catch (Exception e) {
208 return Tools.exceptionalFuture(e); 207 return Tools.exceptionalFuture(e);
209 } 208 }
210 - 209 + if (computedValue.get() == null && r1 == null) {
211 - // if the computed value is null, remove current value if one exists.
212 - // throw an exception if concurrent modification is detected.
213 - if (computedValue.get() == null) {
214 - if (r1 != null) {
215 - return remove(key, r1.version()).thenApply(result -> {
216 - if (result) {
217 - mapEvent.set(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r1));
218 - return null;
219 - } else {
220 - throw new ConsistentMapException.ConcurrentModification();
221 - }
222 - });
223 - } else {
224 return CompletableFuture.completedFuture(null); 210 return CompletableFuture.completedFuture(null);
225 } 211 }
226 - } else { 212 + Match<V> valueMatcher = r1 == null ? Match.ifNull() : Match.any();
227 - // replace current value; throw an exception if concurrent modification is detected 213 + Match<Long> versionMatcher = r1 == null ? Match.any() : Match.ifValue(r1.version());
228 - if (r1 != null) { 214 + return updateAndGet(key, valueMatcher, versionMatcher, computedValue.get())
229 - return replaceAndGet(key, r1.version(), computedValue.get())
230 .thenApply(v -> { 215 .thenApply(v -> {
231 - if (v.isPresent()) { 216 + if (v.updated()) {
232 - mapEvent.set(new MapEvent<>(name, MapEvent.Type.UPDATE, key, v.get())); 217 + return v.newValue();
233 - return v.get();
234 } else { 218 } else {
235 throw new ConsistentMapException.ConcurrentModification(); 219 throw new ConsistentMapException.ConcurrentModification();
236 } 220 }
237 }); 221 });
238 - } else {
239 - return putIfAbsentAndGet(key, computedValue.get()).thenApply(result -> {
240 - if (!result.isPresent()) {
241 - throw new ConsistentMapException.ConcurrentModification();
242 - } else {
243 - mapEvent.set(new MapEvent<>(name, MapEvent.Type.INSERT, key, result.get()));
244 - return result.get();
245 - }
246 }); 222 });
247 } 223 }
248 - }
249 - }).whenComplete((result, error) -> notifyListeners(mapEvent.get()));
250 - }
251 224
252 @Override 225 @Override
253 public CompletableFuture<Versioned<V>> put(K key, V value) { 226 public CompletableFuture<Versioned<V>> put(K key, V value) {
254 checkNotNull(key, ERROR_NULL_KEY); 227 checkNotNull(key, ERROR_NULL_KEY);
255 checkNotNull(value, ERROR_NULL_VALUE); 228 checkNotNull(value, ERROR_NULL_VALUE);
256 - checkIfUnmodifiable(); 229 + return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue());
257 - return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
258 - .thenApply(this::unwrapResult)
259 - .thenApply(v -> v != null
260 - ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
261 } 230 }
262 231
263 @Override 232 @Override
264 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) { 233 public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
265 checkNotNull(key, ERROR_NULL_KEY); 234 checkNotNull(key, ERROR_NULL_KEY);
266 checkNotNull(value, ERROR_NULL_VALUE); 235 checkNotNull(value, ERROR_NULL_VALUE);
267 - checkIfUnmodifiable(); 236 + return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue());
268 - return database.putAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
269 - .thenApply(this::unwrapResult)
270 - .thenApply(v -> {
271 - Versioned<byte[]> rawNewValue = v.newValue();
272 - return new Versioned<>(serializer.decode(rawNewValue.value()),
273 - rawNewValue.version(),
274 - rawNewValue.creationTime());
275 - });
276 - }
277 -
278 - @Override
279 - public CompletableFuture<Optional<Versioned<V>>> putIfAbsentAndGet(K key, V value) {
280 - checkNotNull(key, ERROR_NULL_KEY);
281 - checkNotNull(value, ERROR_NULL_VALUE);
282 - checkIfUnmodifiable();
283 - return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
284 - .thenApply(this::unwrapResult)
285 - .thenApply(v -> {
286 - if (v.updated()) {
287 - Versioned<byte[]> rawNewValue = v.newValue();
288 - return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
289 - rawNewValue.version(),
290 - rawNewValue.creationTime()));
291 - } else {
292 - return Optional.empty();
293 - }
294 - });
295 } 237 }
296 238
297 @Override 239 @Override
298 public CompletableFuture<Versioned<V>> remove(K key) { 240 public CompletableFuture<Versioned<V>> remove(K key) {
299 checkNotNull(key, ERROR_NULL_KEY); 241 checkNotNull(key, ERROR_NULL_KEY);
300 - checkIfUnmodifiable(); 242 + return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue());
301 - return database.remove(name, keyCache.getUnchecked(key))
302 - .thenApply(this::unwrapResult)
303 - .thenApply(v -> v != null ? v.<V>map(serializer::decode) : null)
304 - .whenComplete((r, e) -> {
305 - if (r != null) {
306 - notifyListeners(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r));
307 - }
308 - });
309 } 243 }
310 244
311 @Override 245 @Override
312 public CompletableFuture<Void> clear() { 246 public CompletableFuture<Void> clear() {
313 checkIfUnmodifiable(); 247 checkIfUnmodifiable();
314 - return database.clear(name).thenApply(this::unwrapResult); 248 + return database.mapClear(name).thenApply(this::unwrapResult);
315 } 249 }
316 250
317 @Override 251 @Override
318 public CompletableFuture<Set<K>> keySet() { 252 public CompletableFuture<Set<K>> keySet() {
319 - return database.keySet(name) 253 + return database.mapKeySet(name)
320 .thenApply(s -> s 254 .thenApply(s -> s
321 .stream() 255 .stream()
322 .map(this::dK) 256 .map(this::dK)
...@@ -325,17 +259,17 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -325,17 +259,17 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
325 259
326 @Override 260 @Override
327 public CompletableFuture<Collection<Versioned<V>>> values() { 261 public CompletableFuture<Collection<Versioned<V>>> values() {
328 - return database.values(name).thenApply(c -> c 262 + return database.mapValues(name).thenApply(c -> c
329 .stream() 263 .stream()
330 - .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime())) 264 + .map(v -> v.<V>map(serializer::decode))
331 .collect(Collectors.toList())); 265 .collect(Collectors.toList()));
332 } 266 }
333 267
334 @Override 268 @Override
335 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() { 269 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
336 - return database.entrySet(name).thenApply(s -> s 270 + return database.mapEntrySet(name).thenApply(s -> s
337 .stream() 271 .stream()
338 - .map(this::fromRawEntry) 272 + .map(this::mapRawEntry)
339 .collect(Collectors.toSet())); 273 .collect(Collectors.toSet()));
340 } 274 }
341 275
...@@ -343,84 +277,52 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -343,84 +277,52 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
343 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) { 277 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
344 checkNotNull(key, ERROR_NULL_KEY); 278 checkNotNull(key, ERROR_NULL_KEY);
345 checkNotNull(value, ERROR_NULL_VALUE); 279 checkNotNull(value, ERROR_NULL_VALUE);
346 - checkIfUnmodifiable(); 280 + return computeIfAbsent(key, k -> value);
347 - AtomicReference<MapEvent<K, V>> event = new AtomicReference<>();
348 - return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
349 - .thenApply(this::unwrapResult)
350 - .whenComplete((r, e) -> {
351 - if (r != null && r.updated()) {
352 - event.set(new MapEvent<K, V>(name,
353 - MapEvent.Type.INSERT,
354 - key,
355 - r.newValue().<V>map(serializer::decode)));
356 - }
357 - })
358 - .thenApply(v -> v.updated() ? null : v.oldValue().<V>map(serializer::decode))
359 - .whenComplete((r, e) -> notifyListeners(event.get()));
360 } 281 }
361 282
362 @Override 283 @Override
363 public CompletableFuture<Boolean> remove(K key, V value) { 284 public CompletableFuture<Boolean> remove(K key, V value) {
364 checkNotNull(key, ERROR_NULL_KEY); 285 checkNotNull(key, ERROR_NULL_KEY);
365 checkNotNull(value, ERROR_NULL_VALUE); 286 checkNotNull(value, ERROR_NULL_VALUE);
366 - checkIfUnmodifiable(); 287 + return updateAndGet(key, Match.ifValue(value), Match.any(), null).thenApply(v -> v.updated());
367 - return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value))
368 - .thenApply(this::unwrapResult);
369 } 288 }
370 289
371 @Override 290 @Override
372 public CompletableFuture<Boolean> remove(K key, long version) { 291 public CompletableFuture<Boolean> remove(K key, long version) {
373 checkNotNull(key, ERROR_NULL_KEY); 292 checkNotNull(key, ERROR_NULL_KEY);
374 - checkIfUnmodifiable(); 293 + return updateAndGet(key, Match.any(), Match.ifValue(version), null).thenApply(v -> v.updated());
375 - return database.remove(name, keyCache.getUnchecked(key), version)
376 - .thenApply(this::unwrapResult);
377 -
378 } 294 }
379 295
380 @Override 296 @Override
381 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) { 297 public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
382 checkNotNull(key, ERROR_NULL_KEY); 298 checkNotNull(key, ERROR_NULL_KEY);
299 + checkNotNull(oldValue, ERROR_NULL_VALUE);
383 checkNotNull(newValue, ERROR_NULL_VALUE); 300 checkNotNull(newValue, ERROR_NULL_VALUE);
384 - checkIfUnmodifiable(); 301 + return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue).thenApply(v -> v.updated());
385 - byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
386 - return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue))
387 - .thenApply(this::unwrapResult);
388 } 302 }
389 303
390 @Override 304 @Override
391 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) { 305 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
392 - return replaceAndGet(key, oldVersion, newValue).thenApply(Optional::isPresent); 306 + return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue).thenApply(v -> v.updated());
393 } 307 }
394 308
395 - @Override 309 + private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
396 - public CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue) { 310 + return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode));
397 - checkNotNull(key, ERROR_NULL_KEY); 311 + }
398 - checkNotNull(newValue, ERROR_NULL_VALUE); 312 +
313 + private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key,
314 + Match<V> oldValueMatch,
315 + Match<Long> oldVersionMatch,
316 + V value) {
399 checkIfUnmodifiable(); 317 checkIfUnmodifiable();
400 - return database.replaceAndGet(name, 318 + return database.mapUpdate(name,
401 keyCache.getUnchecked(key), 319 keyCache.getUnchecked(key),
402 - oldVersion, 320 + oldValueMatch.map(serializer::encode),
403 - serializer.encode(newValue)) 321 + oldVersionMatch,
322 + value == null ? null : serializer.encode(value))
404 .thenApply(this::unwrapResult) 323 .thenApply(this::unwrapResult)
405 - .thenApply(v -> { 324 + .thenApply(r -> r.<K, V>map(this::dK, serializer::decode))
406 - if (v.updated()) { 325 + .whenComplete((r, e) -> notifyListeners(r != null ? r.toMapEvent() : null));
407 - Versioned<byte[]> rawNewValue = v.newValue();
408 - return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
409 - rawNewValue.version(),
410 - rawNewValue.creationTime()));
411 - } else {
412 - return Optional.empty();
413 - }
414 - });
415 - }
416 -
417 - private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
418 - return Pair.of(
419 - dK(e.getKey()),
420 - new Versioned<>(
421 - serializer.decode(e.getValue().value()),
422 - e.getValue().version(),
423 - e.getValue().creationTime()));
424 } 326 }
425 327
426 private <T> T unwrapResult(Result<T> result) { 328 private <T> T unwrapResult(Result<T> result) {
......
...@@ -18,7 +18,6 @@ package org.onosproject.store.consistent.impl; ...@@ -18,7 +18,6 @@ package org.onosproject.store.consistent.impl;
18 18
19 import java.util.Collection; 19 import java.util.Collection;
20 import java.util.Map.Entry; 20 import java.util.Map.Entry;
21 -import java.util.Optional;
22 import java.util.concurrent.CompletableFuture; 21 import java.util.concurrent.CompletableFuture;
23 import java.util.concurrent.ExecutionException; 22 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.TimeUnit; 23 import java.util.concurrent.TimeUnit;
...@@ -115,11 +114,6 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> { ...@@ -115,11 +114,6 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
115 } 114 }
116 115
117 @Override 116 @Override
118 - public Optional<Versioned<V>> putIfAbsentAndGet(K key, V value) {
119 - return complete(asyncMap.putIfAbsentAndGet(key, value));
120 - }
121 -
122 - @Override
123 public Versioned<V> remove(K key) { 117 public Versioned<V> remove(K key) {
124 return complete(asyncMap.remove(key)); 118 return complete(asyncMap.remove(key));
125 } 119 }
...@@ -169,11 +163,6 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> { ...@@ -169,11 +163,6 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
169 return complete(asyncMap.replace(key, oldVersion, newValue)); 163 return complete(asyncMap.replace(key, oldVersion, newValue));
170 } 164 }
171 165
172 - @Override
173 - public Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue) {
174 - return complete(asyncMap.replaceAndGet(key, oldVersion, newValue));
175 - }
176 -
177 private static <T> T complete(CompletableFuture<T> future) { 166 private static <T> T complete(CompletableFuture<T> future) {
178 try { 167 try {
179 return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); 168 return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
......
...@@ -64,8 +64,8 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -64,8 +64,8 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
64 } 64 }
65 65
66 @Override 66 @Override
67 - public CompletableFuture<Set<String>> tableNames() { 67 + public CompletableFuture<Set<String>> maps() {
68 - return checkOpen(() -> proxy.tableNames()); 68 + return checkOpen(() -> proxy.maps());
69 } 69 }
70 70
71 @Override 71 @Override
...@@ -74,105 +74,54 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -74,105 +74,54 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
74 } 74 }
75 75
76 @Override 76 @Override
77 - public CompletableFuture<Integer> size(String tableName) { 77 + public CompletableFuture<Integer> mapSize(String mapName) {
78 - return checkOpen(() -> proxy.size(tableName)); 78 + return checkOpen(() -> proxy.mapSize(mapName));
79 } 79 }
80 80
81 @Override 81 @Override
82 - public CompletableFuture<Boolean> isEmpty(String tableName) { 82 + public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
83 - return checkOpen(() -> proxy.isEmpty(tableName)); 83 + return checkOpen(() -> proxy.mapIsEmpty(mapName));
84 } 84 }
85 85
86 @Override 86 @Override
87 - public CompletableFuture<Boolean> containsKey(String tableName, String key) { 87 + public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
88 - return checkOpen(() -> proxy.containsKey(tableName, key)); 88 + return checkOpen(() -> proxy.mapContainsKey(mapName, key));
89 } 89 }
90 90
91 @Override 91 @Override
92 - public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) { 92 + public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
93 - return checkOpen(() -> proxy.containsValue(tableName, value)); 93 + return checkOpen(() -> proxy.mapContainsValue(mapName, value));
94 } 94 }
95 95
96 @Override 96 @Override
97 - public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) { 97 + public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
98 - return checkOpen(() -> proxy.get(tableName, key)); 98 + return checkOpen(() -> proxy.mapGet(mapName, key));
99 } 99 }
100 100
101 @Override 101 @Override
102 - public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) { 102 + public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
103 - return checkOpen(() -> proxy.put(tableName, key, value)); 103 + String mapName, String key, Match<byte[]> valueMatch, Match<Long> versionMatch, byte[] value) {
104 + return checkOpen(() -> proxy.mapUpdate(mapName, key, valueMatch, versionMatch, value));
104 } 105 }
105 106
106 @Override 107 @Override
107 - public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putAndGet(String tableName, 108 + public CompletableFuture<Result<Void>> mapClear(String mapName) {
108 - String key, 109 + return checkOpen(() -> proxy.mapClear(mapName));
109 - byte[] value) {
110 - return checkOpen(() -> proxy.putAndGet(tableName, key, value));
111 } 110 }
112 111
113 @Override 112 @Override
114 - public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putIfAbsentAndGet(String tableName, 113 + public CompletableFuture<Set<String>> mapKeySet(String mapName) {
115 - String key, 114 + return checkOpen(() -> proxy.mapKeySet(mapName));
116 - byte[] value) {
117 - return checkOpen(() -> proxy.putIfAbsentAndGet(tableName, key, value));
118 } 115 }
119 116
120 @Override 117 @Override
121 - public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) { 118 + public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
122 - return checkOpen(() -> proxy.remove(tableName, key)); 119 + return checkOpen(() -> proxy.mapValues(mapName));
123 } 120 }
124 121
125 @Override 122 @Override
126 - public CompletableFuture<Result<Void>> clear(String tableName) { 123 + public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
127 - return checkOpen(() -> proxy.clear(tableName)); 124 + return checkOpen(() -> proxy.mapEntrySet(mapName));
128 - }
129 -
130 - @Override
131 - public CompletableFuture<Set<String>> keySet(String tableName) {
132 - return checkOpen(() -> proxy.keySet(tableName));
133 - }
134 -
135 - @Override
136 - public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
137 - return checkOpen(() -> proxy.values(tableName));
138 - }
139 -
140 - @Override
141 - public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
142 - return checkOpen(() -> proxy.entrySet(tableName));
143 - }
144 -
145 - @Override
146 - public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
147 - return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
148 - }
149 -
150 - @Override
151 - public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
152 - return checkOpen(() -> proxy.remove(tableName, key, value));
153 - }
154 -
155 - @Override
156 - public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
157 - return checkOpen(() -> proxy.remove(tableName, key, version));
158 - }
159 -
160 - @Override
161 - public CompletableFuture<Result<Boolean>> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
162 - return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
163 - }
164 -
165 - @Override
166 - public CompletableFuture<Result<Boolean>> replace(String tableName, String key, long oldVersion, byte[] newValue) {
167 - return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
168 - }
169 -
170 - @Override
171 - public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> replaceAndGet(String tableName,
172 - String key,
173 - long oldVersion,
174 - byte[] newValue) {
175 - return checkOpen(() -> proxy.replaceAndGet(tableName, key, oldVersion, newValue));
176 } 125 }
177 126
178 @Override 127 @Override
......
...@@ -48,7 +48,7 @@ import net.kuujo.copycat.state.StateContext; ...@@ -48,7 +48,7 @@ import net.kuujo.copycat.state.StateContext;
48 public class DefaultDatabaseState implements DatabaseState<String, byte[]> { 48 public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
49 private Long nextVersion; 49 private Long nextVersion;
50 private Map<String, AtomicLong> counters; 50 private Map<String, AtomicLong> counters;
51 - private Map<String, Map<String, Versioned<byte[]>>> tables; 51 + private Map<String, Map<String, Versioned<byte[]>>> maps;
52 private Map<String, Queue<byte[]>> queues; 52 private Map<String, Queue<byte[]>> queues;
53 private Map<String, Set<NodeId>> queueUpdateNotificationTargets; 53 private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
54 54
...@@ -72,10 +72,10 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -72,10 +72,10 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
72 counters = Maps.newConcurrentMap(); 72 counters = Maps.newConcurrentMap();
73 context.put("counters", counters); 73 context.put("counters", counters);
74 } 74 }
75 - tables = context.get("tables"); 75 + maps = context.get("maps");
76 - if (tables == null) { 76 + if (maps == null) {
77 - tables = Maps.newConcurrentMap(); 77 + maps = Maps.newConcurrentMap();
78 - context.put("tables", tables); 78 + context.put("maps", maps);
79 } 79 }
80 locks = context.get("locks"); 80 locks = context.get("locks");
81 if (locks == null) { 81 if (locks == null) {
...@@ -100,8 +100,8 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -100,8 +100,8 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
100 } 100 }
101 101
102 @Override 102 @Override
103 - public Set<String> tableNames() { 103 + public Set<String> maps() {
104 - return new HashSet<>(tables.keySet()); 104 + return ImmutableSet.copyOf(maps.keySet());
105 } 105 }
106 106
107 @Override 107 @Override
...@@ -112,96 +112,78 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -112,96 +112,78 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
112 } 112 }
113 113
114 @Override 114 @Override
115 - public int size(String tableName) { 115 + public int size(String mapName) {
116 - return getTableMap(tableName).size(); 116 + return getMap(mapName).size();
117 } 117 }
118 118
119 @Override 119 @Override
120 - public boolean isEmpty(String tableName) { 120 + public boolean mapIsEmpty(String mapName) {
121 - return getTableMap(tableName).isEmpty(); 121 + return getMap(mapName).isEmpty();
122 } 122 }
123 123
124 @Override 124 @Override
125 - public boolean containsKey(String tableName, String key) { 125 + public boolean mapContainsKey(String mapName, String key) {
126 - return getTableMap(tableName).containsKey(key); 126 + return getMap(mapName).containsKey(key);
127 } 127 }
128 128
129 @Override 129 @Override
130 - public boolean containsValue(String tableName, byte[] value) { 130 + public boolean mapContainsValue(String mapName, byte[] value) {
131 - return getTableMap(tableName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value)); 131 + return getMap(mapName).values().stream().anyMatch(v -> Arrays.equals(v.value(), value));
132 } 132 }
133 133
134 @Override 134 @Override
135 - public Versioned<byte[]> get(String tableName, String key) { 135 + public Versioned<byte[]> mapGet(String mapName, String key) {
136 - return getTableMap(tableName).get(key); 136 + return getMap(mapName).get(key);
137 } 137 }
138 138
139 - @Override
140 - public Result<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
141 - return isLockedForUpdates(tableName, key)
142 - ? Result.locked()
143 - : Result.ok(getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)));
144 - }
145 139
146 @Override 140 @Override
147 - public Result<UpdateResult<Versioned<byte[]>>> putAndGet(String tableName, 141 + public Result<UpdateResult<String, byte[]>> mapUpdate(
142 + String mapName,
148 String key, 143 String key,
144 + Match<byte[]> valueMatch,
145 + Match<Long> versionMatch,
149 byte[] value) { 146 byte[] value) {
150 - if (isLockedForUpdates(tableName, key)) { 147 + if (isLockedForUpdates(mapName, key)) {
151 return Result.locked(); 148 return Result.locked();
152 - } else {
153 - Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
154 - Versioned<byte[]> oldValue = getTableMap(tableName).put(key, newValue);
155 - return Result.ok(new UpdateResult<>(true, oldValue, newValue));
156 } 149 }
157 - } 150 + Versioned<byte[]> currentValue = getMap(mapName).get(key);
158 - 151 + if (!valueMatch.matches(currentValue == null ? null : currentValue.value()) ||
159 - @Override 152 + !versionMatch.matches(currentValue == null ? null : currentValue.version())) {
160 - public Result<UpdateResult<Versioned<byte[]>>> putIfAbsentAndGet(String tableName, 153 + return Result.ok(new UpdateResult<>(false, mapName, key, currentValue, currentValue));
161 - String key,
162 - byte[] value) {
163 - if (isLockedForUpdates(tableName, key)) {
164 - return Result.locked();
165 - }
166 - Versioned<byte[]> currentValue = getTableMap(tableName).get(key);
167 - if (currentValue != null) {
168 - return Result.ok(new UpdateResult<>(false, currentValue, currentValue));
169 } else { 154 } else {
155 + if (value == null && currentValue != null) {
156 + getMap(mapName).remove(key);
157 + return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, null));
158 + }
170 Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion); 159 Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
171 - getTableMap(tableName).put(key, newValue); 160 + getMap(mapName).put(key, newValue);
172 - return Result.ok(new UpdateResult<>(true, null, newValue)); 161 + return Result.ok(new UpdateResult<>(true, mapName, key, currentValue, newValue));
173 } 162 }
174 } 163 }
175 164
176 @Override 165 @Override
177 - public Result<Versioned<byte[]>> remove(String tableName, String key) { 166 + public Result<Void> mapClear(String mapName) {
178 - return isLockedForUpdates(tableName, key) 167 + if (areTransactionsInProgress(mapName)) {
179 - ? Result.locked()
180 - : Result.ok(getTableMap(tableName).remove(key));
181 - }
182 -
183 - @Override
184 - public Result<Void> clear(String tableName) {
185 - if (areTransactionsInProgress(tableName)) {
186 return Result.locked(); 168 return Result.locked();
187 } 169 }
188 - getTableMap(tableName).clear(); 170 + getMap(mapName).clear();
189 return Result.ok(null); 171 return Result.ok(null);
190 } 172 }
191 173
192 @Override 174 @Override
193 - public Set<String> keySet(String tableName) { 175 + public Set<String> mapKeySet(String mapName) {
194 - return ImmutableSet.copyOf(getTableMap(tableName).keySet()); 176 + return ImmutableSet.copyOf(getMap(mapName).keySet());
195 } 177 }
196 178
197 @Override 179 @Override
198 - public Collection<Versioned<byte[]>> values(String tableName) { 180 + public Collection<Versioned<byte[]>> mapValues(String mapName) {
199 - return ImmutableList.copyOf(getTableMap(tableName).values()); 181 + return ImmutableList.copyOf(getMap(mapName).values());
200 } 182 }
201 183
202 @Override 184 @Override
203 - public Set<Entry<String, Versioned<byte[]>>> entrySet(String tableName) { 185 + public Set<Entry<String, Versioned<byte[]>>> mapEntrySet(String mapName) {
204 - return ImmutableSet.copyOf(getTableMap(tableName) 186 + return ImmutableSet.copyOf(getMap(mapName)
205 .entrySet() 187 .entrySet()
206 .stream() 188 .stream()
207 .map(entry -> Pair.of(entry.getKey(), entry.getValue())) 189 .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
...@@ -209,85 +191,6 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -209,85 +191,6 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
209 } 191 }
210 192
211 @Override 193 @Override
212 - public Result<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
213 - if (isLockedForUpdates(tableName, key)) {
214 - return Result.locked();
215 - }
216 - Versioned<byte[]> existingValue = get(tableName, key);
217 - Versioned<byte[]> currentValue = existingValue != null ? existingValue : put(tableName, key, value).value();
218 - return Result.ok(currentValue);
219 - }
220 -
221 - @Override
222 - public Result<Boolean> remove(String tableName, String key, byte[] value) {
223 - if (isLockedForUpdates(tableName, key)) {
224 - return Result.locked();
225 - }
226 - Versioned<byte[]> existing = get(tableName, key);
227 - if (existing != null && Arrays.equals(existing.value(), value)) {
228 - getTableMap(tableName).remove(key);
229 - return Result.ok(true);
230 - }
231 - return Result.ok(false);
232 - }
233 -
234 - @Override
235 - public Result<Boolean> remove(String tableName, String key, long version) {
236 - if (isLockedForUpdates(tableName, key)) {
237 - return Result.locked();
238 - }
239 - Versioned<byte[]> existing = get(tableName, key);
240 - if (existing != null && existing.version() == version) {
241 - remove(tableName, key);
242 - return Result.ok(true);
243 - }
244 - return Result.ok(false);
245 - }
246 -
247 - @Override
248 - public Result<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
249 - if (isLockedForUpdates(tableName, key)) {
250 - return Result.locked();
251 - }
252 - Versioned<byte[]> existing = get(tableName, key);
253 - if (existing != null && Arrays.equals(existing.value(), oldValue)) {
254 - put(tableName, key, newValue);
255 - return Result.ok(true);
256 - }
257 - return Result.ok(false);
258 - }
259 -
260 - @Override
261 - public Result<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
262 - if (isLockedForUpdates(tableName, key)) {
263 - return Result.locked();
264 - }
265 - Versioned<byte[]> existing = get(tableName, key);
266 - if (existing != null && existing.version() == oldVersion) {
267 - put(tableName, key, newValue);
268 - return Result.ok(true);
269 - }
270 - return Result.ok(false);
271 - }
272 -
273 - @Override
274 - public Result<UpdateResult<Versioned<byte[]>>> replaceAndGet(
275 - String tableName, String key, long oldVersion, byte[] newValue) {
276 - if (isLockedForUpdates(tableName, key)) {
277 - return Result.locked();
278 - }
279 - boolean updated = false;
280 - Versioned<byte[]> previous = get(tableName, key);
281 - Versioned<byte[]> current = previous;
282 - if (previous != null && previous.version() == oldVersion) {
283 - current = new Versioned<>(newValue, ++nextVersion);
284 - getTableMap(tableName).put(key, current);
285 - updated = true;
286 - }
287 - return Result.ok(new UpdateResult<>(updated, previous, current));
288 - }
289 -
290 - @Override
291 public Long counterAddAndGet(String counterName, long delta) { 194 public Long counterAddAndGet(String counterName, long delta) {
292 return getCounter(counterName).addAndGet(delta); 195 return getCounter(counterName).addAndGet(delta);
293 } 196 }
...@@ -343,7 +246,7 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -343,7 +246,7 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
343 @Override 246 @Override
344 public boolean prepare(Transaction transaction) { 247 public boolean prepare(Transaction transaction) {
345 if (transaction.updates().stream().anyMatch(update -> 248 if (transaction.updates().stream().anyMatch(update ->
346 - isLockedByAnotherTransaction(update.tableName(), 249 + isLockedByAnotherTransaction(update.mapName(),
347 update.key(), 250 update.key(),
348 transaction.id()))) { 251 transaction.id()))) {
349 return false; 252 return false;
...@@ -368,12 +271,12 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -368,12 +271,12 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
368 return true; 271 return true;
369 } 272 }
370 273
371 - private Map<String, Versioned<byte[]>> getTableMap(String tableName) { 274 + private Map<String, Versioned<byte[]>> getMap(String mapName) {
372 - return tables.computeIfAbsent(tableName, name -> Maps.newConcurrentMap()); 275 + return maps.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
373 } 276 }
374 277
375 - private Map<String, Update> getLockMap(String tableName) { 278 + private Map<String, Update> getLockMap(String mapName) {
376 - return locks.computeIfAbsent(tableName, name -> Maps.newConcurrentMap()); 279 + return locks.computeIfAbsent(mapName, name -> Maps.newConcurrentMap());
377 } 280 }
378 281
379 private AtomicLong getCounter(String counterName) { 282 private AtomicLong getCounter(String counterName) {
...@@ -389,7 +292,7 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -389,7 +292,7 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
389 } 292 }
390 293
391 private boolean isUpdatePossible(DatabaseUpdate update) { 294 private boolean isUpdatePossible(DatabaseUpdate update) {
392 - Versioned<byte[]> existingEntry = get(update.tableName(), update.key()); 295 + Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
393 switch (update.type()) { 296 switch (update.type()) {
394 case PUT: 297 case PUT:
395 case REMOVE: 298 case REMOVE:
...@@ -410,7 +313,7 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -410,7 +313,7 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
410 } 313 }
411 314
412 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) { 315 private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
413 - Map<String, Update> lockMap = getLockMap(update.tableName()); 316 + Map<String, Update> lockMap = getLockMap(update.mapName());
414 switch (update.type()) { 317 switch (update.type()) {
415 case PUT: 318 case PUT:
416 case PUT_IF_ABSENT: 319 case PUT_IF_ABSENT:
...@@ -429,12 +332,12 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -429,12 +332,12 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
429 } 332 }
430 333
431 private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) { 334 private void commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
432 - String tableName = update.tableName(); 335 + String mapName = update.mapName();
433 String key = update.key(); 336 String key = update.key();
434 Type type = update.type(); 337 Type type = update.type();
435 - Update provisionalUpdate = getLockMap(tableName).get(key); 338 + Update provisionalUpdate = getLockMap(mapName).get(key);
436 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) { 339 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
437 - getLockMap(tableName).remove(key); 340 + getLockMap(mapName).remove(key);
438 } else { 341 } else {
439 return; 342 return;
440 } 343 }
...@@ -444,12 +347,12 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -444,12 +347,12 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
444 case PUT_IF_ABSENT: 347 case PUT_IF_ABSENT:
445 case PUT_IF_VERSION_MATCH: 348 case PUT_IF_VERSION_MATCH:
446 case PUT_IF_VALUE_MATCH: 349 case PUT_IF_VALUE_MATCH:
447 - put(tableName, key, provisionalUpdate.value()); 350 + mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value());
448 break; 351 break;
449 case REMOVE: 352 case REMOVE:
450 case REMOVE_IF_VERSION_MATCH: 353 case REMOVE_IF_VERSION_MATCH:
451 case REMOVE_IF_VALUE_MATCH: 354 case REMOVE_IF_VALUE_MATCH:
452 - remove(tableName, key); 355 + mapUpdate(mapName, key, Match.any(), Match.any(), null);
453 break; 356 break;
454 default: 357 default:
455 break; 358 break;
...@@ -457,28 +360,28 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -457,28 +360,28 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
457 } 360 }
458 361
459 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) { 362 private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
460 - String tableName = update.tableName(); 363 + String mapName = update.mapName();
461 String key = update.key(); 364 String key = update.key();
462 - Update provisionalUpdate = getLockMap(tableName).get(key); 365 + Update provisionalUpdate = getLockMap(mapName).get(key);
463 if (provisionalUpdate == null) { 366 if (provisionalUpdate == null) {
464 return; 367 return;
465 } 368 }
466 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) { 369 if (Objects.equal(transactionId, provisionalUpdate.transactionId())) {
467 - getLockMap(tableName).remove(key); 370 + getLockMap(mapName).remove(key);
468 } 371 }
469 } 372 }
470 373
471 - private boolean isLockedByAnotherTransaction(String tableName, String key, long transactionId) { 374 + private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
472 - Update update = getLockMap(tableName).get(key); 375 + Update update = getLockMap(mapName).get(key);
473 return update != null && !Objects.equal(transactionId, update.transactionId()); 376 return update != null && !Objects.equal(transactionId, update.transactionId());
474 } 377 }
475 378
476 - private boolean isLockedForUpdates(String tableName, String key) { 379 + private boolean isLockedForUpdates(String mapName, String key) {
477 - return getLockMap(tableName).containsKey(key); 380 + return getLockMap(mapName).containsKey(key);
478 } 381 }
479 382
480 - private boolean areTransactionsInProgress(String tableName) { 383 + private boolean areTransactionsInProgress(String mapName) {
481 - return !getLockMap(tableName).isEmpty(); 384 + return !getLockMap(mapName).isEmpty();
482 } 385 }
483 386
484 private class Update { 387 private class Update {
......
...@@ -164,7 +164,7 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> { ...@@ -164,7 +164,7 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
164 Versioned<V> original = readCache.get(key); 164 Versioned<V> original = readCache.get(key);
165 if (original != null) { 165 if (original != null) {
166 updates.add(DatabaseUpdate.newBuilder() 166 updates.add(DatabaseUpdate.newBuilder()
167 - .withTableName(name) 167 + .withMapName(name)
168 .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH) 168 .withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH)
169 .withKey(keyCache.getUnchecked(key)) 169 .withKey(keyCache.getUnchecked(key))
170 .withCurrentVersion(original.version()) 170 .withCurrentVersion(original.version())
...@@ -175,14 +175,14 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> { ...@@ -175,14 +175,14 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
175 Versioned<V> original = readCache.get(key); 175 Versioned<V> original = readCache.get(key);
176 if (original == null) { 176 if (original == null) {
177 updates.add(DatabaseUpdate.newBuilder() 177 updates.add(DatabaseUpdate.newBuilder()
178 - .withTableName(name) 178 + .withMapName(name)
179 .withType(DatabaseUpdate.Type.PUT_IF_ABSENT) 179 .withType(DatabaseUpdate.Type.PUT_IF_ABSENT)
180 .withKey(keyCache.getUnchecked(key)) 180 .withKey(keyCache.getUnchecked(key))
181 .withValue(serializer.encode(value)) 181 .withValue(serializer.encode(value))
182 .build()); 182 .build());
183 } else { 183 } else {
184 updates.add(DatabaseUpdate.newBuilder() 184 updates.add(DatabaseUpdate.newBuilder()
185 - .withTableName(name) 185 + .withMapName(name)
186 .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH) 186 .withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH)
187 .withKey(keyCache.getUnchecked(key)) 187 .withKey(keyCache.getUnchecked(key))
188 .withCurrentVersion(original.version()) 188 .withCurrentVersion(original.version())
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.consistent.impl;
17 +
18 +import static com.google.common.base.MoreObjects.toStringHelper;
19 +
20 +import java.util.Arrays;
21 +import java.util.Objects;
22 +import java.util.function.Function;
23 +
24 +/**
25 + * Utility class for checking matching values.
26 + *
27 + * @param <T> type of value
28 + */
29 +public final class Match<T> {
30 +
31 + private final boolean matchAny;
32 + private final T value;
33 +
34 + /**
35 + * Returns a Match that matches any value.
36 + * @param <T> match type
37 + * @return new instance
38 + */
39 + public static <T> Match<T> any() {
40 + return new Match<>();
41 + }
42 +
43 + /**
44 + * Returns a Match that matches null values.
45 + * @param <T> match type
46 + * @return new instance
47 + */
48 + public static <T> Match<T> ifNull() {
49 + return ifValue(null);
50 + }
51 +
52 + /**
53 + * Returns a Match that matches only specified value.
54 + * @param value value to match
55 + * @param <T> match type
56 + * @return new instance
57 + */
58 + public static <T> Match<T> ifValue(T value) {
59 + return new Match<>(value);
60 + }
61 +
62 + private Match() {
63 + matchAny = true;
64 + value = null;
65 + }
66 +
67 + private Match(T value) {
68 + matchAny = false;
69 + this.value = value;
70 + }
71 +
72 + /**
73 + * Maps this instance to a Match of another type.
74 + * @param mapper transformation function
75 + * @param <V> new match type
76 + * @return new instance
77 + */
78 + public <V> Match<V> map(Function<T, V> mapper) {
79 + if (matchAny) {
80 + return any();
81 + } else if (value == null) {
82 + return ifNull();
83 + } else {
84 + return ifValue(mapper.apply(value));
85 + }
86 + }
87 +
88 + /**
89 + * Checks if this instance matches specified value.
90 + * @param other other value
91 + * @return true if matches; false otherwise
92 + */
93 + public boolean matches(T other) {
94 + if (matchAny) {
95 + return true;
96 + } else if (other == null) {
97 + return value == null;
98 + } else {
99 + if (value instanceof byte[]) {
100 + return Arrays.equals((byte[]) value, (byte[]) other);
101 + }
102 + return Objects.equals(value, other);
103 + }
104 + }
105 +
106 + @Override
107 + public String toString() {
108 + return toStringHelper(this)
109 + .add("matchAny", matchAny)
110 + .add("value", value)
111 + .toString();
112 + }
113 +}
...@@ -82,14 +82,14 @@ public class PartitionedDatabase implements Database { ...@@ -82,14 +82,14 @@ public class PartitionedDatabase implements Database {
82 } 82 }
83 83
84 @Override 84 @Override
85 - public CompletableFuture<Set<String>> tableNames() { 85 + public CompletableFuture<Set<String>> maps() {
86 checkState(isOpen.get(), DB_NOT_OPEN); 86 checkState(isOpen.get(), DB_NOT_OPEN);
87 - Set<String> tableNames = Sets.newConcurrentHashSet(); 87 + Set<String> mapNames = Sets.newConcurrentHashSet();
88 return CompletableFuture.allOf(partitions 88 return CompletableFuture.allOf(partitions
89 .stream() 89 .stream()
90 - .map(db -> db.tableNames().thenApply(tableNames::addAll)) 90 + .map(db -> db.maps().thenApply(mapNames::addAll))
91 .toArray(CompletableFuture[]::new)) 91 .toArray(CompletableFuture[]::new))
92 - .thenApply(v -> tableNames); 92 + .thenApply(v -> mapNames);
93 } 93 }
94 94
95 @Override 95 @Override
...@@ -108,158 +108,100 @@ public class PartitionedDatabase implements Database { ...@@ -108,158 +108,100 @@ public class PartitionedDatabase implements Database {
108 } 108 }
109 109
110 @Override 110 @Override
111 - public CompletableFuture<Integer> size(String tableName) { 111 + public CompletableFuture<Integer> mapSize(String mapName) {
112 checkState(isOpen.get(), DB_NOT_OPEN); 112 checkState(isOpen.get(), DB_NOT_OPEN);
113 AtomicInteger totalSize = new AtomicInteger(0); 113 AtomicInteger totalSize = new AtomicInteger(0);
114 return CompletableFuture.allOf(partitions 114 return CompletableFuture.allOf(partitions
115 .stream() 115 .stream()
116 - .map(p -> p.size(tableName).thenApply(totalSize::addAndGet)) 116 + .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
117 .toArray(CompletableFuture[]::new)) 117 .toArray(CompletableFuture[]::new))
118 .thenApply(v -> totalSize.get()); 118 .thenApply(v -> totalSize.get());
119 } 119 }
120 120
121 @Override 121 @Override
122 - public CompletableFuture<Boolean> isEmpty(String tableName) { 122 + public CompletableFuture<Boolean> mapIsEmpty(String mapName) {
123 checkState(isOpen.get(), DB_NOT_OPEN); 123 checkState(isOpen.get(), DB_NOT_OPEN);
124 - return size(tableName).thenApply(size -> size == 0); 124 + return mapSize(mapName).thenApply(size -> size == 0);
125 } 125 }
126 126
127 @Override 127 @Override
128 - public CompletableFuture<Boolean> containsKey(String tableName, String key) { 128 + public CompletableFuture<Boolean> mapContainsKey(String mapName, String key) {
129 checkState(isOpen.get(), DB_NOT_OPEN); 129 checkState(isOpen.get(), DB_NOT_OPEN);
130 - return partitioner.getPartition(tableName, key).containsKey(tableName, key); 130 + return partitioner.getPartition(mapName, key).mapContainsKey(mapName, key);
131 } 131 }
132 132
133 @Override 133 @Override
134 - public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) { 134 + public CompletableFuture<Boolean> mapContainsValue(String mapName, byte[] value) {
135 checkState(isOpen.get(), DB_NOT_OPEN); 135 checkState(isOpen.get(), DB_NOT_OPEN);
136 AtomicBoolean containsValue = new AtomicBoolean(false); 136 AtomicBoolean containsValue = new AtomicBoolean(false);
137 return CompletableFuture.allOf(partitions 137 return CompletableFuture.allOf(partitions
138 .stream() 138 .stream()
139 - .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v))) 139 + .map(p -> p.mapContainsValue(mapName, value)
140 + .thenApply(v -> containsValue.compareAndSet(false, v)))
140 .toArray(CompletableFuture[]::new)) 141 .toArray(CompletableFuture[]::new))
141 .thenApply(v -> containsValue.get()); 142 .thenApply(v -> containsValue.get());
142 } 143 }
143 144
144 @Override 145 @Override
145 - public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) { 146 + public CompletableFuture<Versioned<byte[]>> mapGet(String mapName, String key) {
146 checkState(isOpen.get(), DB_NOT_OPEN); 147 checkState(isOpen.get(), DB_NOT_OPEN);
147 - return partitioner.getPartition(tableName, key).get(tableName, key); 148 + return partitioner.getPartition(mapName, key).mapGet(mapName, key);
148 } 149 }
149 150
150 @Override 151 @Override
151 - public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) { 152 + public CompletableFuture<Result<UpdateResult<String, byte[]>>> mapUpdate(
152 - checkState(isOpen.get(), DB_NOT_OPEN); 153 + String mapName, String key, Match<byte[]> valueMatch,
153 - return partitioner.getPartition(tableName, key).put(tableName, key, value); 154 + Match<Long> versionMatch, byte[] value) {
154 - } 155 + return partitioner.getPartition(mapName, key).mapUpdate(mapName, key, valueMatch, versionMatch, value);
155 -
156 - @Override
157 - public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putAndGet(String tableName,
158 - String key,
159 - byte[] value) {
160 - checkState(isOpen.get(), DB_NOT_OPEN);
161 - return partitioner.getPartition(tableName, key).putAndGet(tableName, key, value);
162 - }
163 -
164 - @Override
165 - public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putIfAbsentAndGet(String tableName,
166 - String key,
167 - byte[] value) {
168 - checkState(isOpen.get(), DB_NOT_OPEN);
169 - return partitioner.getPartition(tableName, key).putIfAbsentAndGet(tableName, key, value);
170 - }
171 156
172 - @Override
173 - public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
174 - checkState(isOpen.get(), DB_NOT_OPEN);
175 - return partitioner.getPartition(tableName, key).remove(tableName, key);
176 } 157 }
177 158
178 @Override 159 @Override
179 - public CompletableFuture<Result<Void>> clear(String tableName) { 160 + public CompletableFuture<Result<Void>> mapClear(String mapName) {
180 AtomicBoolean isLocked = new AtomicBoolean(false); 161 AtomicBoolean isLocked = new AtomicBoolean(false);
181 checkState(isOpen.get(), DB_NOT_OPEN); 162 checkState(isOpen.get(), DB_NOT_OPEN);
182 return CompletableFuture.allOf(partitions 163 return CompletableFuture.allOf(partitions
183 .stream() 164 .stream()
184 - .map(p -> p.clear(tableName) 165 + .map(p -> p.mapClear(mapName)
185 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status()))) 166 .thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
186 .toArray(CompletableFuture[]::new)) 167 .toArray(CompletableFuture[]::new))
187 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null)); 168 .thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
188 } 169 }
189 170
190 @Override 171 @Override
191 - public CompletableFuture<Set<String>> keySet(String tableName) { 172 + public CompletableFuture<Set<String>> mapKeySet(String mapName) {
192 checkState(isOpen.get(), DB_NOT_OPEN); 173 checkState(isOpen.get(), DB_NOT_OPEN);
193 Set<String> keySet = Sets.newConcurrentHashSet(); 174 Set<String> keySet = Sets.newConcurrentHashSet();
194 return CompletableFuture.allOf(partitions 175 return CompletableFuture.allOf(partitions
195 .stream() 176 .stream()
196 - .map(p -> p.keySet(tableName).thenApply(keySet::addAll)) 177 + .map(p -> p.mapKeySet(mapName).thenApply(keySet::addAll))
197 .toArray(CompletableFuture[]::new)) 178 .toArray(CompletableFuture[]::new))
198 .thenApply(v -> keySet); 179 .thenApply(v -> keySet);
199 } 180 }
200 181
201 @Override 182 @Override
202 - public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) { 183 + public CompletableFuture<Collection<Versioned<byte[]>>> mapValues(String mapName) {
203 checkState(isOpen.get(), DB_NOT_OPEN); 184 checkState(isOpen.get(), DB_NOT_OPEN);
204 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>(); 185 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
205 return CompletableFuture.allOf(partitions 186 return CompletableFuture.allOf(partitions
206 .stream() 187 .stream()
207 - .map(p -> p.values(tableName).thenApply(values::addAll)) 188 + .map(p -> p.mapValues(mapName).thenApply(values::addAll))
208 .toArray(CompletableFuture[]::new)) 189 .toArray(CompletableFuture[]::new))
209 .thenApply(v -> values); 190 .thenApply(v -> values);
210 } 191 }
211 192
212 @Override 193 @Override
213 - public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) { 194 + public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> mapEntrySet(String mapName) {
214 checkState(isOpen.get(), DB_NOT_OPEN); 195 checkState(isOpen.get(), DB_NOT_OPEN);
215 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet(); 196 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
216 return CompletableFuture.allOf(partitions 197 return CompletableFuture.allOf(partitions
217 .stream() 198 .stream()
218 - .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll)) 199 + .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
219 .toArray(CompletableFuture[]::new)) 200 .toArray(CompletableFuture[]::new))
220 .thenApply(v -> entrySet); 201 .thenApply(v -> entrySet);
221 } 202 }
222 203
223 @Override 204 @Override
224 - public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
225 - checkState(isOpen.get(), DB_NOT_OPEN);
226 - return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
227 - }
228 -
229 - @Override
230 - public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
231 - checkState(isOpen.get(), DB_NOT_OPEN);
232 - return partitioner.getPartition(tableName, key).remove(tableName, key, value);
233 - }
234 -
235 - @Override
236 - public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
237 - checkState(isOpen.get(), DB_NOT_OPEN);
238 - return partitioner.getPartition(tableName, key).remove(tableName, key, version);
239 - }
240 -
241 - @Override
242 - public CompletableFuture<Result<Boolean>> replace(
243 - String tableName, String key, byte[] oldValue, byte[] newValue) {
244 - checkState(isOpen.get(), DB_NOT_OPEN);
245 - return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
246 - }
247 -
248 - @Override
249 - public CompletableFuture<Result<Boolean>> replace(
250 - String tableName, String key, long oldVersion, byte[] newValue) {
251 - checkState(isOpen.get(), DB_NOT_OPEN);
252 - return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
253 - }
254 -
255 - @Override
256 - public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> replaceAndGet(
257 - String tableName, String key, long oldVersion, byte[] newValue) {
258 - checkState(isOpen.get(), DB_NOT_OPEN);
259 - return partitioner.getPartition(tableName, key).replaceAndGet(tableName, key, oldVersion, newValue);
260 - }
261 -
262 - @Override
263 public CompletableFuture<Long> counterGet(String counterName) { 205 public CompletableFuture<Long> counterGet(String counterName) {
264 checkState(isOpen.get(), DB_NOT_OPEN); 206 checkState(isOpen.get(), DB_NOT_OPEN);
265 return partitioner.getPartition(counterName, counterName).counterGet(counterName); 207 return partitioner.getPartition(counterName, counterName).counterGet(counterName);
...@@ -408,7 +350,7 @@ public class PartitionedDatabase implements Database { ...@@ -408,7 +350,7 @@ public class PartitionedDatabase implements Database {
408 Transaction transaction) { 350 Transaction transaction) {
409 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap(); 351 Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
410 for (DatabaseUpdate update : transaction.updates()) { 352 for (DatabaseUpdate update : transaction.updates()) {
411 - Database partition = partitioner.getPartition(update.tableName(), update.key()); 353 + Database partition = partitioner.getPartition(update.mapName(), update.key());
412 List<DatabaseUpdate> partitionUpdates = 354 List<DatabaseUpdate> partitionUpdates =
413 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList()); 355 perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
414 partitionUpdates.add(update); 356 partitionUpdates.add(update);
......
...@@ -25,9 +25,9 @@ public interface Partitioner<K> { ...@@ -25,9 +25,9 @@ public interface Partitioner<K> {
25 25
26 /** 26 /**
27 * Returns the database partition. 27 * Returns the database partition.
28 - * @param tableName table name 28 + * @param mapName map name
29 * @param key key 29 * @param key key
30 * @return Database partition 30 * @return Database partition
31 */ 31 */
32 - Database getPartition(String tableName, K key); 32 + Database getPartition(String mapName, K key);
33 } 33 }
......
...@@ -32,7 +32,7 @@ public class SimpleKeyHashPartitioner extends DatabasePartitioner { ...@@ -32,7 +32,7 @@ public class SimpleKeyHashPartitioner extends DatabasePartitioner {
32 } 32 }
33 33
34 @Override 34 @Override
35 - public Database getPartition(String tableName, String key) { 35 + public Database getPartition(String mapName, String key) {
36 return partitions.get(hash(key) % partitions.size()); 36 return partitions.get(hash(key) % partitions.size());
37 } 37 }
38 } 38 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -19,11 +19,11 @@ package org.onosproject.store.consistent.impl; ...@@ -19,11 +19,11 @@ package org.onosproject.store.consistent.impl;
19 import java.util.List; 19 import java.util.List;
20 20
21 /** 21 /**
22 - * A simple Partitioner that uses the table name hash to 22 + * A simple Partitioner that uses the map name hash to
23 * pick a partition. 23 * pick a partition.
24 * <p> 24 * <p>
25 - * This class uses a md5 hash based hashing scheme for hashing the table name to 25 + * This class uses a md5 hash based hashing scheme for hashing the map name to
26 - * a partition. This partitioner maps all keys for a table to the same database 26 + * a partition. This partitioner maps all keys for a map to the same database
27 * partition. 27 * partition.
28 */ 28 */
29 public class SimpleTableHashPartitioner extends DatabasePartitioner { 29 public class SimpleTableHashPartitioner extends DatabasePartitioner {
...@@ -33,7 +33,7 @@ public class SimpleTableHashPartitioner extends DatabasePartitioner { ...@@ -33,7 +33,7 @@ public class SimpleTableHashPartitioner extends DatabasePartitioner {
33 } 33 }
34 34
35 @Override 35 @Override
36 - public Database getPartition(String tableName, String key) { 36 + public Database getPartition(String mapName, String key) {
37 - return partitions.get(hash(tableName) % partitions.size()); 37 + return partitions.get(hash(mapName) % partitions.size());
38 } 38 }
39 } 39 }
......
...@@ -15,6 +15,11 @@ ...@@ -15,6 +15,11 @@
15 */ 15 */
16 package org.onosproject.store.consistent.impl; 16 package org.onosproject.store.consistent.impl;
17 17
18 +import java.util.function.Function;
19 +
20 +import org.onosproject.store.service.MapEvent;
21 +import org.onosproject.store.service.Versioned;
22 +
18 /** 23 /**
19 * Result of a update operation. 24 * Result of a update operation.
20 * <p> 25 * <p>
...@@ -23,14 +28,18 @@ package org.onosproject.store.consistent.impl; ...@@ -23,14 +28,18 @@ package org.onosproject.store.consistent.impl;
23 * point to the same unmodified value. 28 * point to the same unmodified value.
24 * @param <V> result type 29 * @param <V> result type
25 */ 30 */
26 -public class UpdateResult<V> { 31 +public class UpdateResult<K, V> {
27 32
28 private final boolean updated; 33 private final boolean updated;
29 - private final V oldValue; 34 + private final String mapName;
30 - private final V newValue; 35 + private final K key;
36 + private final Versioned<V> oldValue;
37 + private final Versioned<V> newValue;
31 38
32 - public UpdateResult(boolean updated, V oldValue, V newValue) { 39 + public UpdateResult(boolean updated, String mapName, K key, Versioned<V> oldValue, Versioned<V> newValue) {
33 this.updated = updated; 40 this.updated = updated;
41 + this.mapName = mapName;
42 + this.key = key;
34 this.oldValue = oldValue; 43 this.oldValue = oldValue;
35 this.newValue = newValue; 44 this.newValue = newValue;
36 } 45 }
...@@ -39,11 +48,38 @@ public class UpdateResult<V> { ...@@ -39,11 +48,38 @@ public class UpdateResult<V> {
39 return updated; 48 return updated;
40 } 49 }
41 50
42 - public V oldValue() { 51 + public String mapName() {
52 + return mapName;
53 + }
54 +
55 + public K key() {
56 + return key;
57 + }
58 +
59 + public Versioned<V> oldValue() {
43 return oldValue; 60 return oldValue;
44 } 61 }
45 62
46 - public V newValue() { 63 + public Versioned<V> newValue() {
47 return newValue; 64 return newValue;
48 } 65 }
66 +
67 + public <K1, V1> UpdateResult<K1, V1> map(Function<K, K1> keyTransform, Function<V, V1> valueMapper) {
68 + return new UpdateResult<>(updated,
69 + mapName,
70 + keyTransform.apply(key),
71 + oldValue == null ? null : oldValue.map(valueMapper),
72 + newValue == null ? null : newValue.map(valueMapper));
73 + }
74 +
75 + public MapEvent<K, V> toMapEvent() {
76 + if (!updated) {
77 + return null;
78 + } else {
79 + MapEvent.Type eventType = oldValue == null ?
80 + MapEvent.Type.INSERT : newValue == null ? MapEvent.Type.REMOVE : MapEvent.Type.UPDATE;
81 + Versioned<V> eventValue = eventType == MapEvent.Type.REMOVE ? oldValue : newValue;
82 + return new MapEvent<>(mapName(), eventType, key(), eventValue);
83 + }
84 + }
49 } 85 }
...\ No newline at end of file ...\ No newline at end of file
......