Committed by
Yuta Higuchi
1. Adds a lockAsync method to LockService for async lock acquisition.
2. Fixes a bug where lock() wasn't attempting a tryLock before registering for lock avalilability. (Note 1 above is needed for LeadershipService which will come later) Change-Id: I1deaa445f7cdf86416b335df1d7358e17eff19c3
Showing
3 changed files
with
40 additions
and
26 deletions
1 | package org.onlab.onos.store.service; | 1 | package org.onlab.onos.store.service; |
2 | 2 | ||
3 | +import java.util.concurrent.Future; | ||
4 | + | ||
3 | /** | 5 | /** |
4 | * A lock is a tool for controlling access to a shared resource by multiple processes. | 6 | * A lock is a tool for controlling access to a shared resource by multiple processes. |
5 | * Commonly, a lock provides exclusive access to a resource such as a network device | 7 | * Commonly, a lock provides exclusive access to a resource such as a network device |
... | @@ -36,6 +38,14 @@ public interface Lock { | ... | @@ -36,6 +38,14 @@ public interface Lock { |
36 | void lock(int leaseDurationMillis) throws InterruptedException; | 38 | void lock(int leaseDurationMillis) throws InterruptedException; |
37 | 39 | ||
38 | /** | 40 | /** |
41 | + * Acquires the lock asynchronously. | ||
42 | + * @param leaseDurationMillis leaseDurationMillis the number of milliseconds the lock | ||
43 | + * will be reserved before it becomes available for others. | ||
44 | + * @return Future that can be used for blocking until lock is acquired. | ||
45 | + */ | ||
46 | + Future<Void> lockAsync(int leaseDurationMillis); | ||
47 | + | ||
48 | + /** | ||
39 | * Acquires the lock only if it is free at the time of invocation. | 49 | * Acquires the lock only if it is free at the time of invocation. |
40 | * @param leaseDurationMillis the number of milliseconds the must be | 50 | * @param leaseDurationMillis the number of milliseconds the must be |
41 | * locked after it is granted, before automatically releasing it if it hasn't | 51 | * locked after it is granted, before automatically releasing it if it hasn't |
... | @@ -57,7 +67,7 @@ public interface Lock { | ... | @@ -57,7 +67,7 @@ public interface Lock { |
57 | * elapsed before the lock was acquired | 67 | * elapsed before the lock was acquired |
58 | * @throws InterruptedException if the thread is interrupted while waiting | 68 | * @throws InterruptedException if the thread is interrupted while waiting |
59 | */ | 69 | */ |
60 | - boolean tryLock(long waitTimeMillis, int leaseDurationMillis) throws InterruptedException; | 70 | + boolean tryLock(int waitTimeMillis, int leaseDurationMillis) throws InterruptedException; |
61 | 71 | ||
62 | /** | 72 | /** |
63 | * Returns true if this Lock instance currently holds the lock. | 73 | * Returns true if this Lock instance currently holds the lock. | ... | ... |
... | @@ -6,6 +6,7 @@ import java.nio.charset.StandardCharsets; | ... | @@ -6,6 +6,7 @@ import java.nio.charset.StandardCharsets; |
6 | import java.util.UUID; | 6 | import java.util.UUID; |
7 | import java.util.concurrent.CompletableFuture; | 7 | import java.util.concurrent.CompletableFuture; |
8 | import java.util.concurrent.ExecutionException; | 8 | import java.util.concurrent.ExecutionException; |
9 | +import java.util.concurrent.Future; | ||
9 | import java.util.concurrent.TimeUnit; | 10 | import java.util.concurrent.TimeUnit; |
10 | import java.util.concurrent.TimeoutException; | 11 | import java.util.concurrent.TimeoutException; |
11 | import java.util.concurrent.atomic.AtomicBoolean; | 12 | import java.util.concurrent.atomic.AtomicBoolean; |
... | @@ -53,20 +54,22 @@ public class DistributedLock implements Lock { | ... | @@ -53,20 +54,22 @@ public class DistributedLock implements Lock { |
53 | 54 | ||
54 | @Override | 55 | @Override |
55 | public void lock(int leaseDurationMillis) throws InterruptedException { | 56 | public void lock(int leaseDurationMillis) throws InterruptedException { |
56 | - if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) { | 57 | + try { |
57 | - return; | 58 | + lockAsync(leaseDurationMillis).get(); |
58 | - } else { | 59 | + } catch (ExecutionException e) { |
59 | - CompletableFuture<DateTime> future = | 60 | + throw new DatabaseException(e); |
60 | - lockManager.lockIfAvailable(this, leaseDurationMillis); | ||
61 | - try { | ||
62 | - lockExpirationTime = future.get(); | ||
63 | - } catch (ExecutionException e) { | ||
64 | - throw new DatabaseException(e); | ||
65 | - } | ||
66 | } | 61 | } |
67 | } | 62 | } |
68 | 63 | ||
69 | @Override | 64 | @Override |
65 | + public Future<Void> lockAsync(int leaseDurationMillis) { | ||
66 | + if (isLocked() || tryLock(leaseDurationMillis)) { | ||
67 | + return CompletableFuture.<Void>completedFuture(null); | ||
68 | + } | ||
69 | + return lockManager.lockIfAvailable(this, leaseDurationMillis); | ||
70 | + } | ||
71 | + | ||
72 | + @Override | ||
70 | public boolean tryLock(int leaseDurationMillis) { | 73 | public boolean tryLock(int leaseDurationMillis) { |
71 | if (databaseService.putIfAbsent( | 74 | if (databaseService.putIfAbsent( |
72 | DistributedLockManager.ONOS_LOCK_TABLE_NAME, | 75 | DistributedLockManager.ONOS_LOCK_TABLE_NAME, |
... | @@ -81,15 +84,16 @@ public class DistributedLock implements Lock { | ... | @@ -81,15 +84,16 @@ public class DistributedLock implements Lock { |
81 | 84 | ||
82 | @Override | 85 | @Override |
83 | public boolean tryLock( | 86 | public boolean tryLock( |
84 | - long waitTimeMillis, | 87 | + int waitTimeMillis, |
85 | int leaseDurationMillis) throws InterruptedException { | 88 | int leaseDurationMillis) throws InterruptedException { |
86 | - if (tryLock(leaseDurationMillis)) { | 89 | + if (isLocked() || tryLock(leaseDurationMillis)) { |
87 | return true; | 90 | return true; |
88 | } | 91 | } |
89 | - CompletableFuture<DateTime> future = | 92 | + |
93 | + CompletableFuture<Void> future = | ||
90 | lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis); | 94 | lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis); |
91 | try { | 95 | try { |
92 | - lockExpirationTime = future.get(waitTimeMillis, TimeUnit.MILLISECONDS); | 96 | + future.get(waitTimeMillis, TimeUnit.MILLISECONDS); |
93 | return true; | 97 | return true; |
94 | } catch (ExecutionException e) { | 98 | } catch (ExecutionException e) { |
95 | throw new DatabaseException(e); | 99 | throw new DatabaseException(e); | ... | ... |
... | @@ -111,17 +111,17 @@ public class DistributedLockManager implements LockService { | ... | @@ -111,17 +111,17 @@ public class DistributedLockManager implements LockService { |
111 | * @param lock lock to acquire. | 111 | * @param lock lock to acquire. |
112 | * @param waitTimeMillis maximum time to wait before giving up. | 112 | * @param waitTimeMillis maximum time to wait before giving up. |
113 | * @param leaseDurationMillis the duration for which to acquire the lock initially. | 113 | * @param leaseDurationMillis the duration for which to acquire the lock initially. |
114 | - * @return Future lease expiration date. | 114 | + * @return Future that can be blocked on until lock becomes available. |
115 | */ | 115 | */ |
116 | - protected CompletableFuture<DateTime> lockIfAvailable( | 116 | + protected CompletableFuture<Void> lockIfAvailable( |
117 | Lock lock, | 117 | Lock lock, |
118 | - long waitTimeMillis, | 118 | + int waitTimeMillis, |
119 | int leaseDurationMillis) { | 119 | int leaseDurationMillis) { |
120 | - CompletableFuture<DateTime> future = new CompletableFuture<>(); | 120 | + CompletableFuture<Void> future = new CompletableFuture<>(); |
121 | LockRequest request = new LockRequest( | 121 | LockRequest request = new LockRequest( |
122 | lock, | 122 | lock, |
123 | leaseDurationMillis, | 123 | leaseDurationMillis, |
124 | - DateTime.now().plusMillis(leaseDurationMillis), | 124 | + DateTime.now().plusMillis(waitTimeMillis), |
125 | future); | 125 | future); |
126 | locksToAcquire.put(lock.path(), request); | 126 | locksToAcquire.put(lock.path(), request); |
127 | return future; | 127 | return future; |
... | @@ -133,10 +133,10 @@ public class DistributedLockManager implements LockService { | ... | @@ -133,10 +133,10 @@ public class DistributedLockManager implements LockService { |
133 | * @param leaseDurationMillis the duration for which to acquire the lock initially. | 133 | * @param leaseDurationMillis the duration for which to acquire the lock initially. |
134 | * @return Future lease expiration date. | 134 | * @return Future lease expiration date. |
135 | */ | 135 | */ |
136 | - protected CompletableFuture<DateTime> lockIfAvailable( | 136 | + protected CompletableFuture<Void> lockIfAvailable( |
137 | Lock lock, | 137 | Lock lock, |
138 | int leaseDurationMillis) { | 138 | int leaseDurationMillis) { |
139 | - CompletableFuture<DateTime> future = new CompletableFuture<>(); | 139 | + CompletableFuture<Void> future = new CompletableFuture<>(); |
140 | LockRequest request = new LockRequest( | 140 | LockRequest request = new LockRequest( |
141 | lock, | 141 | lock, |
142 | leaseDurationMillis, | 142 | leaseDurationMillis, |
... | @@ -189,7 +189,7 @@ public class DistributedLockManager implements LockService { | ... | @@ -189,7 +189,7 @@ public class DistributedLockManager implements LockService { |
189 | existingRequestIterator.remove(); | 189 | existingRequestIterator.remove(); |
190 | } else { | 190 | } else { |
191 | if (request.lock().tryLock(request.leaseDurationMillis())) { | 191 | if (request.lock().tryLock(request.leaseDurationMillis())) { |
192 | - request.future().complete(DateTime.now().plusMillis(request.leaseDurationMillis())); | 192 | + request.future().complete(null); |
193 | existingRequestIterator.remove(); | 193 | existingRequestIterator.remove(); |
194 | } | 194 | } |
195 | } | 195 | } |
... | @@ -203,13 +203,13 @@ public class DistributedLockManager implements LockService { | ... | @@ -203,13 +203,13 @@ public class DistributedLockManager implements LockService { |
203 | private final Lock lock; | 203 | private final Lock lock; |
204 | private final DateTime requestExpirationTime; | 204 | private final DateTime requestExpirationTime; |
205 | private final int leaseDurationMillis; | 205 | private final int leaseDurationMillis; |
206 | - private final CompletableFuture<DateTime> future; | 206 | + private final CompletableFuture<Void> future; |
207 | 207 | ||
208 | public LockRequest( | 208 | public LockRequest( |
209 | Lock lock, | 209 | Lock lock, |
210 | int leaseDurationMillis, | 210 | int leaseDurationMillis, |
211 | DateTime requestExpirationTime, | 211 | DateTime requestExpirationTime, |
212 | - CompletableFuture<DateTime> future) { | 212 | + CompletableFuture<Void> future) { |
213 | 213 | ||
214 | this.lock = lock; | 214 | this.lock = lock; |
215 | this.requestExpirationTime = requestExpirationTime; | 215 | this.requestExpirationTime = requestExpirationTime; |
... | @@ -229,7 +229,7 @@ public class DistributedLockManager implements LockService { | ... | @@ -229,7 +229,7 @@ public class DistributedLockManager implements LockService { |
229 | return leaseDurationMillis; | 229 | return leaseDurationMillis; |
230 | } | 230 | } |
231 | 231 | ||
232 | - public CompletableFuture<DateTime> future() { | 232 | + public CompletableFuture<Void> future() { |
233 | return future; | 233 | return future; |
234 | } | 234 | } |
235 | } | 235 | } | ... | ... |
-
Please register or login to post a comment