Madan Jampani

Reworked DatabaseService API.

Initial implementation of LockManager.
Showing 22 changed files with 1129 additions and 260 deletions
...@@ -15,8 +15,13 @@ ...@@ -15,8 +15,13 @@
15 */ 15 */
16 package org.onlab.onos.foo; 16 package org.onlab.onos.foo;
17 17
18 +import static java.util.concurrent.Executors.newScheduledThreadPool;
19 +import static org.onlab.util.Tools.namedThreads;
20 +import static org.slf4j.LoggerFactory.getLogger;
21 +
18 import java.nio.ByteBuffer; 22 import java.nio.ByteBuffer;
19 import java.util.concurrent.ScheduledExecutorService; 23 import java.util.concurrent.ScheduledExecutorService;
24 +import java.util.concurrent.TimeUnit;
20 25
21 import org.apache.felix.scr.annotations.Activate; 26 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component; 27 import org.apache.felix.scr.annotations.Component;
...@@ -37,20 +42,10 @@ import org.onlab.onos.net.intent.IntentEvent; ...@@ -37,20 +42,10 @@ import org.onlab.onos.net.intent.IntentEvent;
37 import org.onlab.onos.net.intent.IntentListener; 42 import org.onlab.onos.net.intent.IntentListener;
38 import org.onlab.onos.net.intent.IntentService; 43 import org.onlab.onos.net.intent.IntentService;
39 import org.onlab.onos.store.service.DatabaseAdminService; 44 import org.onlab.onos.store.service.DatabaseAdminService;
40 -import org.onlab.onos.store.service.DatabaseException;
41 import org.onlab.onos.store.service.DatabaseService; 45 import org.onlab.onos.store.service.DatabaseService;
42 -import org.onlab.onos.store.service.OptionalResult; 46 +import org.onlab.onos.store.service.VersionedValue;
43 -import org.onlab.onos.store.service.PreconditionFailedException;
44 -import org.onlab.onos.store.service.ReadRequest;
45 -import org.onlab.onos.store.service.ReadResult;
46 -import org.onlab.onos.store.service.WriteRequest;
47 -import org.onlab.onos.store.service.WriteResult;
48 import org.slf4j.Logger; 47 import org.slf4j.Logger;
49 48
50 -import static org.onlab.util.Tools.namedThreads;
51 -import static org.slf4j.LoggerFactory.getLogger;
52 -import static java.util.concurrent.Executors.newScheduledThreadPool;
53 -
54 /** 49 /**
55 * Playground app component. 50 * Playground app component.
56 */ 51 */
...@@ -97,9 +92,9 @@ public class FooComponent { ...@@ -97,9 +92,9 @@ public class FooComponent {
97 log.info("Couldn't find DB service"); 92 log.info("Couldn't find DB service");
98 } else { 93 } else {
99 log.info("Found DB service"); 94 log.info("Found DB service");
100 -// longIncrementor(); 95 + longIncrementor();
101 -// executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS); 96 + executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
102 -// executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS); 97 + executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
103 } 98 }
104 log.info("Started"); 99 log.info("Started");
105 } 100 }
...@@ -164,44 +159,33 @@ public class FooComponent { ...@@ -164,44 +159,33 @@ public class FooComponent {
164 159
165 dbAdminService.createTable(someTable); 160 dbAdminService.createTable(someTable);
166 161
167 - ReadResult read = dbService.read(ReadRequest.get(someTable, someKey)); 162 + VersionedValue vv = dbService.get(someTable, someKey);
168 - if (!read.valueExists()) { 163 + if (vv == null) {
169 ByteBuffer zero = ByteBuffer.allocate(Long.BYTES).putLong(0); 164 ByteBuffer zero = ByteBuffer.allocate(Long.BYTES).putLong(0);
170 - try { 165 + if (dbService.putIfAbsent(someTable, someKey, zero.array())) {
171 - dbService.write(WriteRequest 166 + log.info("Wrote initial value");
172 - .putIfAbsent(someTable, 167 + vv = dbService.get(someTable, someKey);
173 - someKey, 168 + } else {
174 - zero.array())); 169 + log.info("Concurrent write detected.");
175 - log.info("Wrote initial value"); 170 + // concurrent write detected, read and fall through
176 - read = dbService.read(ReadRequest.get(someTable, someKey)); 171 + vv = dbService.get(someTable, someKey);
177 - } catch (PreconditionFailedException e) { 172 + if (vv == null) {
178 - log.info("Concurrent write detected.", e); 173 + log.error("Shouldn't reach here");
179 - 174 + }
180 - // concurrent write detected, read and fall through
181 - read = dbService.read(ReadRequest.get(someTable, someKey));
182 - if (!read.valueExists()) {
183 - log.error("Shouldn't reach here");
184 - }
185 } 175 }
186 } 176 }
187 int retry = 5; 177 int retry = 5;
188 do { 178 do {
189 - ByteBuffer prev = ByteBuffer.wrap(read.value().value()); 179 + ByteBuffer prev = ByteBuffer.wrap(vv.value());
190 long next = prev.getLong() + 1; 180 long next = prev.getLong() + 1;
191 byte[] newValue = ByteBuffer.allocate(Long.BYTES).putLong(next).array(); 181 byte[] newValue = ByteBuffer.allocate(Long.BYTES).putLong(next).array();
192 - OptionalResult<WriteResult, DatabaseException> result 182 + if (dbService.putIfVersionMatches(someTable, someKey, newValue, vv.version())) {
193 - = dbService.writeNothrow(WriteRequest 183 + log.info("Write success. New value: {}", next);
194 - .putIfVersionMatches(someTable,
195 - someKey,
196 - newValue,
197 - read.value().version()));
198 - if (result.hasValidResult()) {
199 - log.info("Write success {} -> {}", result.get().previousValue(), next);
200 break; 184 break;
201 } else { 185 } else {
202 log.info("Write failed trying to write {}", next); 186 log.info("Write failed trying to write {}", next);
203 - read = dbService.read(ReadRequest.get(someTable, someKey)); 187 + vv = dbService.get(someTable, someKey);
204 - if (!read.valueExists()) { 188 + if (vv == null) {
205 log.error("Shouldn't reach here"); 189 log.error("Shouldn't reach here");
206 } 190 }
207 } 191 }
......
1 +package org.onlab.onos.store.service;
2 +
3 +import java.util.Collections;
4 +import java.util.List;
5 +
6 +import com.google.common.collect.Lists;
7 +
8 +/**
9 + * Collection of read requests to be submitted as one batch.
10 + */
11 +public class BatchReadRequest {
12 +
13 + private final List<ReadRequest> readRequests;
14 +
15 + /**
16 + * Creates a new BatchReadRequest object from the specified list of read requests.
17 + * @param readRequests read requests.
18 + * @return BatchReadRequest object.
19 + */
20 + public static BatchReadRequest create(List<ReadRequest> readRequests) {
21 + return new BatchReadRequest(readRequests);
22 + }
23 +
24 + private BatchReadRequest(List<ReadRequest> readRequests) {
25 + this.readRequests = Collections.unmodifiableList(readRequests);
26 + }
27 +
28 + /**
29 + * Returns the number of requests in this batch.
30 + * @return size of request batch.
31 + */
32 + public int batchSize() {
33 + return readRequests.size();
34 + }
35 +
36 + /**
37 + * Returns the requests in this batch as a list.
38 + * @return list of read requests
39 + */
40 + public List<ReadRequest> getAsList() {
41 + return readRequests;
42 + }
43 +
44 + /**
45 + * Builder for BatchReadRequest.
46 + */
47 + public static class Builder {
48 +
49 + private final List<ReadRequest> readRequests = Lists.newLinkedList();
50 +
51 + /**
52 + * Append a get request.
53 + * @param tableName table name
54 + * @param key key to fetch.
55 + * @return this Builder
56 + */
57 + public Builder get(String tableName, String key) {
58 + readRequests.add(new ReadRequest(tableName, key));
59 + return this;
60 + }
61 +
62 + /**
63 + * Builds a BatchReadRequest
64 + * @return BatchReadRequest
65 + */
66 + public BatchReadRequest build() {
67 + return new BatchReadRequest(readRequests);
68 + }
69 + }
70 +}
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onlab.onos.store.service;
2 +
3 +import java.util.Collections;
4 +import java.util.List;
5 +
6 +public class BatchReadResult {
7 +
8 + private final List<ReadResult> readResults;
9 +
10 + public BatchReadResult(List<ReadResult> readResults) {
11 + this.readResults = Collections.unmodifiableList(readResults);
12 + }
13 +
14 + public List<ReadResult> getAsList() {
15 + return readResults;
16 + }
17 +
18 + public int batchSize() {
19 + return readResults.size();
20 + }
21 +}
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onlab.onos.store.service;
2 +
3 +import java.util.Collections;
4 +import java.util.List;
5 +
6 +import com.google.common.collect.Lists;
7 +
8 +/**
9 + * Collection of write requests to be submitted as one batch.
10 + */
11 +public class BatchWriteRequest {
12 +
13 + private final List<WriteRequest> writeRequests;
14 +
15 + /**
16 + * Creates a new BatchWriteRequest object from the specified list of write requests.
17 + * @param writeRequests write requests.
18 + * @return BatchWriteRequest object.
19 + */
20 + public static BatchWriteRequest create(List<WriteRequest> writeRequests) {
21 + return new BatchWriteRequest(writeRequests);
22 + }
23 +
24 + private BatchWriteRequest(List<WriteRequest> writeRequests) {
25 + this.writeRequests = Collections.unmodifiableList(writeRequests);
26 + }
27 +
28 + /**
29 + * Returns the requests in this batch as a list.
30 + * @return list of write requests
31 + */
32 + public List<WriteRequest> getAsList() {
33 + return writeRequests;
34 + }
35 +
36 + /**
37 + * Returns the number of requests in this batch.
38 + * @return size of request batch.
39 + */
40 + public int batchSize() {
41 + return writeRequests.size();
42 + }
43 +
44 + /**
45 + * Builder for BatchWriteRequest.
46 + */
47 + public static class Builder {
48 +
49 + private final List<WriteRequest> writeRequests = Lists.newLinkedList();
50 +
51 + public Builder put(String tableName, String key, byte[] value) {
52 + writeRequests.add(WriteRequest.put(tableName, key, value));
53 + return this;
54 + }
55 +
56 + public Builder putIfAbsent(String tableName, String key, byte[] value) {
57 + writeRequests.add(WriteRequest.putIfAbsent(tableName, key, value));
58 + return this;
59 + }
60 +
61 + public Builder putIfValueMatches(String tableName, String key, byte[] oldValue, byte[] newValue) {
62 + writeRequests.add(WriteRequest.putIfValueMatches(tableName, key, oldValue, newValue));
63 + return this;
64 + }
65 +
66 + public Builder putIfVersionMatches(String tableName, String key, byte[] value, long version) {
67 + writeRequests.add(WriteRequest.putIfVersionMatches(tableName, key, value, version));
68 + return this;
69 + }
70 +
71 + public Builder remove(String tableName, String key) {
72 + writeRequests.add(WriteRequest.remove(tableName, key));
73 + return this;
74 + }
75 +
76 + public Builder removeIfVersionMatches(String tableName, String key, long version) {
77 + writeRequests.add(WriteRequest.removeIfVersionMatches(tableName, key, version));
78 + return this;
79 + }
80 +
81 + public Builder removeIfValueMatches(String tableName, String key, byte[] value) {
82 + writeRequests.add(WriteRequest.removeIfValueMatches(tableName, key, value));
83 + return this;
84 + }
85 +
86 + public BatchWriteRequest build() {
87 + return new BatchWriteRequest(writeRequests);
88 + }
89 + }
90 +}
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onlab.onos.store.service;
2 +
3 +import java.util.Collections;
4 +import java.util.List;
5 +
6 +public class BatchWriteResult {
7 +
8 + private final List<WriteResult> writeResults;
9 +
10 + public BatchWriteResult(List<WriteResult> writeResults) {
11 + this.writeResults = Collections.unmodifiableList(writeResults);
12 + }
13 +
14 + public boolean isSuccessful() {
15 + for (WriteResult result : writeResults) {
16 + if (result.status() != WriteStatus.OK) {
17 + return false;
18 + }
19 + }
20 + return true;
21 + }
22 +
23 + public List<WriteResult> getAsList() {
24 + return this.writeResults;
25 + }
26 +
27 + public int batchSize() {
28 + return writeResults.size();
29 + }
30 +}
...\ No newline at end of file ...\ No newline at end of file
1 package org.onlab.onos.store.service; 1 package org.onlab.onos.store.service;
2 2
3 -import java.util.List;
4 -
5 /** 3 /**
6 * Service interface for a strongly consistent and durable 4 * Service interface for a strongly consistent and durable
7 * key value data store. 5 * key value data store.
...@@ -9,46 +7,93 @@ import java.util.List; ...@@ -9,46 +7,93 @@ import java.util.List;
9 public interface DatabaseService { 7 public interface DatabaseService {
10 8
11 /** 9 /**
12 - * Performs a read on the database. 10 + * Reads the specified key.
13 - * @param request read request. 11 + * @param tableName name of the table associated with this operation.
14 - * @return ReadResult 12 + * @return key key to read.
15 - * @throws DatabaseException if there is a failure in executing read. 13 + * @returns value (and version) associated with this key. This calls returns null if the key does not exist.
16 */ 14 */
17 - ReadResult read(ReadRequest request); 15 + VersionedValue get(String tableName, String key);
18 - 16 +
19 /** 17 /**
20 - * Performs a batch read operation on the database. 18 + * Associate the key with a value.
21 - * The main advantage of batch read operation is parallelization. 19 + * @param tableName table name in which this key/value resides.
22 - * @param batch batch of read requests to execute. 20 + * @param key key with which the specified value is to be associated
23 - * @return batch read result. 21 + * @param value value to be associated with the specified key
22 + * @return the previous value associated with the specified key, or null if there was no mapping for the key.
24 */ 23 */
25 - List<OptionalResult<ReadResult, DatabaseException>> batchRead(List<ReadRequest> batch); 24 + VersionedValue put(String tableName, String key, byte[] value);
26 - 25 +
27 - // FIXME Give me a better name
28 /** 26 /**
29 - * Performs a write operation on the database. 27 + * If the specified key is not already associated with a value, associate it with the given value.
30 - * @param request write request 28 + * @param tableName table name in which this key/value resides.
31 - * @return write result. 29 + * @param key key with which the specified value is to be associated
32 - * @throws DatabaseException if there is failure in execution write. 30 + * @param value value to be associated with the specified key
31 + * @return true if put was successful, false if there is already a value associated with this key
33 */ 32 */
34 - OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request); 33 + boolean putIfAbsent(String tableName, String key, byte[] value);
35 - 34 +
36 /** 35 /**
37 - * Performs a write operation on the database. 36 + * Sets the key to the specified value if the version in the database (for that key)
38 - * @param request write request 37 + * matches the specified version.
39 - * @return write result. 38 + * @param tableName name of table associated with this operation.
40 - * @throws OptimisticLockException FIXME define conditional failure 39 + * @param key key
41 - * @throws PreconditionFailedException FIXME define conditional failure 40 + * @param value value
42 - * @throws DatabaseException if there is failure in execution write. 41 + * @param version version that should present in the database for the put to be successful.
42 + * @return true if put was successful, false if there version in database is different from what is specified.
43 */ 43 */
44 - WriteResult write(WriteRequest request)/* throws OptimisticLockException, PreconditionFailedException*/; 44 + boolean putIfVersionMatches(String tableName, String key, byte[] value, long version);
45 - 45 +
46 + /**
47 + * Replaces the entry for a key only if currently mapped to a given value.
48 + * @param tableName name of table associated with this operation.
49 + * @param key with which the specified value is associated
50 + * @param oldValue value expected to be associated with the specified key
51 + * @param newValue value to be associated with the specified key
52 + * @return true if put was successful, false if there version in database is different from what is specified.
53 + */
54 + boolean putIfValueMatches(String tableName, String key, byte[] oldValue, byte[] newValue);
55 +
56 + /**
57 + * Removes the key (and associated value).
58 + * @param tableName name of table associated with this operation.
59 + * @param key key to remove
60 + * @return value previously associated with the key. This call returns null if the key does not exist.
61 + */
62 + VersionedValue remove(String tableName, String key);
63 +
64 + /**
65 + * Removes the key (and associated value) if the version in the database matches specified version.
66 + * @param tableName name of table associated with this operation.
67 + * @param key key to remove
68 + * @param version version that should present in the database for the remove to be successful.
69 + * @return true if remove was successful, false if there version in database is different from what is specified.
70 + */
71 + boolean removeIfVersionMatches(String tableName, String key, long version);
72 +
73 + /**
74 + * Removes the key (and associated value) if the value in the database matches specified value.
75 + * @param tableName name of table associated with this operation.
76 + * @param key key to remove
77 + * @param value value that should present in the database for the remove to be successful.
78 + * @return true if remove was successful, false if there value in database is different from what is specified.
79 + */
80 + boolean removeIfValueMatches(String tableName, String key, byte[] value);
81 +
82 + /**
83 + * Performs a batch read operation and returns the results.
84 + * @param batchRequest batch request.
85 + * @return result of the batch operation.
86 + */
87 + BatchReadResult batchRead(BatchReadRequest batchRequest);
88 +
46 /** 89 /**
47 - * Performs a batch write operation on the database. 90 + * Performs a batch write operation and returns the results.
48 - * Batch write provides transactional semantics. Either all operations 91 + * This method provides transactional semantics. Either all writes succeed or none do.
49 - * succeed or none of them do. 92 + * Even a single write failure would cause the entire batch to be aborted.
50 - * @param batch batch of write requests to execute as a transaction. 93 + * In the case of unsuccessful operation, the batch result can be inspected to determine
51 - * @return result of executing the batch write operation. 94 + * which operation(s) caused the batch to fail.
95 + * @param batchRequest batch request.
96 + * @return result of the batch operation.
52 */ 97 */
53 - List<OptionalResult<WriteResult, DatabaseException>> batchWrite(List<WriteRequest> batch); 98 + BatchWriteResult batchWrite(BatchWriteRequest batchRequest);
54 -} 99 +}
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -9,6 +9,12 @@ package org.onlab.onos.store.service; ...@@ -9,6 +9,12 @@ package org.onlab.onos.store.service;
9 */ 9 */
10 public interface Lock { 10 public interface Lock {
11 11
12 + /**
13 + * Returns the path this lock will be used to guard from concurrent access.
14 + * @return path.
15 + */
16 + String path();
17 +
12 /** 18 /**
13 * Acquires the lock. 19 * Acquires the lock.
14 * If the lock is not available then the caller thread becomes 20 * If the lock is not available then the caller thread becomes
...@@ -26,7 +32,7 @@ public interface Lock { ...@@ -26,7 +32,7 @@ public interface Lock {
26 * already been released by invoking unlock(). Must be in the range 32 * already been released by invoking unlock(). Must be in the range
27 * (0, LockManager.MAX_LEASE_MILLIS] 33 * (0, LockManager.MAX_LEASE_MILLIS]
28 */ 34 */
29 - void lock(long leaseDurationMillis); 35 + void lock(int leaseDurationMillis);
30 36
31 /** 37 /**
32 * Acquires the lock only if it is free at the time of invocation. 38 * Acquires the lock only if it is free at the time of invocation.
...@@ -36,7 +42,7 @@ public interface Lock { ...@@ -36,7 +42,7 @@ public interface Lock {
36 * (0, LockManager.MAX_LEASE_MILLIS] 42 * (0, LockManager.MAX_LEASE_MILLIS]
37 * @return true if the lock was acquired and false otherwise 43 * @return true if the lock was acquired and false otherwise
38 */ 44 */
39 - boolean tryLock(long leaseDurationMillis); 45 + boolean tryLock(int leaseDurationMillis);
40 46
41 /** 47 /**
42 * Acquires the lock if it is free within the given waiting 48 * Acquires the lock if it is free within the given waiting
...@@ -49,7 +55,7 @@ public interface Lock { ...@@ -49,7 +55,7 @@ public interface Lock {
49 * @return true if the lock was acquired and false if the waiting time 55 * @return true if the lock was acquired and false if the waiting time
50 * elapsed before the lock was acquired 56 * elapsed before the lock was acquired
51 */ 57 */
52 - boolean tryLock(long waitTimeMillis, long leaseDurationMillis); 58 + boolean tryLock(long waitTimeMillis, int leaseDurationMillis);
53 59
54 /** 60 /**
55 * Returns true if this Lock instance currently holds the lock. 61 * Returns true if this Lock instance currently holds the lock.
...@@ -72,5 +78,5 @@ public interface Lock { ...@@ -72,5 +78,5 @@ public interface Lock {
72 * @return true if successfully extended expiration, false if attempt to 78 * @return true if successfully extended expiration, false if attempt to
73 * extend expiration fails or if the path is currently not locked by this instance. 79 * extend expiration fails or if the path is currently not locked by this instance.
74 */ 80 */
75 - boolean extendExpiration(long leaseDurationMillis); 81 + boolean extendExpiration(int leaseDurationMillis);
76 } 82 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -75,5 +75,4 @@ public class ReadRequest { ...@@ -75,5 +75,4 @@ public class ReadRequest {
75 return Objects.equals(this.key, other.key) && 75 return Objects.equals(this.key, other.key) &&
76 Objects.equals(this.tableName, other.tableName); 76 Objects.equals(this.tableName, other.tableName);
77 } 77 }
78 -
79 } 78 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -11,12 +11,21 @@ public class ReadResult { ...@@ -11,12 +11,21 @@ public class ReadResult {
11 private final String tableName; 11 private final String tableName;
12 private final String key; 12 private final String key;
13 private final VersionedValue value; 13 private final VersionedValue value;
14 + private final ReadStatus status;
14 15
15 - public ReadResult(String tableName, String key, VersionedValue value) { 16 + public ReadResult(ReadStatus status, String tableName, String key, VersionedValue value) {
17 + this.status = status;
16 this.tableName = tableName; 18 this.tableName = tableName;
17 this.key = key; 19 this.key = key;
18 this.value = value; 20 this.value = value;
19 } 21 }
22 +
23 + /**
24 + * Returns the status of the read operation.
25 + */
26 + public ReadStatus status() {
27 + return status;
28 + }
20 29
21 /** 30 /**
22 * Returns database table name. 31 * Returns database table name.
......
1 +package org.onlab.onos.store.service;
2 +
3 +public enum ReadStatus {
4 + OK,
5 + NO_SUCH_TABLE
6 +}
...@@ -112,7 +112,7 @@ public class WriteRequest { ...@@ -112,7 +112,7 @@ public class WriteRequest {
112 * @param previousVersion previous version expected 112 * @param previousVersion previous version expected
113 * @return WriteRequest 113 * @return WriteRequest
114 */ 114 */
115 - public static WriteRequest remove(String tableName, String key, 115 + public static WriteRequest removeIfVersionMatches(String tableName, String key,
116 long previousVersion) { 116 long previousVersion) {
117 return new WriteRequest(REMOVE_IF_VALUE, tableName, key, 117 return new WriteRequest(REMOVE_IF_VALUE, tableName, key,
118 null, previousVersion, null); 118 null, previousVersion, null);
...@@ -127,7 +127,7 @@ public class WriteRequest { ...@@ -127,7 +127,7 @@ public class WriteRequest {
127 * @param oldValue previous value expected, must not be null 127 * @param oldValue previous value expected, must not be null
128 * @return WriteRequest 128 * @return WriteRequest
129 */ 129 */
130 - public static WriteRequest remove(String tableName, String key, 130 + public static WriteRequest removeIfValueMatches(String tableName, String key,
131 byte[] oldValue) { 131 byte[] oldValue) {
132 return new WriteRequest(Type.REMOVE_IF_VALUE, tableName, key, 132 return new WriteRequest(Type.REMOVE_IF_VALUE, tableName, key,
133 null, ANY_VERSION, checkNotNull(oldValue)); 133 null, ANY_VERSION, checkNotNull(oldValue));
......
...@@ -7,34 +7,27 @@ import com.google.common.base.MoreObjects; ...@@ -7,34 +7,27 @@ import com.google.common.base.MoreObjects;
7 * Database write result. 7 * Database write result.
8 */ 8 */
9 public class WriteResult { 9 public class WriteResult {
10 - 10 +
11 - private final String tableName; 11 + private final WriteStatus status;
12 - private final String key;
13 private final VersionedValue previousValue; 12 private final VersionedValue previousValue;
14 - 13 +
15 - public WriteResult(String tableName, String key, VersionedValue previousValue) { 14 + public WriteResult(WriteStatus status, VersionedValue previousValue) {
16 - this.tableName = tableName; 15 + this.status = status;
17 - this.key = key;
18 this.previousValue = previousValue; 16 this.previousValue = previousValue;
19 } 17 }
20 18
21 - public String tableName() {
22 - return tableName;
23 - }
24 -
25 - public String key() {
26 - return key;
27 - }
28 -
29 public VersionedValue previousValue() { 19 public VersionedValue previousValue() {
30 return previousValue; 20 return previousValue;
31 } 21 }
22 +
23 + public WriteStatus status() {
24 + return status;
25 + }
32 26
33 @Override 27 @Override
34 public String toString() { 28 public String toString() {
35 return MoreObjects.toStringHelper(getClass()) 29 return MoreObjects.toStringHelper(getClass())
36 - .add("tableName", tableName) 30 + .add("status", status)
37 - .add("key", key)
38 .add("previousValue", previousValue) 31 .add("previousValue", previousValue)
39 .toString(); 32 .toString();
40 } 33 }
......
1 +package org.onlab.onos.store.service;
2 +
3 +public enum WriteStatus {
4 + OK,
5 + ABORTED,
6 + PRECONDITION_VIOLATION,
7 + NO_SUCH_TABLE,
8 +}
...@@ -103,6 +103,12 @@ ...@@ -103,6 +103,12 @@
103 <artifactId>hazelcast</artifactId> 103 <artifactId>hazelcast</artifactId>
104 </dependency> 104 </dependency>
105 105
106 + <dependency>
107 + <groupId>net.jodah</groupId>
108 + <artifactId>expiringmap</artifactId>
109 + <version>0.3.1</version>
110 + </dependency>
111 +
106 <!-- for shaded copycat --> 112 <!-- for shaded copycat -->
107 <dependency> 113 <dependency>
108 <groupId>org.onlab.onos</groupId> 114 <groupId>org.onlab.onos</groupId>
......
...@@ -8,9 +8,11 @@ import java.util.concurrent.ExecutionException; ...@@ -8,9 +8,11 @@ import java.util.concurrent.ExecutionException;
8 8
9 import net.kuujo.copycat.Copycat; 9 import net.kuujo.copycat.Copycat;
10 10
11 +import org.onlab.onos.store.service.BatchReadRequest;
12 +import org.onlab.onos.store.service.BatchWriteRequest;
11 import org.onlab.onos.store.service.DatabaseException; 13 import org.onlab.onos.store.service.DatabaseException;
12 -import org.onlab.onos.store.service.ReadRequest; 14 +import org.onlab.onos.store.service.ReadResult;
13 -import org.onlab.onos.store.service.WriteRequest; 15 +import org.onlab.onos.store.service.WriteResult;
14 16
15 /** 17 /**
16 * Client for interacting with the Copycat Raft cluster. 18 * Client for interacting with the Copycat Raft cluster.
...@@ -63,9 +65,9 @@ public class DatabaseClient { ...@@ -63,9 +65,9 @@ public class DatabaseClient {
63 } 65 }
64 } 66 }
65 67
66 - public List<InternalReadResult> batchRead(List<ReadRequest> requests) { 68 + public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
67 69
68 - CompletableFuture<List<InternalReadResult>> future = copycat.submit("read", requests); 70 + CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
69 try { 71 try {
70 return future.get(); 72 return future.get();
71 } catch (InterruptedException | ExecutionException e) { 73 } catch (InterruptedException | ExecutionException e) {
...@@ -73,9 +75,9 @@ public class DatabaseClient { ...@@ -73,9 +75,9 @@ public class DatabaseClient {
73 } 75 }
74 } 76 }
75 77
76 - public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) { 78 + public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
77 79
78 - CompletableFuture<List<InternalWriteResult>> future = copycat.submit("write", requests); 80 + CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
79 try { 81 try {
80 return future.get(); 82 return future.get();
81 } catch (InterruptedException | ExecutionException e) { 83 } catch (InterruptedException | ExecutionException e) {
......
...@@ -4,8 +4,6 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -4,8 +4,6 @@ import static org.slf4j.LoggerFactory.getLogger;
4 4
5 import java.io.File; 5 import java.io.File;
6 import java.io.IOException; 6 import java.io.IOException;
7 -import java.util.ArrayList;
8 -import java.util.Arrays;
9 import java.util.Collection; 7 import java.util.Collection;
10 import java.util.Collections; 8 import java.util.Collections;
11 import java.util.HashSet; 9 import java.util.HashSet;
...@@ -22,7 +20,6 @@ import net.kuujo.copycat.cluster.Member; ...@@ -22,7 +20,6 @@ import net.kuujo.copycat.cluster.Member;
22 import net.kuujo.copycat.cluster.TcpCluster; 20 import net.kuujo.copycat.cluster.TcpCluster;
23 import net.kuujo.copycat.cluster.TcpClusterConfig; 21 import net.kuujo.copycat.cluster.TcpClusterConfig;
24 import net.kuujo.copycat.cluster.TcpMember; 22 import net.kuujo.copycat.cluster.TcpMember;
25 -import net.kuujo.copycat.log.InMemoryLog;
26 import net.kuujo.copycat.log.Log; 23 import net.kuujo.copycat.log.Log;
27 24
28 import org.apache.felix.scr.annotations.Activate; 25 import org.apache.felix.scr.annotations.Activate;
...@@ -37,17 +34,18 @@ import org.onlab.onos.cluster.ClusterService; ...@@ -37,17 +34,18 @@ import org.onlab.onos.cluster.ClusterService;
37 import org.onlab.onos.cluster.ControllerNode; 34 import org.onlab.onos.cluster.ControllerNode;
38 import org.onlab.onos.cluster.DefaultControllerNode; 35 import org.onlab.onos.cluster.DefaultControllerNode;
39 import org.onlab.onos.cluster.NodeId; 36 import org.onlab.onos.cluster.NodeId;
37 +import org.onlab.onos.store.service.BatchReadRequest;
38 +import org.onlab.onos.store.service.BatchReadResult;
39 +import org.onlab.onos.store.service.BatchWriteRequest;
40 +import org.onlab.onos.store.service.BatchWriteResult;
40 import org.onlab.onos.store.service.DatabaseAdminService; 41 import org.onlab.onos.store.service.DatabaseAdminService;
41 import org.onlab.onos.store.service.DatabaseException; 42 import org.onlab.onos.store.service.DatabaseException;
42 import org.onlab.onos.store.service.DatabaseService; 43 import org.onlab.onos.store.service.DatabaseService;
43 -import org.onlab.onos.store.service.NoSuchTableException;
44 -import org.onlab.onos.store.service.OptimisticLockException;
45 -import org.onlab.onos.store.service.OptionalResult;
46 -import org.onlab.onos.store.service.ReadRequest;
47 import org.onlab.onos.store.service.ReadResult; 44 import org.onlab.onos.store.service.ReadResult;
48 -import org.onlab.onos.store.service.WriteAborted; 45 +import org.onlab.onos.store.service.ReadStatus;
49 -import org.onlab.onos.store.service.WriteRequest; 46 +import org.onlab.onos.store.service.VersionedValue;
50 import org.onlab.onos.store.service.WriteResult; 47 import org.onlab.onos.store.service.WriteResult;
48 +import org.onlab.onos.store.service.WriteStatus;
51 import org.onlab.packet.IpAddress; 49 import org.onlab.packet.IpAddress;
52 import org.slf4j.Logger; 50 import org.slf4j.Logger;
53 51
...@@ -199,66 +197,121 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -199,66 +197,121 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
199 } 197 }
200 198
201 @Override 199 @Override
202 - public ReadResult read(ReadRequest request) { 200 + public VersionedValue get(String tableName, String key) {
203 - return batchRead(Arrays.asList(request)).get(0).get(); 201 + BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
202 + ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
203 + if (readResult.status().equals(ReadStatus.OK)) {
204 + return readResult.value();
205 + }
206 + throw new DatabaseException("get failed due to status: " + readResult.status());
204 } 207 }
205 208
206 @Override 209 @Override
207 - public List<OptionalResult<ReadResult, DatabaseException>> batchRead( 210 + public BatchReadResult batchRead(BatchReadRequest batchRequest) {
208 - List<ReadRequest> batch) { 211 + return new BatchReadResult(client.batchRead(batchRequest));
209 - List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size()); 212 + }
210 - for (InternalReadResult internalReadResult : client.batchRead(batch)) { 213 +
211 - if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) { 214 + @Override
212 - readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>( 215 + public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
213 - new NoSuchTableException())); 216 + return new BatchWriteResult(client.batchWrite(batchRequest));
214 - } else { 217 + }
215 - readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>( 218 +
216 - internalReadResult.result())); 219 + @Override
217 - } 220 + public VersionedValue put(String tableName, String key, byte[] value) {
221 + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
222 + WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
223 + if (writeResult.status().equals(WriteStatus.OK)) {
224 + return writeResult.previousValue();
218 } 225 }
219 - return readResults; 226 + throw new DatabaseException("put failed due to status: " + writeResult.status());
220 } 227 }
221 228
222 @Override 229 @Override
223 - public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) { 230 + public boolean putIfAbsent(String tableName, String key, byte[] value) {
224 - return batchWrite(Arrays.asList(request)).get(0); 231 + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfAbsent(tableName, key, value).build();
232 + WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
233 + if (writeResult.status().equals(WriteStatus.OK)) {
234 + return true;
235 + } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
236 + return false;
237 + }
238 + throw new DatabaseException("putIfAbsent failed due to status: " + writeResult.status());
225 } 239 }
226 240
227 @Override 241 @Override
228 - public WriteResult write(WriteRequest request) { 242 + public boolean putIfVersionMatches(String tableName, String key,
229 -// throws OptimisticLockException, PreconditionFailedException { 243 + byte[] value, long version) {
230 - return writeNothrow(request).get(); 244 + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfVersionMatches(tableName, key, value, version).build();
245 + WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
246 + if (writeResult.status().equals(WriteStatus.OK)) {
247 + return true;
248 + } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
249 + return false;
250 + }
251 + throw new DatabaseException("putIfVersionMatches failed due to status: " + writeResult.status());
231 } 252 }
232 253
233 @Override 254 @Override
234 - public List<OptionalResult<WriteResult, DatabaseException>> batchWrite( 255 + public boolean putIfValueMatches(String tableName, String key,
235 - List<WriteRequest> batch) { 256 + byte[] oldValue, byte[] newValue) {
236 - List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size()); 257 + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfValueMatches(tableName, key, oldValue, newValue).build();
237 - for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) { 258 + WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
238 - if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) { 259 + if (writeResult.status().equals(WriteStatus.OK)) {
239 - writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>( 260 + return true;
240 - new NoSuchTableException())); 261 + } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
241 - } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) { 262 + return false;
242 - writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
243 - new OptimisticLockException()));
244 - } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
245 - // TODO: throw a different exception?
246 - writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
247 - new OptimisticLockException()));
248 - } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
249 - writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
250 - new WriteAborted()));
251 - } else {
252 - writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
253 - internalWriteResult.result()));
254 - }
255 } 263 }
256 - return writeResults; 264 + throw new DatabaseException("putIfValueMatches failed due to status: " + writeResult.status());
265 + }
257 266
267 + @Override
268 + public VersionedValue remove(String tableName, String key) {
269 + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().remove(tableName, key).build();
270 + WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
271 + if (writeResult.status().equals(WriteStatus.OK)) {
272 + return writeResult.previousValue();
273 + }
274 + throw new DatabaseException("remove failed due to status: " + writeResult.status());
275 + }
276 +
277 + @Override
278 + public boolean removeIfVersionMatches(String tableName, String key,
279 + long version) {
280 + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().removeIfVersionMatches(tableName, key, version).build();
281 + WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
282 + if (writeResult.status().equals(WriteStatus.OK)) {
283 + return true;
284 + } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
285 + return false;
286 + }
287 + throw new DatabaseException("removeIfVersionMatches failed due to status: " + writeResult.status());
288 + }
289 +
290 + @Override
291 + public boolean removeIfValueMatches(String tableName, String key,
292 + byte[] value) {
293 + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().removeIfValueMatches(tableName, key, value).build();
294 + WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
295 + if (writeResult.status().equals(WriteStatus.OK)) {
296 + return true;
297 + } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
298 + return false;
299 + }
300 + throw new DatabaseException("removeIfValueMatches failed due to status: " + writeResult.status());
301 + }
302 +
303 + @Override
304 + public void addMember(final ControllerNode node) {
305 + final TcpMember tcpMember = new TcpMember(node.ip().toString(),
306 + node.tcpPort());
307 + log.info("{} was added to the cluster", tcpMember);
308 + synchronized (clusterConfig) {
309 + clusterConfig.addRemoteMember(tcpMember);
310 + }
258 } 311 }
259 312
260 private final class InternalClusterEventListener 313 private final class InternalClusterEventListener
261 - implements ClusterEventListener { 314 + implements ClusterEventListener {
262 315
263 @Override 316 @Override
264 public void event(ClusterEvent event) { 317 public void event(ClusterEvent event) {
...@@ -266,7 +319,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -266,7 +319,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
266 319
267 final ControllerNode node = event.subject(); 320 final ControllerNode node = event.subject();
268 final TcpMember tcpMember = new TcpMember(node.ip().toString(), 321 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
269 - node.tcpPort()); 322 + node.tcpPort());
270 323
271 switch (event.type()) { 324 switch (event.type()) {
272 case INSTANCE_ACTIVATED: 325 case INSTANCE_ACTIVATED:
...@@ -284,8 +337,8 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -284,8 +337,8 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
284 case INSTANCE_REMOVED: 337 case INSTANCE_REMOVED:
285 if (autoAddMember) { 338 if (autoAddMember) {
286 Set<DefaultControllerNode> members 339 Set<DefaultControllerNode> members
287 - = tabletMembers.getOrDefault(DEFAULT_TABLET, 340 + = tabletMembers.getOrDefault(DEFAULT_TABLET,
288 - Collections.emptySet()); 341 + Collections.emptySet());
289 // remove only if not the initial members 342 // remove only if not the initial members
290 if (!members.contains(node)) { 343 if (!members.contains(node)) {
291 synchronized (clusterConfig) { 344 synchronized (clusterConfig) {
...@@ -308,63 +361,6 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -308,63 +361,6 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
308 361
309 } 362 }
310 363
311 - public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
312 - public KryoRegisteredInMemoryLog() {
313 - super();
314 - // required to deserialize object across bundles
315 - super.kryo.register(TcpMember.class, new TcpMemberSerializer());
316 - super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
317 - }
318 - }
319 -
320 - private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
321 -
322 - private final R result;
323 - private final DatabaseException exception;
324 -
325 - public DatabaseOperationResult(R result) {
326 - this.result = result;
327 - this.exception = null;
328 - }
329 -
330 - public DatabaseOperationResult(DatabaseException exception) {
331 - this.result = null;
332 - this.exception = exception;
333 - }
334 -
335 - @Override
336 - public R get() {
337 - if (result != null) {
338 - return result;
339 - }
340 - throw exception;
341 - }
342 -
343 - @Override
344 - public boolean hasValidResult() {
345 - return result != null;
346 - }
347 -
348 - @Override
349 - public String toString() {
350 - if (result != null) {
351 - return result.toString();
352 - } else {
353 - return exception.toString();
354 - }
355 - }
356 - }
357 -
358 - @Override
359 - public void addMember(final ControllerNode node) {
360 - final TcpMember tcpMember = new TcpMember(node.ip().toString(),
361 - node.tcpPort());
362 - log.info("{} was added to the cluster", tcpMember);
363 - synchronized (clusterConfig) {
364 - clusterConfig.addRemoteMember(tcpMember);
365 - }
366 - }
367 -
368 @Override 364 @Override
369 public void removeMember(final ControllerNode node) { 365 public void removeMember(final ControllerNode node) {
370 final TcpMember tcpMember = new TcpMember(node.ip().toString(), 366 final TcpMember tcpMember = new TcpMember(node.ip().toString(),
......
...@@ -8,6 +8,7 @@ import java.util.ArrayList; ...@@ -8,6 +8,7 @@ import java.util.ArrayList;
8 import java.util.Arrays; 8 import java.util.Arrays;
9 import java.util.List; 9 import java.util.List;
10 import java.util.Map; 10 import java.util.Map;
11 +import java.util.Set;
11 import java.util.zip.DeflaterOutputStream; 12 import java.util.zip.DeflaterOutputStream;
12 import java.util.zip.InflaterInputStream; 13 import java.util.zip.InflaterInputStream;
13 14
...@@ -15,16 +16,21 @@ import net.kuujo.copycat.Command; ...@@ -15,16 +16,21 @@ import net.kuujo.copycat.Command;
15 import net.kuujo.copycat.Query; 16 import net.kuujo.copycat.Query;
16 import net.kuujo.copycat.StateMachine; 17 import net.kuujo.copycat.StateMachine;
17 18
19 +import org.onlab.onos.store.cluster.messaging.MessageSubject;
18 import org.onlab.onos.store.serializers.KryoSerializer; 20 import org.onlab.onos.store.serializers.KryoSerializer;
21 +import org.onlab.onos.store.service.BatchReadRequest;
22 +import org.onlab.onos.store.service.BatchWriteRequest;
19 import org.onlab.onos.store.service.ReadRequest; 23 import org.onlab.onos.store.service.ReadRequest;
20 import org.onlab.onos.store.service.ReadResult; 24 import org.onlab.onos.store.service.ReadResult;
25 +import org.onlab.onos.store.service.ReadStatus;
21 import org.onlab.onos.store.service.VersionedValue; 26 import org.onlab.onos.store.service.VersionedValue;
22 import org.onlab.onos.store.service.WriteRequest; 27 import org.onlab.onos.store.service.WriteRequest;
23 import org.onlab.onos.store.service.WriteResult; 28 import org.onlab.onos.store.service.WriteResult;
24 -import org.onlab.onos.store.service.impl.InternalWriteResult.Status; 29 +import org.onlab.onos.store.service.WriteStatus;
25 import org.onlab.util.KryoNamespace; 30 import org.onlab.util.KryoNamespace;
26 import org.slf4j.Logger; 31 import org.slf4j.Logger;
27 32
33 +import com.beust.jcommander.internal.Lists;
28 import com.google.common.collect.ImmutableList; 34 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.Maps; 35 import com.google.common.collect.Maps;
30 import com.google.common.io.ByteStreams; 36 import com.google.common.io.ByteStreams;
...@@ -40,6 +46,10 @@ public class DatabaseStateMachine implements StateMachine { ...@@ -40,6 +46,10 @@ public class DatabaseStateMachine implements StateMachine {
40 46
41 private final Logger log = getLogger(getClass()); 47 private final Logger log = getLogger(getClass());
42 48
49 + // message subject for database update notifications.
50 + public static MessageSubject DATABASE_UPDATE_EVENTS =
51 + new MessageSubject("database-update-events");
52 +
43 // serializer used for snapshot 53 // serializer used for snapshot
44 public static final KryoSerializer SERIALIZER = new KryoSerializer() { 54 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
45 @Override 55 @Override
...@@ -47,29 +57,72 @@ public class DatabaseStateMachine implements StateMachine { ...@@ -47,29 +57,72 @@ public class DatabaseStateMachine implements StateMachine {
47 serializerPool = KryoNamespace.newBuilder() 57 serializerPool = KryoNamespace.newBuilder()
48 .register(VersionedValue.class) 58 .register(VersionedValue.class)
49 .register(State.class) 59 .register(State.class)
60 + .register(BatchReadRequest.class)
61 + .register(BatchWriteRequest.class)
62 + .register(ReadStatus.class)
63 + .register(WriteStatus.class)
64 + // TODO: Move this out ?
65 + .register(TableModificationEvent.class)
50 .register(ClusterMessagingProtocol.COMMON) 66 .register(ClusterMessagingProtocol.COMMON)
51 .build() 67 .build()
52 .populate(1); 68 .populate(1);
53 } 69 }
54 }; 70 };
55 71
72 + private final List<DatabaseUpdateEventListener> listeners = Lists.newLinkedList();
73 +
74 + // durable internal state of the database.
56 private State state = new State(); 75 private State state = new State();
57 76
58 private boolean compressSnapshot = false; 77 private boolean compressSnapshot = false;
59 78
60 @Command 79 @Command
61 public boolean createTable(String tableName) { 80 public boolean createTable(String tableName) {
62 - return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null; 81 + Map<String, VersionedValue> existingTable =
82 + state.getTables().putIfAbsent(tableName, Maps.newHashMap());
83 + if (existingTable == null) {
84 + for (DatabaseUpdateEventListener listener : listeners) {
85 + listener.tableCreated(tableName, Integer.MAX_VALUE);
86 + }
87 + return true;
88 + }
89 + return false;
90 + }
91 +
92 + @Command
93 + public boolean createTable(String tableName, int expirationTimeMillis) {
94 + Map<String, VersionedValue> existingTable =
95 + state.getTables().putIfAbsent(tableName, Maps.newHashMap());
96 + if (existingTable == null) {
97 + for (DatabaseUpdateEventListener listener : listeners) {
98 + listener.tableCreated(tableName, expirationTimeMillis);
99 + }
100 + return true;
101 + }
102 + return false;
63 } 103 }
64 104
65 @Command 105 @Command
66 public boolean dropTable(String tableName) { 106 public boolean dropTable(String tableName) {
67 - return state.getTables().remove(tableName) != null; 107 + Map<String, VersionedValue> table = state.getTables().remove(tableName);
108 + if (table != null) {
109 + for (DatabaseUpdateEventListener listener : listeners) {
110 + listener.tableDeleted(tableName);
111 + }
112 + return true;
113 + }
114 + return false;
68 } 115 }
69 116
70 @Command 117 @Command
71 public boolean dropAllTables() { 118 public boolean dropAllTables() {
119 + Set<String> tableNames = state.getTables().keySet();
72 state.getTables().clear(); 120 state.getTables().clear();
121 + for (DatabaseUpdateEventListener listener : listeners) {
122 + for (String tableName : tableNames) {
123 + listener.tableDeleted(tableName);
124 + }
125 + }
73 return true; 126 return true;
74 } 127 }
75 128
...@@ -79,96 +132,95 @@ public class DatabaseStateMachine implements StateMachine { ...@@ -79,96 +132,95 @@ public class DatabaseStateMachine implements StateMachine {
79 } 132 }
80 133
81 @Query 134 @Query
82 - public List<InternalReadResult> read(List<ReadRequest> requests) { 135 + public List<ReadResult> read(BatchReadRequest batchRequest) {
83 - List<InternalReadResult> results = new ArrayList<>(requests.size()); 136 + List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
84 - for (ReadRequest request : requests) { 137 + for (ReadRequest request : batchRequest.getAsList()) {
85 Map<String, VersionedValue> table = state.getTables().get(request.tableName()); 138 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
86 if (table == null) { 139 if (table == null) {
87 - results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null)); 140 + results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
88 continue; 141 continue;
89 } 142 }
90 VersionedValue value = VersionedValue.copy(table.get(request.key())); 143 VersionedValue value = VersionedValue.copy(table.get(request.key()));
91 - results.add(new InternalReadResult( 144 + results.add(new ReadResult(ReadStatus.OK, request.tableName(), request.key(), value));
92 - InternalReadResult.Status.OK,
93 - new ReadResult(
94 - request.tableName(),
95 - request.key(),
96 - value)));
97 } 145 }
98 return results; 146 return results;
99 } 147 }
100 148
101 - InternalWriteResult.Status checkIfApplicable(WriteRequest request, 149 + WriteStatus checkIfApplicable(WriteRequest request,
102 - VersionedValue value) { 150 + VersionedValue value) {
103 151
104 switch (request.type()) { 152 switch (request.type()) {
105 case PUT: 153 case PUT:
106 - return InternalWriteResult.Status.OK; 154 + return WriteStatus.OK;
107 155
108 case PUT_IF_ABSENT: 156 case PUT_IF_ABSENT:
109 if (value == null) { 157 if (value == null) {
110 - return InternalWriteResult.Status.OK; 158 + return WriteStatus.OK;
111 } 159 }
112 - return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH; 160 + return WriteStatus.PRECONDITION_VIOLATION;
113 case PUT_IF_VALUE: 161 case PUT_IF_VALUE:
114 case REMOVE_IF_VALUE: 162 case REMOVE_IF_VALUE:
115 if (value != null && Arrays.equals(value.value(), request.oldValue())) { 163 if (value != null && Arrays.equals(value.value(), request.oldValue())) {
116 - return InternalWriteResult.Status.OK; 164 + return WriteStatus.OK;
117 } 165 }
118 - return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH; 166 + return WriteStatus.PRECONDITION_VIOLATION;
119 case PUT_IF_VERSION: 167 case PUT_IF_VERSION:
120 case REMOVE_IF_VERSION: 168 case REMOVE_IF_VERSION:
121 if (value != null && request.previousVersion() == value.version()) { 169 if (value != null && request.previousVersion() == value.version()) {
122 - return InternalWriteResult.Status.OK; 170 + return WriteStatus.OK;
123 } 171 }
124 - return InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH; 172 + return WriteStatus.PRECONDITION_VIOLATION;
125 case REMOVE: 173 case REMOVE:
126 - return InternalWriteResult.Status.OK; 174 + return WriteStatus.OK;
127 default: 175 default:
128 break; 176 break;
129 } 177 }
130 log.error("Should never reach here {}", request); 178 log.error("Should never reach here {}", request);
131 - return InternalWriteResult.Status.ABORTED; 179 + return WriteStatus.ABORTED;
132 } 180 }
133 181
134 @Command 182 @Command
135 - public List<InternalWriteResult> write(List<WriteRequest> requests) { 183 + public List<WriteResult> write(BatchWriteRequest batchRequest) {
136 184
137 // applicability check 185 // applicability check
138 boolean abort = false; 186 boolean abort = false;
139 - List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size()); 187 + List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
140 - for (WriteRequest request : requests) { 188 + for (WriteRequest request : batchRequest.getAsList()) {
141 Map<String, VersionedValue> table = state.getTables().get(request.tableName()); 189 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
142 if (table == null) { 190 if (table == null) {
143 - validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE); 191 + validationResults.add(WriteStatus.NO_SUCH_TABLE);
144 abort = true; 192 abort = true;
145 continue; 193 continue;
146 } 194 }
147 final VersionedValue value = table.get(request.key()); 195 final VersionedValue value = table.get(request.key());
148 - Status result = checkIfApplicable(request, value); 196 + WriteStatus result = checkIfApplicable(request, value);
149 validationResults.add(result); 197 validationResults.add(result);
150 - if (result != Status.OK) { 198 + if (result != WriteStatus.OK) {
151 abort = true; 199 abort = true;
152 } 200 }
153 } 201 }
154 202
155 - List<InternalWriteResult> results = new ArrayList<>(requests.size()); 203 + List<WriteResult> results = new ArrayList<>(batchRequest.batchSize());
156 204
157 if (abort) { 205 if (abort) {
158 - for (InternalWriteResult.Status validationResult : validationResults) { 206 + for (WriteStatus validationResult : validationResults) {
159 - if (validationResult == InternalWriteResult.Status.OK) { 207 + if (validationResult == WriteStatus.OK) {
160 // aborted due to applicability check failure on other request 208 // aborted due to applicability check failure on other request
161 - results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null)); 209 + results.add(new WriteResult(WriteStatus.ABORTED, null));
162 } else { 210 } else {
163 - results.add(new InternalWriteResult(validationResult, null)); 211 + results.add(new WriteResult(validationResult, null));
164 } 212 }
165 } 213 }
166 return results; 214 return results;
167 } 215 }
168 216
217 + List<TableModificationEvent> tableModificationEvents = Lists.newLinkedList();
218 +
169 // apply changes 219 // apply changes
170 - for (WriteRequest request : requests) { 220 + for (WriteRequest request : batchRequest.getAsList()) {
171 Map<String, VersionedValue> table = state.getTables().get(request.tableName()); 221 Map<String, VersionedValue> table = state.getTables().get(request.tableName());
222 +
223 + TableModificationEvent tableModificationEvent = null;
172 // FIXME: If this method could be called by multiple thread, 224 // FIXME: If this method could be called by multiple thread,
173 // synchronization scope is wrong. 225 // synchronization scope is wrong.
174 // Whole function including applicability check needs to be protected. 226 // Whole function including applicability check needs to be protected.
...@@ -182,16 +234,23 @@ public class DatabaseStateMachine implements StateMachine { ...@@ -182,16 +234,23 @@ public class DatabaseStateMachine implements StateMachine {
182 case PUT_IF_VERSION: 234 case PUT_IF_VERSION:
183 VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion()); 235 VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
184 VersionedValue previousValue = table.put(request.key(), newValue); 236 VersionedValue previousValue = table.put(request.key(), newValue);
185 - WriteResult putResult = new WriteResult(request.tableName(), request.key(), previousValue); 237 + WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
186 - results.add(InternalWriteResult.ok(putResult)); 238 + results.add(putResult);
239 + tableModificationEvent = (previousValue == null) ?
240 + TableModificationEvent.rowAdded(request.tableName(), request.key()) :
241 + TableModificationEvent.rowUpdated(request.tableName(), request.key());
187 break; 242 break;
188 243
189 case REMOVE: 244 case REMOVE:
190 case REMOVE_IF_VALUE: 245 case REMOVE_IF_VALUE:
191 case REMOVE_IF_VERSION: 246 case REMOVE_IF_VERSION:
192 VersionedValue removedValue = table.remove(request.key()); 247 VersionedValue removedValue = table.remove(request.key());
193 - WriteResult removeResult = new WriteResult(request.tableName(), request.key(), removedValue); 248 + WriteResult removeResult = new WriteResult(WriteStatus.OK, removedValue);
194 - results.add(InternalWriteResult.ok(removeResult)); 249 + results.add(removeResult);
250 + if (removedValue != null) {
251 + tableModificationEvent =
252 + TableModificationEvent.rowDeleted(request.tableName(), request.key());
253 + }
195 break; 254 break;
196 255
197 default: 256 default:
...@@ -199,7 +258,19 @@ public class DatabaseStateMachine implements StateMachine { ...@@ -199,7 +258,19 @@ public class DatabaseStateMachine implements StateMachine {
199 break; 258 break;
200 } 259 }
201 } 260 }
261 +
262 + if (tableModificationEvent != null) {
263 + tableModificationEvents.add(tableModificationEvent);
264 + }
202 } 265 }
266 +
267 + // notify listeners of table mod events.
268 + for (DatabaseUpdateEventListener listener : listeners) {
269 + for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
270 + listener.tableModified(tableModificationEvent);
271 + }
272 + }
273 +
203 return results; 274 return results;
204 } 275 }
205 276
...@@ -253,4 +324,8 @@ public class DatabaseStateMachine implements StateMachine { ...@@ -253,4 +324,8 @@ public class DatabaseStateMachine implements StateMachine {
253 throw new SnapshotException(e); 324 throw new SnapshotException(e);
254 } 325 }
255 } 326 }
327 +
328 + public void addEventListener(DatabaseUpdateEventListener listener) {
329 + listeners.add(listener);
330 + }
256 } 331 }
......
1 +/*
2 + * Copyright 2014 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onlab.onos.store.service.impl;
18 +
19 +import java.io.IOException;
20 +import java.util.HashMap;
21 +import java.util.Map;
22 +import java.util.Objects;
23 +import java.util.concurrent.TimeUnit;
24 +import java.util.concurrent.atomic.AtomicBoolean;
25 +
26 +import net.jodah.expiringmap.ExpiringMap;
27 +import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
28 +import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
29 +import net.kuujo.copycat.cluster.Member;
30 +import net.kuujo.copycat.event.EventHandler;
31 +import net.kuujo.copycat.event.LeaderElectEvent;
32 +
33 +import org.onlab.onos.cluster.ClusterService;
34 +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
35 +import org.onlab.onos.store.cluster.messaging.ClusterMessage;
36 +import org.onlab.onos.store.cluster.messaging.MessageSubject;
37 +import org.onlab.onos.store.service.DatabaseService;
38 +import org.slf4j.Logger;
39 +import org.slf4j.LoggerFactory;
40 +
41 +public class DatabaseUpdateEventHandler implements DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
42 +
43 + private final Logger log = LoggerFactory.getLogger(getClass());
44 +
45 + public final static MessageSubject DATABASE_UPDATES = new MessageSubject("database-update-event");
46 +
47 + private DatabaseService databaseService;
48 + private ClusterService cluster;
49 + private ClusterCommunicationService clusterCommunicator;
50 +
51 + private final Member localMember;
52 + private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
53 + private final Map<String, Map<DatabaseRow, Void>> tableEntryExpirationMap = new HashMap<>();
54 + private final ExpirationListener<DatabaseRow, Void> expirationObserver = new ExpirationObserver();
55 +
56 + DatabaseUpdateEventHandler(Member localMember) {
57 + this.localMember = localMember;
58 + }
59 +
60 + @Override
61 + public void tableModified(TableModificationEvent event) {
62 + DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
63 + Map<DatabaseRow, Void> map = tableEntryExpirationMap.get(event.tableName());
64 +
65 + switch (event.type()) {
66 + case ROW_DELETED:
67 + if (isLocalMemberLeader.get()) {
68 + try {
69 + clusterCommunicator.broadcast(
70 + new ClusterMessage(
71 + cluster.getLocalNode().id(),
72 + DATABASE_UPDATES,
73 + DatabaseStateMachine.SERIALIZER.encode(event)));
74 + } catch (IOException e) {
75 + log.error("Failed to broadcast a database table modification event.", e);
76 + }
77 + }
78 + break;
79 + case ROW_ADDED:
80 + case ROW_UPDATED:
81 + map.put(row, null);
82 + break;
83 + default:
84 + break;
85 + }
86 + }
87 +
88 + @Override
89 + public void tableCreated(String tableName, int expirationTimeMillis) {
90 + // make this explicit instead of relying on a negative value
91 + // to indicate no expiration.
92 + if (expirationTimeMillis > 0) {
93 + tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
94 + .expiration(expirationTimeMillis, TimeUnit.SECONDS)
95 + .expirationListener(expirationObserver)
96 + // FIXME: make the expiration policy configurable.
97 + .expirationPolicy(ExpirationPolicy.CREATED)
98 + .build());
99 + }
100 + }
101 +
102 + @Override
103 + public void tableDeleted(String tableName) {
104 + tableEntryExpirationMap.remove(tableName);
105 + }
106 +
107 + private class ExpirationObserver implements ExpirationListener<DatabaseRow, Void> {
108 + @Override
109 + public void expired(DatabaseRow key, Void value) {
110 + try {
111 + // TODO: The safety of this check needs to be verified.
112 + // Couple of issues:
113 + // 1. It is very likely that only one member should attempt deletion of the entry from database.
114 + // 2. A potential race condition exists where the entry expires, but before its can be deleted
115 + // from the database, a new entry is added or existing entry is updated.
116 + // That means ttl and expiration should be for a given version.
117 + if (isLocalMemberLeader.get()) {
118 + databaseService.remove(key.tableName, key.key);
119 + }
120 + } catch (Exception e) {
121 + log.warn("Failed to delete entry from the database after ttl expiration. Will retry eviction", e);
122 + tableEntryExpirationMap.get(key.tableName).put(new DatabaseRow(key.tableName, key.key), null);
123 + }
124 + }
125 + }
126 +
127 + @Override
128 + public void handle(LeaderElectEvent event) {
129 + if (localMember.equals(event.leader())) {
130 + isLocalMemberLeader.set(true);
131 + }
132 + }
133 +
134 + private class DatabaseRow {
135 +
136 + String tableName;
137 + String key;
138 +
139 + public DatabaseRow(String tableName, String key) {
140 + this.tableName = tableName;
141 + this.key = key;
142 + }
143 +
144 + @Override
145 + public boolean equals(Object obj) {
146 + if (this == obj) {
147 + return true;
148 + }
149 + if (!(obj instanceof DatabaseRow)) {
150 + return false;
151 + }
152 + DatabaseRow that = (DatabaseRow) obj;
153 +
154 + return Objects.equals(this.tableName, that.tableName) &&
155 + Objects.equals(this.key, that.key);
156 + }
157 +
158 + @Override
159 + public int hashCode() {
160 + return Objects.hash(tableName, key);
161 + }
162 + }
163 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2014 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onlab.onos.store.service.impl;
18 +
19 +public interface DatabaseUpdateEventListener {
20 +
21 + /**
22 + *
23 + * @param event
24 + */
25 + public void tableModified(TableModificationEvent event);
26 +
27 + /**
28 + *
29 + * @param tableName
30 + * @param expirationTimeMillis
31 + */
32 + public void tableCreated(String tableName, int expirationTimeMillis);
33 +
34 + /**
35 + *
36 + * @param tableName
37 + */
38 + public void tableDeleted(String tableName);
39 +
40 +}
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onlab.onos.store.service.impl;
2 +
3 +import java.util.UUID;
4 +import java.util.concurrent.CompletableFuture;
5 +import java.util.concurrent.ExecutionException;
6 +import java.util.concurrent.TimeUnit;
7 +import java.util.concurrent.TimeoutException;
8 +import java.util.concurrent.atomic.AtomicBoolean;
9 +
10 +import org.joda.time.DateTime;
11 +import org.onlab.onos.cluster.ClusterService;
12 +import org.onlab.onos.store.service.DatabaseService;
13 +import org.onlab.onos.store.service.Lock;
14 +import org.onlab.onos.store.service.OptimisticLockException;
15 +
16 +/**
17 + * A distributed lock implementation.
18 + */
19 +public class DistributedLock implements Lock {
20 +
21 + private final DistributedLockManager lockManager;
22 + private final DatabaseService databaseService;
23 + private final String path;
24 + private DateTime lockExpirationTime;
25 + private AtomicBoolean isLocked = new AtomicBoolean(false);
26 + private byte[] lockId;
27 +
28 + public DistributedLock(
29 + String path,
30 + DatabaseService databaseService,
31 + ClusterService clusterService,
32 + DistributedLockManager lockManager) {
33 +
34 + this.path = path;
35 + this.databaseService = databaseService;
36 + this.lockManager = lockManager;
37 + this.lockId =
38 + (UUID.randomUUID().toString() + "::" + clusterService.getLocalNode().id().toString()).getBytes();
39 + }
40 +
41 + @Override
42 + public String path() {
43 + return path;
44 + }
45 +
46 + @Override
47 + public void lock(int leaseDurationMillis) {
48 +
49 + if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
50 + // Nothing to do.
51 + // Current expiration time is beyond what is requested.
52 + return;
53 + } else {
54 + tryLock(Long.MAX_VALUE, leaseDurationMillis);
55 + }
56 + }
57 +
58 + @Override
59 + public boolean tryLock(int leaseDurationMillis) {
60 + try {
61 + databaseService.putIfAbsent(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
62 + return true;
63 + } catch (OptimisticLockException e) {
64 + return false;
65 + }
66 + }
67 +
68 + @Override
69 + public boolean tryLock(
70 + long waitTimeMillis,
71 + int leaseDurationMillis) {
72 + if (tryLock(leaseDurationMillis) == false) {
73 + CompletableFuture<Void> future =
74 + lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
75 + try {
76 + future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
77 + } catch (ExecutionException | InterruptedException e) {
78 + // TODO: ExecutionException could indicate something
79 + // wrong with the backing database.
80 + // Throw an exception?
81 + return false;
82 + } catch (TimeoutException e) {
83 + return false;
84 + }
85 + }
86 + lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
87 + return true;
88 + }
89 +
90 + @Override
91 + public boolean isLocked() {
92 + if (isLocked.get()) {
93 + // We rely on local information to check
94 + // if the expired.
95 + // This should should make this call
96 + // light weight, which still retaining the same
97 + // safety guarantees.
98 + if (DateTime.now().isAfter(lockExpirationTime)) {
99 + isLocked.set(false);
100 + return false;
101 + }
102 + }
103 + return true;
104 + }
105 +
106 + @Override
107 + public void unlock() {
108 + if (!isLocked()) {
109 + return;
110 + } else {
111 + databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
112 + }
113 + }
114 +
115 + @Override
116 + public boolean extendExpiration(int leaseDurationMillis) {
117 + if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
118 + return true;
119 + } else {
120 + return tryLock(leaseDurationMillis);
121 + }
122 + }
123 +}
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onlab.onos.store.service.impl;
2 +
3 +import static org.slf4j.LoggerFactory.getLogger;
4 +
5 +import java.util.Iterator;
6 +import java.util.List;
7 +import java.util.concurrent.CompletableFuture;
8 +
9 +import org.apache.felix.scr.annotations.Activate;
10 +import org.apache.felix.scr.annotations.Component;
11 +import org.apache.felix.scr.annotations.Deactivate;
12 +import org.apache.felix.scr.annotations.Reference;
13 +import org.apache.felix.scr.annotations.ReferenceCardinality;
14 +import org.apache.felix.scr.annotations.Service;
15 +import org.joda.time.DateTime;
16 +import org.onlab.onos.cluster.ClusterService;
17 +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
18 +import org.onlab.onos.store.cluster.messaging.ClusterMessage;
19 +import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
20 +import org.onlab.onos.store.service.DatabaseService;
21 +import org.onlab.onos.store.service.Lock;
22 +import org.onlab.onos.store.service.LockEventListener;
23 +import org.onlab.onos.store.service.LockService;
24 +import org.slf4j.Logger;
25 +
26 +import com.google.common.collect.ArrayListMultimap;
27 +
28 +@Component(immediate = true)
29 +@Service
30 +public class DistributedLockManager implements LockService {
31 +
32 + private final Logger log = getLogger(getClass());
33 +
34 + public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
35 +
36 + private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap.create();
37 +
38 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
39 + private ClusterCommunicationService clusterCommunicator;
40 +
41 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
42 + private DatabaseService databaseService;
43 +
44 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
45 + private ClusterService clusterService;
46 +
47 + @Activate
48 + public void activate() {
49 + clusterCommunicator.addSubscriber(
50 + DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
51 + new LockEventMessageListener());
52 + log.info("Started.");
53 +
54 + }
55 +
56 + @Deactivate
57 + public void deactivate() {
58 + locksToAcquire.clear();
59 + log.info("Started.");
60 + }
61 +
62 + @Override
63 + public Lock create(String path) {
64 + return new DistributedLock(
65 + path,
66 + databaseService,
67 + clusterService,
68 + this);
69 + }
70 +
71 + @Override
72 + public void addListener(LockEventListener listener) {
73 + // FIXME:
74 + throw new UnsupportedOperationException();
75 + }
76 +
77 + @Override
78 + public void removeListener(LockEventListener listener) {
79 + // FIXME:
80 + throw new UnsupportedOperationException();
81 + }
82 +
83 + protected CompletableFuture<Void> lockIfAvailable(Lock lock, long waitTimeMillis, int leaseDurationMillis) {
84 + CompletableFuture<Void> future = new CompletableFuture<>();
85 + locksToAcquire.put(
86 + lock.path(),
87 + new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
88 + return future;
89 + }
90 +
91 + private class LockEventMessageListener implements ClusterMessageHandler {
92 + @Override
93 + public void handle(ClusterMessage message) {
94 + TableModificationEvent event = DatabaseStateMachine.SERIALIZER.decode(message.payload());
95 + if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
96 + return;
97 + }
98 +
99 + String path = event.key();
100 + if (!locksToAcquire.containsKey(path)) {
101 + return;
102 + }
103 +
104 + if (event.type() == TableModificationEvent.Type.ROW_DELETED) {
105 + List<LockRequest> existingRequests = locksToAcquire.get(path);
106 + if (existingRequests == null) return;
107 +
108 + Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
109 + while (existingRequestIterator.hasNext()) {
110 + LockRequest request = existingRequestIterator.next();
111 + if (request.expirationTime().isAfter(DateTime.now())) {
112 + existingRequestIterator.remove();
113 + } else {
114 + if (request.lock().tryLock(request.leaseDurationMillis()) == true) {
115 + request.future().complete(null);
116 + existingRequests.remove(0);
117 + }
118 + }
119 + }
120 + }
121 + }
122 + }
123 +
124 + private class LockRequest {
125 +
126 + private final Lock lock;
127 + private final DateTime expirationTime;
128 + private final int leaseDurationMillis;
129 + private final CompletableFuture<Void> future;
130 +
131 + public LockRequest(
132 + Lock lock,
133 + long waitTimeMillis,
134 + int leaseDurationMillis,
135 + CompletableFuture<Void> future) {
136 +
137 + this.lock = lock;
138 + this.expirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
139 + this.leaseDurationMillis = leaseDurationMillis;
140 + this.future = future;
141 + }
142 +
143 + public Lock lock() {
144 + return lock;
145 + }
146 +
147 + public DateTime expirationTime() {
148 + return expirationTime;
149 + }
150 +
151 + public int leaseDurationMillis() {
152 + return leaseDurationMillis;
153 + }
154 +
155 + public CompletableFuture<Void> future() {
156 + return future;
157 + }
158 + }
159 +}
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onlab.onos.store.service.impl;
2 +
3 +public class TableModificationEvent {
4 +
5 + public enum Type {
6 + ROW_ADDED,
7 + ROW_DELETED,
8 + ROW_UPDATED
9 + }
10 +
11 + private final String tableName;
12 + private final String key;
13 + private final Type type;
14 +
15 + public static TableModificationEvent rowDeleted(String tableName, String key) {
16 + return new TableModificationEvent(tableName, key, Type.ROW_DELETED);
17 + }
18 +
19 + public static TableModificationEvent rowAdded(String tableName, String key) {
20 + return new TableModificationEvent(tableName, key, Type.ROW_ADDED);
21 + }
22 +
23 + public static TableModificationEvent rowUpdated(String tableName, String key) {
24 + return new TableModificationEvent(tableName, key, Type.ROW_UPDATED);
25 + }
26 +
27 + private TableModificationEvent(String tableName, String key, Type type) {
28 + this.tableName = tableName;
29 + this.key = key;
30 + this.type = type;
31 + }
32 +
33 + public String tableName() {
34 + return tableName;
35 + }
36 +
37 + public String key() {
38 + return key;
39 + }
40 +
41 + public Type type() {
42 + return type;
43 + }
44 +}