Madan Jampani
Committed by Pavlin Radoslavov

1. Lock.lockAsync does not throw DatabaseException.

2. Changed thread pool in LockManager to be non-static.

Change-Id: Ie4e9acd497bacb9d6d812836a930ee79f92cf555
...@@ -58,7 +58,7 @@ public class LeadershipManager implements LeadershipService { ...@@ -58,7 +58,7 @@ public class LeadershipManager implements LeadershipService {
58 private static final int WAIT_BEFORE_RETRY_MS = 2000; 58 private static final int WAIT_BEFORE_RETRY_MS = 2000;
59 59
60 // TODO: Appropriate Thread pool sizing. 60 // TODO: Appropriate Thread pool sizing.
61 - private static final ScheduledExecutorService THREAD_POOL = 61 + private final ScheduledExecutorService threadPool =
62 Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d")); 62 Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
63 63
64 private static final MessageSubject LEADERSHIP_UPDATES = 64 private static final MessageSubject LEADERSHIP_UPDATES =
...@@ -113,7 +113,7 @@ public class LeadershipManager implements LeadershipService { ...@@ -113,7 +113,7 @@ public class LeadershipManager implements LeadershipService {
113 113
114 clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES); 114 clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
115 115
116 - THREAD_POOL.shutdown(); 116 + threadPool.shutdown();
117 117
118 log.info("Stopped."); 118 log.info("Stopped.");
119 } 119 }
...@@ -180,7 +180,7 @@ public class LeadershipManager implements LeadershipService { ...@@ -180,7 +180,7 @@ public class LeadershipManager implements LeadershipService {
180 verifyNotNull(lock, "Lock should not be null"); 180 verifyNotNull(lock, "Lock should not be null");
181 lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> { 181 lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
182 if (error == null) { 182 if (error == null) {
183 - THREAD_POOL.schedule( 183 + threadPool.schedule(
184 new ReelectionTask(lock), 184 new ReelectionTask(lock),
185 TERM_DURATION_MS / 2, 185 TERM_DURATION_MS / 2,
186 TimeUnit.MILLISECONDS); 186 TimeUnit.MILLISECONDS);
...@@ -216,7 +216,7 @@ public class LeadershipManager implements LeadershipService { ...@@ -216,7 +216,7 @@ public class LeadershipManager implements LeadershipService {
216 new LeadershipEvent( 216 new LeadershipEvent(
217 LeadershipEvent.Type.LEADER_REELECTED, 217 LeadershipEvent.Type.LEADER_REELECTED,
218 new Leadership(lock.path(), localNode, lock.epoch()))); 218 new Leadership(lock.path(), localNode, lock.epoch())));
219 - THREAD_POOL.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS); 219 + threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
220 } else { 220 } else {
221 if (openContests.containsKey(lock.path())) { 221 if (openContests.containsKey(lock.path())) {
222 notifyListeners( 222 notifyListeners(
......
...@@ -66,10 +66,16 @@ public class DistributedLock implements Lock { ...@@ -66,10 +66,16 @@ public class DistributedLock implements Lock {
66 66
67 @Override 67 @Override
68 public CompletableFuture<Void> lockAsync(int leaseDurationMillis) { 68 public CompletableFuture<Void> lockAsync(int leaseDurationMillis) {
69 - if (isLocked() || tryLock(leaseDurationMillis)) { 69 + try {
70 - return CompletableFuture.<Void>completedFuture(null); 70 + if (isLocked() || tryLock(leaseDurationMillis)) {
71 + return CompletableFuture.<Void>completedFuture(null);
72 + }
73 + return lockManager.lockIfAvailable(this, leaseDurationMillis);
74 + } catch (DatabaseException e) {
75 + CompletableFuture<Void> lockFuture = new CompletableFuture<>();
76 + lockFuture.completeExceptionally(e);
77 + return lockFuture;
71 } 78 }
72 - return lockManager.lockIfAvailable(this, leaseDurationMillis);
73 } 79 }
74 80
75 @Override 81 @Override
......