Madan Jampani
Committed by Gerrit Code Review

LeadershipService and MastershipService/Store APIs return CompletableFutures so …

…that callers can (optionally) chain together dependent operations

Change-Id: Ia00fcb7d98fbfce897527f67ea9690abf6fe846a
Showing 20 changed files with 179 additions and 125 deletions
...@@ -18,6 +18,7 @@ package org.onosproject.cluster; ...@@ -18,6 +18,7 @@ package org.onosproject.cluster;
18 import java.util.List; 18 import java.util.List;
19 import java.util.Map; 19 import java.util.Map;
20 import java.util.Set; 20 import java.util.Set;
21 +import java.util.concurrent.CompletableFuture;
21 22
22 /** 23 /**
23 * Service for leader election. 24 * Service for leader election.
...@@ -55,16 +56,18 @@ public interface LeadershipService { ...@@ -55,16 +56,18 @@ public interface LeadershipService {
55 /** 56 /**
56 * Joins the leadership contest. 57 * Joins the leadership contest.
57 * 58 *
58 - * @param path topic for which this controller node wishes to be a leader. 59 + * @param path topic for which this controller node wishes to be a leader
60 + * @return {@code Leadership} future
59 */ 61 */
60 - void runForLeadership(String path); 62 + CompletableFuture<Leadership> runForLeadership(String path);
61 63
62 /** 64 /**
63 * Withdraws from a leadership contest. 65 * Withdraws from a leadership contest.
64 * 66 *
65 - * @param path topic for which this controller node no longer wishes to be a leader. 67 + * @param path topic for which this controller node no longer wishes to be a leader
68 + * @return future that is successfully completed when withdraw is done
66 */ 69 */
67 - void withdraw(String path); 70 + CompletableFuture<Void> withdraw(String path);
68 71
69 /** 72 /**
70 * If the local nodeId is the leader for specified topic, this method causes it to 73 * If the local nodeId is the leader for specified topic, this method causes it to
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
15 */ 15 */
16 package org.onosproject.mastership; 16 package org.onosproject.mastership;
17 17
18 +import java.util.concurrent.CompletableFuture;
19 +
18 import org.onosproject.cluster.NodeId; 20 import org.onosproject.cluster.NodeId;
19 import org.onosproject.net.DeviceId; 21 import org.onosproject.net.DeviceId;
20 import org.onosproject.net.MastershipRole; 22 import org.onosproject.net.MastershipRole;
...@@ -30,8 +32,9 @@ public interface MastershipAdminService { ...@@ -30,8 +32,9 @@ public interface MastershipAdminService {
30 * @param instance controller instance identifier 32 * @param instance controller instance identifier
31 * @param deviceId device identifier 33 * @param deviceId device identifier
32 * @param role requested role 34 * @param role requested role
35 + * @return future that is completed when the role is set
33 */ 36 */
34 - void setRole(NodeId instance, DeviceId deviceId, MastershipRole role); 37 + CompletableFuture<Void> setRole(NodeId instance, DeviceId deviceId, MastershipRole role);
35 38
36 /** 39 /**
37 * Balances the mastership to be shared as evenly as possibly by all 40 * Balances the mastership to be shared as evenly as possibly by all
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.mastership; 16 package org.onosproject.mastership;
17 17
18 import java.util.Set; 18 import java.util.Set;
19 +import java.util.concurrent.CompletableFuture;
19 20
20 import org.onosproject.cluster.NodeId; 21 import org.onosproject.cluster.NodeId;
21 import org.onosproject.cluster.RoleInfo; 22 import org.onosproject.cluster.RoleInfo;
...@@ -46,7 +47,7 @@ public interface MastershipService { ...@@ -46,7 +47,7 @@ public interface MastershipService {
46 * @param deviceId the the identifier of the device 47 * @param deviceId the the identifier of the device
47 * @return the role of this controller instance 48 * @return the role of this controller instance
48 */ 49 */
49 - MastershipRole requestRoleFor(DeviceId deviceId); 50 + CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId);
50 51
51 /** 52 /**
52 * Abandons mastership of the specified device on the local node thus 53 * Abandons mastership of the specified device on the local node thus
......
...@@ -38,7 +38,7 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD ...@@ -38,7 +38,7 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
38 * @param deviceId device identifier 38 * @param deviceId device identifier
39 * @return established or newly negotiated mastership role 39 * @return established or newly negotiated mastership role
40 */ 40 */
41 - MastershipRole requestRole(DeviceId deviceId); 41 + CompletableFuture<MastershipRole> requestRole(DeviceId deviceId);
42 42
43 /** 43 /**
44 * Returns the role of a device for a specific controller instance. 44 * Returns the role of a device for a specific controller instance.
......
...@@ -18,6 +18,7 @@ package org.onosproject.cluster; ...@@ -18,6 +18,7 @@ package org.onosproject.cluster;
18 import java.util.List; 18 import java.util.List;
19 import java.util.Map; 19 import java.util.Map;
20 import java.util.Set; 20 import java.util.Set;
21 +import java.util.concurrent.CompletableFuture;
21 22
22 /** 23 /**
23 * Test adapter for leadership service. 24 * Test adapter for leadership service.
...@@ -40,13 +41,13 @@ public class LeadershipServiceAdapter implements LeadershipService { ...@@ -40,13 +41,13 @@ public class LeadershipServiceAdapter implements LeadershipService {
40 } 41 }
41 42
42 @Override 43 @Override
43 - public void runForLeadership(String path) { 44 + public CompletableFuture<Leadership> runForLeadership(String path) {
44 - 45 + return null;
45 } 46 }
46 47
47 @Override 48 @Override
48 - public void withdraw(String path) { 49 + public CompletableFuture<Void> withdraw(String path) {
49 - 50 + return null;
50 } 51 }
51 52
52 @Override 53 @Override
......
...@@ -21,6 +21,7 @@ import org.onosproject.net.DeviceId; ...@@ -21,6 +21,7 @@ import org.onosproject.net.DeviceId;
21 import org.onosproject.net.MastershipRole; 21 import org.onosproject.net.MastershipRole;
22 22
23 import java.util.Set; 23 import java.util.Set;
24 +import java.util.concurrent.CompletableFuture;
24 25
25 /** 26 /**
26 * Test adapter for mastership service. 27 * Test adapter for mastership service.
...@@ -32,7 +33,7 @@ public class MastershipServiceAdapter implements MastershipService { ...@@ -32,7 +33,7 @@ public class MastershipServiceAdapter implements MastershipService {
32 } 33 }
33 34
34 @Override 35 @Override
35 - public MastershipRole requestRoleFor(DeviceId deviceId) { 36 + public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
36 return null; 37 return null;
37 } 38 }
38 39
......
...@@ -17,6 +17,8 @@ package org.onosproject.cluster.impl; ...@@ -17,6 +17,8 @@ package org.onosproject.cluster.impl;
17 17
18 import com.codahale.metrics.Timer; 18 import com.codahale.metrics.Timer;
19 import com.codahale.metrics.Timer.Context; 19 import com.codahale.metrics.Timer.Context;
20 +import com.google.common.collect.Lists;
21 +import com.google.common.util.concurrent.Futures;
20 22
21 import org.apache.felix.scr.annotations.Activate; 23 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component; 24 import org.apache.felix.scr.annotations.Component;
...@@ -109,7 +111,7 @@ public class MastershipManager ...@@ -109,7 +111,7 @@ public class MastershipManager
109 } 111 }
110 112
111 @Override 113 @Override
112 - public void setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) { 114 + public CompletableFuture<Void> setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
113 checkNotNull(nodeId, NODE_ID_NULL); 115 checkNotNull(nodeId, NODE_ID_NULL);
114 checkNotNull(deviceId, DEVICE_ID_NULL); 116 checkNotNull(deviceId, DEVICE_ID_NULL);
115 checkNotNull(role, ROLE_NULL); 117 checkNotNull(role, ROLE_NULL);
...@@ -128,14 +130,14 @@ public class MastershipManager ...@@ -128,14 +130,14 @@ public class MastershipManager
128 break; 130 break;
129 default: 131 default:
130 log.info("Unknown role; ignoring"); 132 log.info("Unknown role; ignoring");
131 - return; 133 + return CompletableFuture.completedFuture(null);
132 } 134 }
133 135
134 - eventFuture.whenComplete((event, error) -> { 136 + return eventFuture.whenComplete((event, error) -> {
135 if (event != null) { 137 if (event != null) {
136 post(event); 138 post(event);
137 } 139 }
138 - }); 140 + }).thenApply(v -> null);
139 } 141 }
140 142
141 @Override 143 @Override
...@@ -155,14 +157,11 @@ public class MastershipManager ...@@ -155,14 +157,11 @@ public class MastershipManager
155 } 157 }
156 158
157 @Override 159 @Override
158 - public MastershipRole requestRoleFor(DeviceId deviceId) { 160 + public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
159 checkNotNull(deviceId, DEVICE_ID_NULL); 161 checkNotNull(deviceId, DEVICE_ID_NULL);
160 final Context timer = startTimer(requestRoleTimer); 162 final Context timer = startTimer(requestRoleTimer);
161 - try { 163 + return store.requestRole(deviceId).whenComplete((result, error) -> stopTimer(timer));
162 - return store.requestRole(deviceId); 164 +
163 - } finally {
164 - stopTimer(timer);
165 - }
166 } 165 }
167 166
168 @Override 167 @Override
...@@ -222,13 +221,18 @@ public class MastershipManager ...@@ -222,13 +221,18 @@ public class MastershipManager
222 } 221 }
223 222
224 // Now re-balance the buckets until they are roughly even. 223 // Now re-balance the buckets until they are roughly even.
224 + List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newLinkedList();
225 int rounds = controllerDevices.keySet().size(); 225 int rounds = controllerDevices.keySet().size();
226 for (int i = 0; i < rounds; i++) { 226 for (int i = 0; i < rounds; i++) {
227 // Iterate over the buckets and find the smallest and the largest. 227 // Iterate over the buckets and find the smallest and the largest.
228 ControllerNode smallest = findBucket(true, controllerDevices); 228 ControllerNode smallest = findBucket(true, controllerDevices);
229 ControllerNode largest = findBucket(false, controllerDevices); 229 ControllerNode largest = findBucket(false, controllerDevices);
230 - balanceBuckets(smallest, largest, controllerDevices, deviceCount); 230 + balanceBucketsFutures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount));
231 } 231 }
232 + CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
233 + balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
234 +
235 + Futures.getUnchecked(balanceRolesFuture);
232 } 236 }
233 237
234 private ControllerNode findBucket(boolean min, 238 private ControllerNode findBucket(boolean min,
...@@ -245,7 +249,7 @@ public class MastershipManager ...@@ -245,7 +249,7 @@ public class MastershipManager
245 return xNode; 249 return xNode;
246 } 250 }
247 251
248 - private void balanceBuckets(ControllerNode smallest, ControllerNode largest, 252 + private CompletableFuture<Void> balanceBuckets(ControllerNode smallest, ControllerNode largest,
249 Map<ControllerNode, Set<DeviceId>> controllerDevices, 253 Map<ControllerNode, Set<DeviceId>> controllerDevices,
250 int deviceCount) { 254 int deviceCount) {
251 Collection<DeviceId> minBucket = controllerDevices.get(smallest); 255 Collection<DeviceId> minBucket = controllerDevices.get(smallest);
...@@ -255,6 +259,8 @@ public class MastershipManager ...@@ -255,6 +259,8 @@ public class MastershipManager
255 int delta = (maxBucket.size() - minBucket.size()) / 2; 259 int delta = (maxBucket.size() - minBucket.size()) / 2;
256 delta = Math.min(deviceCount / bucketCount, delta); 260 delta = Math.min(deviceCount / bucketCount, delta);
257 261
262 + List<CompletableFuture<Void>> setRoleFutures = Lists.newLinkedList();
263 +
258 if (delta > 0) { 264 if (delta > 0) {
259 log.info("Attempting to move {} nodes from {} to {}...", delta, 265 log.info("Attempting to move {} nodes from {} to {}...", delta,
260 largest.id(), smallest.id()); 266 largest.id(), smallest.id());
...@@ -264,12 +270,14 @@ public class MastershipManager ...@@ -264,12 +270,14 @@ public class MastershipManager
264 while (it.hasNext() && i < delta) { 270 while (it.hasNext() && i < delta) {
265 DeviceId deviceId = it.next(); 271 DeviceId deviceId = it.next();
266 log.info("Setting {} as the master for {}", smallest.id(), deviceId); 272 log.info("Setting {} as the master for {}", smallest.id(), deviceId);
267 - setRole(smallest.id(), deviceId, MASTER); 273 + setRoleFutures.add(setRole(smallest.id(), deviceId, MASTER));
268 controllerDevices.get(smallest).add(deviceId); 274 controllerDevices.get(smallest).add(deviceId);
269 it.remove(); 275 it.remove();
270 i++; 276 i++;
271 } 277 }
272 } 278 }
279 +
280 + return CompletableFuture.allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()]));
273 } 281 }
274 282
275 283
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.net.device.impl; 16 package org.onosproject.net.device.impl;
17 17
18 import com.google.common.collect.Lists; 18 import com.google.common.collect.Lists;
19 +
19 import org.apache.felix.scr.annotations.Activate; 20 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component; 21 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate; 22 import org.apache.felix.scr.annotations.Deactivate;
...@@ -57,6 +58,7 @@ import org.slf4j.Logger; ...@@ -57,6 +58,7 @@ import org.slf4j.Logger;
57 58
58 import java.util.Collection; 59 import java.util.Collection;
59 import java.util.List; 60 import java.util.List;
61 +import java.util.concurrent.CompletableFuture;
60 import java.util.concurrent.ScheduledExecutorService; 62 import java.util.concurrent.ScheduledExecutorService;
61 import java.util.concurrent.TimeUnit; 63 import java.util.concurrent.TimeUnit;
62 64
...@@ -79,7 +81,6 @@ public class DeviceManager ...@@ -79,7 +81,6 @@ public class DeviceManager
79 private static final String PORT_NUMBER_NULL = "Port number cannot be null"; 81 private static final String PORT_NUMBER_NULL = "Port number cannot be null";
80 private static final String DEVICE_DESCRIPTION_NULL = "Device description cannot be null"; 82 private static final String DEVICE_DESCRIPTION_NULL = "Device description cannot be null";
81 private static final String PORT_DESCRIPTION_NULL = "Port description cannot be null"; 83 private static final String PORT_DESCRIPTION_NULL = "Port description cannot be null";
82 - private static final String ROLE_NULL = "Role cannot be null";
83 84
84 private final Logger log = getLogger(getClass()); 85 private final Logger log = getLogger(getClass());
85 86
...@@ -89,6 +90,7 @@ public class DeviceManager ...@@ -89,6 +90,7 @@ public class DeviceManager
89 private final DeviceStoreDelegate delegate = new InternalStoreDelegate(); 90 private final DeviceStoreDelegate delegate = new InternalStoreDelegate();
90 91
91 private final MastershipListener mastershipListener = new InternalMastershipListener(); 92 private final MastershipListener mastershipListener = new InternalMastershipListener();
93 + private NodeId localNodeId;
92 94
93 private ScheduledExecutorService backgroundService; 95 private ScheduledExecutorService backgroundService;
94 96
...@@ -113,6 +115,7 @@ public class DeviceManager ...@@ -113,6 +115,7 @@ public class DeviceManager
113 @Activate 115 @Activate
114 public void activate() { 116 public void activate() {
115 backgroundService = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "manager-background")); 117 backgroundService = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "manager-background"));
118 + localNodeId = clusterService.getLocalNode().id();
116 119
117 store.setDelegate(delegate); 120 store.setDelegate(delegate);
118 eventDispatcher.addSink(DeviceEvent.class, listenerRegistry); 121 eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
...@@ -302,12 +305,10 @@ public class DeviceManager ...@@ -302,12 +305,10 @@ public class DeviceManager
302 checkValidity(); 305 checkValidity();
303 306
304 log.info("Device {} connected", deviceId); 307 log.info("Device {} connected", deviceId);
305 - final NodeId myNodeId = clusterService.getLocalNode().id();
306 -
307 // check my Role 308 // check my Role
308 mastershipService.requestRoleFor(deviceId); 309 mastershipService.requestRoleFor(deviceId);
309 final MastershipTerm term = termService.getMastershipTerm(deviceId); 310 final MastershipTerm term = termService.getMastershipTerm(deviceId);
310 - if (term == null || !myNodeId.equals(term.master())) { 311 + if (term == null || !localNodeId.equals(term.master())) {
311 log.info("Role of this node is STANDBY for {}", deviceId); 312 log.info("Role of this node is STANDBY for {}", deviceId);
312 // TODO: Do we need to explicitly tell the Provider that 313 // TODO: Do we need to explicitly tell the Provider that
313 // this instance is not the MASTER 314 // this instance is not the MASTER
...@@ -337,7 +338,6 @@ public class DeviceManager ...@@ -337,7 +338,6 @@ public class DeviceManager
337 338
338 log.info("Device {} disconnected from this node", deviceId); 339 log.info("Device {} disconnected from this node", deviceId);
339 340
340 - DeviceEvent event = null;
341 List<Port> ports = store.getPorts(deviceId); 341 List<Port> ports = store.getPorts(deviceId);
342 List<PortDescription> descs = Lists.newArrayList(); 342 List<PortDescription> descs = Lists.newArrayList();
343 ports.forEach(port -> 343 ports.forEach(port ->
...@@ -346,7 +346,7 @@ public class DeviceManager ...@@ -346,7 +346,7 @@ public class DeviceManager
346 port.portSpeed()))); 346 port.portSpeed())));
347 store.updatePorts(this.provider().id(), deviceId, descs); 347 store.updatePorts(this.provider().id(), deviceId, descs);
348 try { 348 try {
349 - event = store.markOffline(deviceId); 349 + post(store.markOffline(deviceId));
350 } catch (IllegalStateException e) { 350 } catch (IllegalStateException e) {
351 log.warn("Failed to mark {} offline", deviceId); 351 log.warn("Failed to mark {} offline", deviceId);
352 // only the MASTER should be marking off-line in normal cases, 352 // only the MASTER should be marking off-line in normal cases,
...@@ -360,26 +360,21 @@ public class DeviceManager ...@@ -360,26 +360,21 @@ public class DeviceManager
360 360
361 // FIXME: Store semantics leaking out as IllegalStateException. 361 // FIXME: Store semantics leaking out as IllegalStateException.
362 // Consider revising store API to handle this scenario. 362 // Consider revising store API to handle this scenario.
363 - 363 + CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
364 - MastershipRole role = mastershipService.requestRoleFor(deviceId); 364 + roleFuture.whenComplete((role, error) -> {
365 MastershipTerm term = termService.getMastershipTerm(deviceId); 365 MastershipTerm term = termService.getMastershipTerm(deviceId);
366 - final NodeId myNodeId = clusterService.getLocalNode().id();
367 // TODO: Move this type of check inside device clock manager, etc. 366 // TODO: Move this type of check inside device clock manager, etc.
368 - if (term != null && myNodeId.equals(term.master())) { 367 + if (term != null && localNodeId.equals(term.master())) {
369 log.info("Retry marking {} offline", deviceId); 368 log.info("Retry marking {} offline", deviceId);
370 deviceClockProviderService.setMastershipTerm(deviceId, term); 369 deviceClockProviderService.setMastershipTerm(deviceId, term);
371 - event = store.markOffline(deviceId); 370 + post(store.markOffline(deviceId));
372 } else { 371 } else {
373 log.info("Failed again marking {} offline. {}", deviceId, role); 372 log.info("Failed again marking {} offline. {}", deviceId, role);
374 } 373 }
374 + });
375 } finally { 375 } finally {
376 //relinquish master role and ability to be backup. 376 //relinquish master role and ability to be backup.
377 mastershipService.relinquishMastership(deviceId); 377 mastershipService.relinquishMastership(deviceId);
378 -
379 - if (event != null) {
380 - log.info("Device {} disconnected from cluster", deviceId);
381 - post(event);
382 - }
383 } 378 }
384 } 379 }
385 380
...@@ -531,12 +526,11 @@ public class DeviceManager ...@@ -531,12 +526,11 @@ public class DeviceManager
531 private void reassertRole(final DeviceId did, 526 private void reassertRole(final DeviceId did,
532 final MastershipRole nextRole) { 527 final MastershipRole nextRole) {
533 528
534 - final NodeId myNodeId = clusterService.getLocalNode().id();
535 MastershipRole myNextRole = nextRole; 529 MastershipRole myNextRole = nextRole;
536 if (myNextRole == NONE) { 530 if (myNextRole == NONE) {
537 mastershipService.requestRoleFor(did); 531 mastershipService.requestRoleFor(did);
538 MastershipTerm term = termService.getMastershipTerm(did); 532 MastershipTerm term = termService.getMastershipTerm(did);
539 - if (term != null && myNodeId.equals(term.master())) { 533 + if (term != null && localNodeId.equals(term.master())) {
540 myNextRole = MASTER; 534 myNextRole = MASTER;
541 } else { 535 } else {
542 myNextRole = STANDBY; 536 myNextRole = STANDBY;
...@@ -597,21 +591,20 @@ public class DeviceManager ...@@ -597,21 +591,20 @@ public class DeviceManager
597 } 591 }
598 592
599 final DeviceId did = event.subject(); 593 final DeviceId did = event.subject();
600 - final NodeId myNodeId = clusterService.getLocalNode().id();
601 594
602 // myRole suggested by MastershipService 595 // myRole suggested by MastershipService
603 MastershipRole myNextRole; 596 MastershipRole myNextRole;
604 - if (myNodeId.equals(event.roleInfo().master())) { 597 + if (localNodeId.equals(event.roleInfo().master())) {
605 // confirm latest info 598 // confirm latest info
606 MastershipTerm term = termService.getMastershipTerm(did); 599 MastershipTerm term = termService.getMastershipTerm(did);
607 - final boolean iHaveControl = term != null && myNodeId.equals(term.master()); 600 + final boolean iHaveControl = term != null && localNodeId.equals(term.master());
608 if (iHaveControl) { 601 if (iHaveControl) {
609 deviceClockProviderService.setMastershipTerm(did, term); 602 deviceClockProviderService.setMastershipTerm(did, term);
610 myNextRole = MASTER; 603 myNextRole = MASTER;
611 } else { 604 } else {
612 myNextRole = STANDBY; 605 myNextRole = STANDBY;
613 } 606 }
614 - } else if (event.roleInfo().backups().contains(myNodeId)) { 607 + } else if (event.roleInfo().backups().contains(localNodeId)) {
615 myNextRole = STANDBY; 608 myNextRole = STANDBY;
616 } else { 609 } else {
617 myNextRole = NONE; 610 myNextRole = NONE;
......
...@@ -34,6 +34,7 @@ import org.onosproject.net.DeviceId; ...@@ -34,6 +34,7 @@ import org.onosproject.net.DeviceId;
34 import org.onosproject.store.trivial.impl.SimpleMastershipStore; 34 import org.onosproject.store.trivial.impl.SimpleMastershipStore;
35 35
36 import com.google.common.collect.Sets; 36 import com.google.common.collect.Sets;
37 +import com.google.common.util.concurrent.Futures;
37 38
38 import static org.junit.Assert.assertEquals; 39 import static org.junit.Assert.assertEquals;
39 import static org.junit.Assert.assertNull; 40 import static org.junit.Assert.assertNull;
...@@ -77,7 +78,7 @@ public class MastershipManagerTest { ...@@ -77,7 +78,7 @@ public class MastershipManagerTest {
77 public void setRole() { 78 public void setRole() {
78 mgr.setRole(NID_OTHER, DEV_MASTER, MASTER); 79 mgr.setRole(NID_OTHER, DEV_MASTER, MASTER);
79 assertEquals("wrong local role:", NONE, mgr.getLocalRole(DEV_MASTER)); 80 assertEquals("wrong local role:", NONE, mgr.getLocalRole(DEV_MASTER));
80 - assertEquals("wrong obtained role:", STANDBY, mgr.requestRoleFor(DEV_MASTER)); 81 + assertEquals("wrong obtained role:", STANDBY, Futures.getUnchecked(mgr.requestRoleFor(DEV_MASTER)));
81 82
82 //set to master 83 //set to master
83 mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER); 84 mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
...@@ -112,8 +113,8 @@ public class MastershipManagerTest { ...@@ -112,8 +113,8 @@ public class MastershipManagerTest {
112 mgr.setRole(NID_OTHER, DEV_OTHER, MASTER); 113 mgr.setRole(NID_OTHER, DEV_OTHER, MASTER);
113 114
114 //local should be master for one but standby for other 115 //local should be master for one but standby for other
115 - assertEquals("wrong role:", MASTER, mgr.requestRoleFor(DEV_MASTER)); 116 + assertEquals("wrong role:", MASTER, Futures.getUnchecked(mgr.requestRoleFor(DEV_MASTER)));
116 - assertEquals("wrong role:", STANDBY, mgr.requestRoleFor(DEV_OTHER)); 117 + assertEquals("wrong role:", STANDBY, Futures.getUnchecked(mgr.requestRoleFor(DEV_OTHER)));
117 } 118 }
118 119
119 @Test 120 @Test
......
...@@ -19,6 +19,7 @@ import java.util.ArrayList; ...@@ -19,6 +19,7 @@ import java.util.ArrayList;
19 import java.util.Iterator; 19 import java.util.Iterator;
20 import java.util.List; 20 import java.util.List;
21 import java.util.Set; 21 import java.util.Set;
22 +import java.util.concurrent.CompletableFuture;
22 23
23 import org.junit.After; 24 import org.junit.After;
24 import org.junit.Before; 25 import org.junit.Before;
...@@ -305,8 +306,8 @@ public class DeviceManagerTest { ...@@ -305,8 +306,8 @@ public class DeviceManagerTest {
305 } 306 }
306 307
307 @Override 308 @Override
308 - public MastershipRole requestRoleFor(DeviceId deviceId) { 309 + public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
309 - return MastershipRole.MASTER; 310 + return CompletableFuture.completedFuture(MastershipRole.MASTER);
310 } 311 }
311 312
312 @Override 313 @Override
......
...@@ -49,6 +49,7 @@ import java.util.HashMap; ...@@ -49,6 +49,7 @@ import java.util.HashMap;
49 import java.util.List; 49 import java.util.List;
50 import java.util.Map; 50 import java.util.Map;
51 import java.util.Set; 51 import java.util.Set;
52 +import java.util.concurrent.CompletableFuture;
52 import java.util.concurrent.ExecutorService; 53 import java.util.concurrent.ExecutorService;
53 import java.util.concurrent.Executors; 54 import java.util.concurrent.Executors;
54 import java.util.concurrent.Future; 55 import java.util.concurrent.Future;
...@@ -188,7 +189,7 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -188,7 +189,7 @@ public class HazelcastLeadershipService implements LeadershipService {
188 } 189 }
189 190
190 @Override 191 @Override
191 - public void runForLeadership(String path) { 192 + public CompletableFuture<Leadership> runForLeadership(String path) {
192 checkArgument(path != null); 193 checkArgument(path != null);
193 Topic topic = new Topic(path); 194 Topic topic = new Topic(path);
194 Topic oldTopic = topics.putIfAbsent(path, topic); 195 Topic oldTopic = topics.putIfAbsent(path, topic);
...@@ -198,16 +199,18 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -198,16 +199,18 @@ public class HazelcastLeadershipService implements LeadershipService {
198 } else { 199 } else {
199 oldTopic.runForLeadership(); 200 oldTopic.runForLeadership();
200 } 201 }
202 + return CompletableFuture.completedFuture(getLeadership(path));
201 } 203 }
202 204
203 @Override 205 @Override
204 - public void withdraw(String path) { 206 + public CompletableFuture<Void> withdraw(String path) {
205 checkArgument(path != null); 207 checkArgument(path != null);
206 Topic topic = topics.get(path); 208 Topic topic = topics.get(path);
207 if (topic != null) { 209 if (topic != null) {
208 topics.remove(path, topic); 210 topics.remove(path, topic);
209 topic.stop(); 211 topic.stop();
210 } 212 }
213 + return CompletableFuture.completedFuture(null);
211 } 214 }
212 215
213 @Override 216 @Override
......
...@@ -37,6 +37,8 @@ import java.util.Map.Entry; ...@@ -37,6 +37,8 @@ import java.util.Map.Entry;
37 import java.util.Objects; 37 import java.util.Objects;
38 import java.util.Set; 38 import java.util.Set;
39 import java.util.List; 39 import java.util.List;
40 +import java.util.concurrent.CancellationException;
41 +import java.util.concurrent.CompletableFuture;
40 import java.util.concurrent.ExecutorService; 42 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors; 43 import java.util.concurrent.Executors;
42 import java.util.concurrent.ScheduledExecutorService; 44 import java.util.concurrent.ScheduledExecutorService;
...@@ -199,8 +201,14 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -199,8 +201,14 @@ public class DistributedLeadershipManager implements LeadershipService {
199 } 201 }
200 202
201 @Override 203 @Override
202 - public void runForLeadership(String path) { 204 + public CompletableFuture<Leadership> runForLeadership(String path) {
203 log.debug("Running for leadership for topic: {}", path); 205 log.debug("Running for leadership for topic: {}", path);
206 + CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
207 + doRunForLeadership(path, resultFuture);
208 + return resultFuture;
209 + }
210 +
211 + private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
204 try { 212 try {
205 Versioned<List<NodeId>> candidates = candidateMap.get(path); 213 Versioned<List<NodeId>> candidates = candidateMap.get(path);
206 if (candidates != null) { 214 if (candidates != null) {
...@@ -216,7 +224,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -216,7 +224,7 @@ public class DistributedLeadershipManager implements LeadershipService {
216 newCandidates.version(), 224 newCandidates.version(),
217 newCandidates.creationTime()))); 225 newCandidates.creationTime())));
218 } else { 226 } else {
219 - rerunForLeadership(path); 227 + rerunForLeadership(path, future);
220 return; 228 return;
221 } 229 }
222 } 230 }
...@@ -231,28 +239,38 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -231,28 +239,38 @@ public class DistributedLeadershipManager implements LeadershipService {
231 newCandidates.version(), 239 newCandidates.version(),
232 newCandidates.creationTime()))); 240 newCandidates.creationTime())));
233 } else { 241 } else {
234 - rerunForLeadership(path); 242 + rerunForLeadership(path, future);
235 return; 243 return;
236 } 244 }
237 } 245 }
238 log.debug("In the leadership race for topic {} with candidates {}", path, candidates); 246 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
239 activeTopics.add(path); 247 activeTopics.add(path);
240 - tryLeaderLock(path); 248 + tryLeaderLock(path, future);
241 } catch (ConsistentMapException e) { 249 } catch (ConsistentMapException e) {
242 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e); 250 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
243 - rerunForLeadership(path); 251 + rerunForLeadership(path, future);
244 } 252 }
245 } 253 }
246 254
247 @Override 255 @Override
248 - public void withdraw(String path) { 256 + public CompletableFuture<Void> withdraw(String path) {
249 activeTopics.remove(path); 257 activeTopics.remove(path);
258 + CompletableFuture<Void> resultFuture = new CompletableFuture<>();
259 + doWithdraw(path, resultFuture);
260 + return resultFuture;
261 + }
262 +
250 263
264 + private void doWithdraw(String path, CompletableFuture<Void> future) {
265 + if (activeTopics.contains(path)) {
266 + future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
267 + }
251 try { 268 try {
252 Versioned<NodeId> leader = leaderMap.get(path); 269 Versioned<NodeId> leader = leaderMap.get(path);
253 if (leader != null && Objects.equals(leader.value(), localNodeId)) { 270 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
254 if (leaderMap.remove(path, leader.version())) { 271 if (leaderMap.remove(path, leader.version())) {
255 log.info("Gave up leadership for {}", path); 272 log.info("Gave up leadership for {}", path);
273 + future.complete(null);
256 publish(new LeadershipEvent( 274 publish(new LeadershipEvent(
257 LeadershipEvent.Type.LEADER_BOOTED, 275 LeadershipEvent.Type.LEADER_BOOTED,
258 new Leadership(path, 276 new Leadership(path,
...@@ -267,10 +285,12 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -267,10 +285,12 @@ public class DistributedLeadershipManager implements LeadershipService {
267 ? Lists.newArrayList(candidates.value()) 285 ? Lists.newArrayList(candidates.value())
268 : Lists.newArrayList(); 286 : Lists.newArrayList();
269 if (!candidateList.remove(localNodeId)) { 287 if (!candidateList.remove(localNodeId)) {
288 + future.complete(null);
270 return; 289 return;
271 } 290 }
272 if (candidateMap.replace(path, candidates.version(), candidateList)) { 291 if (candidateMap.replace(path, candidates.version(), candidateList)) {
273 Versioned<List<NodeId>> newCandidates = candidateMap.get(path); 292 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
293 + future.complete(null);
274 publish(new LeadershipEvent( 294 publish(new LeadershipEvent(
275 LeadershipEvent.Type.CANDIDATES_CHANGED, 295 LeadershipEvent.Type.CANDIDATES_CHANGED,
276 new Leadership(path, 296 new Leadership(path,
...@@ -279,11 +299,11 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -279,11 +299,11 @@ public class DistributedLeadershipManager implements LeadershipService {
279 newCandidates.creationTime()))); 299 newCandidates.creationTime())));
280 } else { 300 } else {
281 log.warn("Failed to withdraw from candidates list. Will retry"); 301 log.warn("Failed to withdraw from candidates list. Will retry");
282 - retryWithdraw(path); 302 + retryWithdraw(path, future);
283 } 303 }
284 } catch (Exception e) { 304 } catch (Exception e) {
285 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e); 305 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
286 - retryWithdraw(path); 306 + retryWithdraw(path, future);
287 } 307 }
288 } 308 }
289 309
...@@ -304,7 +324,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -304,7 +324,7 @@ public class DistributedLeadershipManager implements LeadershipService {
304 localNodeId, 324 localNodeId,
305 leader.version(), 325 leader.version(),
306 leader.creationTime()))); 326 leader.creationTime())));
307 - retryLock(path); 327 + retryLock(path, new CompletableFuture<>());
308 return true; 328 return true;
309 } 329 }
310 } 330 }
...@@ -350,7 +370,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -350,7 +370,7 @@ public class DistributedLeadershipManager implements LeadershipService {
350 return updated; 370 return updated;
351 } 371 }
352 372
353 - private void tryLeaderLock(String path) { 373 + private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
354 if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) { 374 if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
355 return; 375 return;
356 } 376 }
...@@ -362,35 +382,37 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -362,35 +382,37 @@ public class DistributedLeadershipManager implements LeadershipService {
362 .filter(n -> clusterService.getState(n) == ACTIVE) 382 .filter(n -> clusterService.getState(n) == ACTIVE)
363 .collect(Collectors.toList()); 383 .collect(Collectors.toList());
364 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) { 384 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
365 - leaderLockAttempt(path, candidates.value()); 385 + leaderLockAttempt(path, candidates.value(), future);
366 } else { 386 } else {
367 - retryLock(path); 387 + retryLock(path, future);
368 } 388 }
369 } else { 389 } else {
370 throw new IllegalStateException("should not be here"); 390 throw new IllegalStateException("should not be here");
371 } 391 }
372 } catch (Exception e) { 392 } catch (Exception e) {
373 log.debug("Failed to fetch candidate information for {}", path, e); 393 log.debug("Failed to fetch candidate information for {}", path, e);
374 - retryLock(path); 394 + retryLock(path, future);
375 } 395 }
376 } 396 }
377 397
378 - private void leaderLockAttempt(String path, List<NodeId> candidates) { 398 + private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
379 try { 399 try {
380 Versioned<NodeId> currentLeader = leaderMap.get(path); 400 Versioned<NodeId> currentLeader = leaderMap.get(path);
381 if (currentLeader != null) { 401 if (currentLeader != null) {
382 if (localNodeId.equals(currentLeader.value())) { 402 if (localNodeId.equals(currentLeader.value())) {
383 log.info("Already has leadership for {}", path); 403 log.info("Already has leadership for {}", path);
384 // FIXME: candidates can get out of sync. 404 // FIXME: candidates can get out of sync.
385 - publish(new LeadershipEvent( 405 + Leadership leadership = new Leadership(path,
386 - LeadershipEvent.Type.LEADER_ELECTED,
387 - new Leadership(path,
388 localNodeId, 406 localNodeId,
389 currentLeader.version(), 407 currentLeader.version(),
390 - currentLeader.creationTime()))); 408 + currentLeader.creationTime());
409 + future.complete(leadership);
410 + publish(new LeadershipEvent(
411 + LeadershipEvent.Type.LEADER_ELECTED,
412 + leadership));
391 } else { 413 } else {
392 // someone else has leadership. will retry after sometime. 414 // someone else has leadership. will retry after sometime.
393 - retryLock(path); 415 + retryLock(path, future);
394 } 416 }
395 } else { 417 } else {
396 if (leaderMap.putIfAbsent(path, localNodeId) == null) { 418 if (leaderMap.putIfAbsent(path, localNodeId) == null) {
...@@ -398,20 +420,22 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -398,20 +420,22 @@ public class DistributedLeadershipManager implements LeadershipService {
398 // do a get again to get the version (epoch) 420 // do a get again to get the version (epoch)
399 Versioned<NodeId> newLeader = leaderMap.get(path); 421 Versioned<NodeId> newLeader = leaderMap.get(path);
400 // FIXME: candidates can get out of sync 422 // FIXME: candidates can get out of sync
401 - publish(new LeadershipEvent( 423 + Leadership leadership = new Leadership(path,
402 - LeadershipEvent.Type.LEADER_ELECTED,
403 - new Leadership(path,
404 newLeader.value(), 424 newLeader.value(),
405 newLeader.version(), 425 newLeader.version(),
406 - newLeader.creationTime()))); 426 + newLeader.creationTime());
427 + future.complete(leadership);
428 + publish(new LeadershipEvent(
429 + LeadershipEvent.Type.LEADER_ELECTED,
430 + leadership));
407 } else { 431 } else {
408 // someone beat us to it. 432 // someone beat us to it.
409 - retryLock(path); 433 + retryLock(path, future);
410 } 434 }
411 } 435 }
412 } catch (Exception e) { 436 } catch (Exception e) {
413 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e); 437 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
414 - retryLock(path); 438 + retryLock(path, future);
415 } 439 }
416 } 440 }
417 441
...@@ -463,23 +487,23 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -463,23 +487,23 @@ public class DistributedLeadershipManager implements LeadershipService {
463 } 487 }
464 } 488 }
465 489
466 - private void rerunForLeadership(String path) { 490 + private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
467 retryLeaderLockExecutor.schedule( 491 retryLeaderLockExecutor.schedule(
468 - () -> runForLeadership(path), 492 + () -> doRunForLeadership(path, future),
469 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC, 493 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
470 TimeUnit.SECONDS); 494 TimeUnit.SECONDS);
471 } 495 }
472 496
473 - private void retryLock(String path) { 497 + private void retryLock(String path, CompletableFuture<Leadership> future) {
474 retryLeaderLockExecutor.schedule( 498 retryLeaderLockExecutor.schedule(
475 - () -> tryLeaderLock(path), 499 + () -> tryLeaderLock(path, future),
476 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, 500 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
477 TimeUnit.SECONDS); 501 TimeUnit.SECONDS);
478 } 502 }
479 503
480 - private void retryWithdraw(String path) { 504 + private void retryWithdraw(String path, CompletableFuture<Void> future) {
481 retryLeaderLockExecutor.schedule( 505 retryLeaderLockExecutor.schedule(
482 - () -> withdraw(path), 506 + () -> doWithdraw(path, future),
483 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, 507 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
484 TimeUnit.SECONDS); 508 TimeUnit.SECONDS);
485 } 509 }
......
...@@ -163,19 +163,22 @@ public class ConsistentDeviceMastershipStore ...@@ -163,19 +163,22 @@ public class ConsistentDeviceMastershipStore
163 } 163 }
164 164
165 @Override 165 @Override
166 - public MastershipRole requestRole(DeviceId deviceId) { 166 + public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
167 checkArgument(deviceId != null, DEVICE_ID_NULL); 167 checkArgument(deviceId != null, DEVICE_ID_NULL);
168 168
169 String leadershipTopic = createDeviceMastershipTopic(deviceId); 169 String leadershipTopic = createDeviceMastershipTopic(deviceId);
170 if (connectedDevices.add(deviceId)) { 170 if (connectedDevices.add(deviceId)) {
171 - leadershipService.runForLeadership(leadershipTopic); 171 + return leadershipService.runForLeadership(leadershipTopic)
172 - return MastershipRole.STANDBY; 172 + .thenApply(leadership -> {
173 + return Objects.equal(localNodeId, leadership.leader())
174 + ? MastershipRole.MASTER : MastershipRole.STANDBY;
175 + });
173 } else { 176 } else {
174 Leadership leadership = leadershipService.getLeadership(leadershipTopic); 177 Leadership leadership = leadershipService.getLeadership(leadershipTopic);
175 if (leadership != null && leadership.leader().equals(localNodeId)) { 178 if (leadership != null && leadership.leader().equals(localNodeId)) {
176 - return MastershipRole.MASTER; 179 + return CompletableFuture.completedFuture(MastershipRole.MASTER);
177 } else { 180 } else {
178 - return MastershipRole.STANDBY; 181 + return CompletableFuture.completedFuture(MastershipRole.STANDBY);
179 } 182 }
180 } 183 }
181 } 184 }
......
...@@ -205,7 +205,7 @@ public class DistributedMastershipStore ...@@ -205,7 +205,7 @@ public class DistributedMastershipStore
205 } 205 }
206 206
207 @Override 207 @Override
208 - public MastershipRole requestRole(DeviceId deviceId) { 208 + public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
209 209
210 // if no master => become master 210 // if no master => become master
211 // if there already exists a master: 211 // if there already exists a master:
...@@ -225,7 +225,7 @@ public class DistributedMastershipStore ...@@ -225,7 +225,7 @@ public class DistributedMastershipStore
225 225
226 updateTerm(deviceId); 226 updateTerm(deviceId);
227 roleMap.put(deviceId, rv); 227 roleMap.put(deviceId, rv);
228 - return MASTER; 228 + return CompletableFuture.completedFuture(MASTER);
229 } 229 }
230 final MastershipRole currentRole = rv.getRole(local); 230 final MastershipRole currentRole = rv.getRole(local);
231 switch (currentRole) { 231 switch (currentRole) {
...@@ -239,7 +239,7 @@ public class DistributedMastershipStore ...@@ -239,7 +239,7 @@ public class DistributedMastershipStore
239 roleMap.put(deviceId, rv); 239 roleMap.put(deviceId, rv);
240 // trigger BACKUPS_CHANGED? 240 // trigger BACKUPS_CHANGED?
241 } 241 }
242 - return currentRole; 242 + return CompletableFuture.completedFuture(currentRole);
243 case STANDBY: 243 case STANDBY:
244 // RoleInfo integrity check 244 // RoleInfo integrity check
245 modified = rv.reassign(local, NONE, STANDBY); 245 modified = rv.reassign(local, NONE, STANDBY);
...@@ -250,16 +250,16 @@ public class DistributedMastershipStore ...@@ -250,16 +250,16 @@ public class DistributedMastershipStore
250 roleMap.put(deviceId, rv); 250 roleMap.put(deviceId, rv);
251 // trigger BACKUPS_CHANGED? 251 // trigger BACKUPS_CHANGED?
252 } 252 }
253 - return currentRole; 253 + return CompletableFuture.completedFuture(currentRole);
254 case NONE: 254 case NONE:
255 rv.reassign(local, NONE, STANDBY); 255 rv.reassign(local, NONE, STANDBY);
256 roleMap.put(deviceId, rv); 256 roleMap.put(deviceId, rv);
257 // TODO: notifyDelegate BACKUPS_CHANGED 257 // TODO: notifyDelegate BACKUPS_CHANGED
258 - return STANDBY; 258 + return CompletableFuture.completedFuture(STANDBY);
259 default: 259 default:
260 log.warn("unknown Mastership Role {}", currentRole); 260 log.warn("unknown Mastership Role {}", currentRole);
261 } 261 }
262 - return currentRole; 262 + return CompletableFuture.completedFuture(currentRole);
263 } finally { 263 } finally {
264 roleMap.unlock(deviceId); 264 roleMap.unlock(deviceId);
265 } 265 }
......
...@@ -35,6 +35,7 @@ import java.util.HashSet; ...@@ -35,6 +35,7 @@ import java.util.HashSet;
35 import java.util.Map; 35 import java.util.Map;
36 import java.util.Objects; 36 import java.util.Objects;
37 import java.util.Set; 37 import java.util.Set;
38 +import java.util.concurrent.CompletableFuture;
38 39
39 import static junit.framework.TestCase.assertFalse; 40 import static junit.framework.TestCase.assertFalse;
40 import static org.easymock.EasyMock.anyObject; 41 import static org.easymock.EasyMock.anyObject;
...@@ -74,8 +75,11 @@ public class PartitionManagerTest { ...@@ -74,8 +75,11 @@ public class PartitionManagerTest {
74 75
75 leadershipService.addListener(anyObject(LeadershipEventListener.class)); 76 leadershipService.addListener(anyObject(LeadershipEventListener.class));
76 expectLastCall().andDelegateTo(new TestLeadershipService()); 77 expectLastCall().andDelegateTo(new TestLeadershipService());
77 - leadershipService.runForLeadership(anyString()); 78 + for (int i = 0; i < PartitionManager.NUM_PARTITIONS; i++) {
78 - expectLastCall().anyTimes(); 79 + expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
80 + .andReturn(CompletableFuture.completedFuture(null))
81 + .times(1);
82 + }
79 83
80 partitionManager = new PartitionManager() 84 partitionManager = new PartitionManager()
81 .withScheduledExecutor(new NullScheduledExecutor()); 85 .withScheduledExecutor(new NullScheduledExecutor());
...@@ -92,6 +96,7 @@ public class PartitionManagerTest { ...@@ -92,6 +96,7 @@ public class PartitionManagerTest {
92 * @param numMine number of partitions that should be owned by the local node 96 * @param numMine number of partitions that should be owned by the local node
93 */ 97 */
94 private void setUpLeadershipService(int numMine) { 98 private void setUpLeadershipService(int numMine) {
99 +
95 Map<String, Leadership> leaderBoard = new HashMap<>(); 100 Map<String, Leadership> leaderBoard = new HashMap<>();
96 101
97 for (int i = 0; i < numMine; i++) { 102 for (int i = 0; i < numMine; i++) {
...@@ -123,7 +128,9 @@ public class PartitionManagerTest { ...@@ -123,7 +128,9 @@ public class PartitionManagerTest {
123 leadershipService.addListener(anyObject(LeadershipEventListener.class)); 128 leadershipService.addListener(anyObject(LeadershipEventListener.class));
124 129
125 for (int i = 0; i < PartitionManager.NUM_PARTITIONS; i++) { 130 for (int i = 0; i < PartitionManager.NUM_PARTITIONS; i++) {
126 - leadershipService.runForLeadership(ELECTION_PREFIX + i); 131 + expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
132 + .andReturn(CompletableFuture.completedFuture(null))
133 + .times(1);
127 } 134 }
128 135
129 replay(leadershipService); 136 replay(leadershipService);
...@@ -172,8 +179,9 @@ public class PartitionManagerTest { ...@@ -172,8 +179,9 @@ public class PartitionManagerTest {
172 // We have all the partitions so we'll need to relinquish some 179 // We have all the partitions so we'll need to relinquish some
173 setUpLeadershipService(PartitionManager.NUM_PARTITIONS); 180 setUpLeadershipService(PartitionManager.NUM_PARTITIONS);
174 181
175 - leadershipService.withdraw(anyString()); 182 + expect(leadershipService.withdraw(anyString()))
176 - expectLastCall().times(7); 183 + .andReturn(CompletableFuture.completedFuture(null))
184 + .times(7);
177 185
178 replay(leadershipService); 186 replay(leadershipService);
179 187
......
...@@ -145,22 +145,22 @@ public class DistributedMastershipStoreTest { ...@@ -145,22 +145,22 @@ public class DistributedMastershipStoreTest {
145 145
146 //if already MASTER, nothing should happen 146 //if already MASTER, nothing should happen
147 testStore.put(DID2, N1, true, false, true); 147 testStore.put(DID2, N1, true, false, true);
148 - assertEquals("wrong role for MASTER:", MASTER, dms.requestRole(DID2)); 148 + assertEquals("wrong role for MASTER:", MASTER, Futures.getUnchecked(dms.requestRole(DID2)));
149 149
150 //populate maps with DID1, N1 thru NONE case 150 //populate maps with DID1, N1 thru NONE case
151 - assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1)); 151 + assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
152 assertTrue("wrong state for store:", !dms.terms.isEmpty()); 152 assertTrue("wrong state for store:", !dms.terms.isEmpty());
153 assertEquals("wrong term", 153 assertEquals("wrong term",
154 MastershipTerm.of(N1, 1), dms.getTermFor(DID1)); 154 MastershipTerm.of(N1, 1), dms.getTermFor(DID1));
155 155
156 //CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY 156 //CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY
157 testStore.setCurrent(CN2); 157 testStore.setCurrent(CN2);
158 - assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2)); 158 + assertEquals("wrong role for STANDBY:", STANDBY, Futures.getUnchecked(dms.requestRole(DID2)));
159 assertEquals("wrong number of entries:", 2, dms.terms.size()); 159 assertEquals("wrong number of entries:", 2, dms.terms.size());
160 160
161 //change term and requestRole() again; should persist 161 //change term and requestRole() again; should persist
162 testStore.increment(DID2); 162 testStore.increment(DID2);
163 - assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2)); 163 + assertEquals("wrong role for STANDBY:", STANDBY, Futures.getUnchecked(dms.requestRole(DID2)));
164 assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2)); 164 assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2));
165 } 165 }
166 166
...@@ -168,7 +168,7 @@ public class DistributedMastershipStoreTest { ...@@ -168,7 +168,7 @@ public class DistributedMastershipStoreTest {
168 public void setMaster() { 168 public void setMaster() {
169 //populate maps with DID1, N1 as MASTER thru NONE case 169 //populate maps with DID1, N1 as MASTER thru NONE case
170 testStore.setCurrent(CN1); 170 testStore.setCurrent(CN1);
171 - assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1)); 171 + assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
172 assertNull("wrong event:", Futures.getUnchecked(dms.setMaster(N1, DID1))); 172 assertNull("wrong event:", Futures.getUnchecked(dms.setMaster(N1, DID1)));
173 173
174 //switch over to N2 174 //switch over to N2
...@@ -189,7 +189,7 @@ public class DistributedMastershipStoreTest { ...@@ -189,7 +189,7 @@ public class DistributedMastershipStoreTest {
189 public void relinquishRole() { 189 public void relinquishRole() {
190 //populate maps with DID1, N1 as MASTER thru NONE case 190 //populate maps with DID1, N1 as MASTER thru NONE case
191 testStore.setCurrent(CN1); 191 testStore.setCurrent(CN1);
192 - assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1)); 192 + assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
193 //no backup, no new MASTER/event 193 //no backup, no new MASTER/event
194 assertNull("wrong event:", Futures.getUnchecked(dms.relinquishRole(N1, DID1))); 194 assertNull("wrong event:", Futures.getUnchecked(dms.relinquishRole(N1, DID1)));
195 195
...@@ -197,7 +197,7 @@ public class DistributedMastershipStoreTest { ...@@ -197,7 +197,7 @@ public class DistributedMastershipStoreTest {
197 197
198 //add backup CN2, get it elected MASTER by relinquishing 198 //add backup CN2, get it elected MASTER by relinquishing
199 testStore.setCurrent(CN2); 199 testStore.setCurrent(CN2);
200 - assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1)); 200 + assertEquals("wrong role for NONE:", STANDBY, Futures.getUnchecked(dms.requestRole(DID1)));
201 assertEquals("wrong event:", Type.MASTER_CHANGED, Futures.getUnchecked(dms.relinquishRole(N1, DID1)).type()); 201 assertEquals("wrong event:", Type.MASTER_CHANGED, Futures.getUnchecked(dms.relinquishRole(N1, DID1)).type());
202 assertEquals("wrong master", N2, dms.getMaster(DID1)); 202 assertEquals("wrong master", N2, dms.getMaster(DID1));
203 203
...@@ -209,9 +209,9 @@ public class DistributedMastershipStoreTest { ...@@ -209,9 +209,9 @@ public class DistributedMastershipStoreTest {
209 dms.roleMap.get(DID1).nodesOfRole(NONE).size()); 209 dms.roleMap.get(DID1).nodesOfRole(NONE).size());
210 210
211 //bring nodes back 211 //bring nodes back
212 - assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1)); 212 + assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
213 testStore.setCurrent(CN1); 213 testStore.setCurrent(CN1);
214 - assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1)); 214 + assertEquals("wrong role for NONE:", STANDBY, Futures.getUnchecked(dms.requestRole(DID1)));
215 assertEquals("wrong number of backup nodes", 1, 215 assertEquals("wrong number of backup nodes", 1,
216 dms.roleMap.get(DID1).nodesOfRole(STANDBY).size()); 216 dms.roleMap.get(DID1).nodesOfRole(STANDBY).size());
217 217
......
...@@ -21,6 +21,7 @@ import java.util.List; ...@@ -21,6 +21,7 @@ import java.util.List;
21 import java.util.Map; 21 import java.util.Map;
22 import java.util.Map.Entry; 22 import java.util.Map.Entry;
23 import java.util.Set; 23 import java.util.Set;
24 +import java.util.concurrent.CompletableFuture;
24 import java.util.concurrent.ConcurrentHashMap; 25 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.CopyOnWriteArraySet; 26 import java.util.concurrent.CopyOnWriteArraySet;
26 import java.util.stream.Collectors; 27 import java.util.stream.Collectors;
...@@ -76,21 +77,23 @@ public class SimpleLeadershipManager implements LeadershipService { ...@@ -76,21 +77,23 @@ public class SimpleLeadershipManager implements LeadershipService {
76 } 77 }
77 78
78 @Override 79 @Override
79 - public void runForLeadership(String path) { 80 + public CompletableFuture<Leadership> runForLeadership(String path) {
80 elections.put(path, true); 81 elections.put(path, true);
81 for (LeadershipEventListener listener : listeners) { 82 for (LeadershipEventListener listener : listeners) {
82 listener.event(new LeadershipEvent(Type.LEADER_ELECTED, 83 listener.event(new LeadershipEvent(Type.LEADER_ELECTED,
83 new Leadership(path, clusterService.getLocalNode().id(), 0, 0))); 84 new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
84 } 85 }
86 + return CompletableFuture.completedFuture(new Leadership(path, clusterService.getLocalNode().id(), 0, 0));
85 } 87 }
86 88
87 @Override 89 @Override
88 - public void withdraw(String path) { 90 + public CompletableFuture<Void> withdraw(String path) {
89 elections.remove(path); 91 elections.remove(path);
90 for (LeadershipEventListener listener : listeners) { 92 for (LeadershipEventListener listener : listeners) {
91 listener.event(new LeadershipEvent(Type.LEADER_BOOTED, 93 listener.event(new LeadershipEvent(Type.LEADER_BOOTED,
92 new Leadership(path, clusterService.getLocalNode().id(), 0, 0))); 94 new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
93 } 95 }
96 + return CompletableFuture.completedFuture(null);
94 } 97 }
95 98
96 @Override 99 @Override
......
...@@ -191,14 +191,14 @@ public class SimpleMastershipStore ...@@ -191,14 +191,14 @@ public class SimpleMastershipStore
191 } 191 }
192 192
193 @Override 193 @Override
194 - public synchronized MastershipRole requestRole(DeviceId deviceId) { 194 + public synchronized CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
195 //query+possible reelection 195 //query+possible reelection
196 NodeId node = clusterService.getLocalNode().id(); 196 NodeId node = clusterService.getLocalNode().id();
197 MastershipRole role = getRole(node, deviceId); 197 MastershipRole role = getRole(node, deviceId);
198 198
199 switch (role) { 199 switch (role) {
200 case MASTER: 200 case MASTER:
201 - return MastershipRole.MASTER; 201 + return CompletableFuture.completedFuture(MastershipRole.MASTER);
202 case STANDBY: 202 case STANDBY:
203 if (getMaster(deviceId) == null) { 203 if (getMaster(deviceId) == null) {
204 // no master => become master 204 // no master => become master
...@@ -208,9 +208,9 @@ public class SimpleMastershipStore ...@@ -208,9 +208,9 @@ public class SimpleMastershipStore
208 removeFromBackups(deviceId, node); 208 removeFromBackups(deviceId, node);
209 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, 209 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
210 getNodes(deviceId))); 210 getNodes(deviceId)));
211 - return MastershipRole.MASTER; 211 + return CompletableFuture.completedFuture(MastershipRole.MASTER);
212 } 212 }
213 - return MastershipRole.STANDBY; 213 + return CompletableFuture.completedFuture(MastershipRole.STANDBY);
214 case NONE: 214 case NONE:
215 if (getMaster(deviceId) == null) { 215 if (getMaster(deviceId) == null) {
216 // no master => become master 216 // no master => become master
...@@ -218,18 +218,18 @@ public class SimpleMastershipStore ...@@ -218,18 +218,18 @@ public class SimpleMastershipStore
218 incrementTerm(deviceId); 218 incrementTerm(deviceId);
219 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, 219 notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
220 getNodes(deviceId))); 220 getNodes(deviceId)));
221 - return MastershipRole.MASTER; 221 + return CompletableFuture.completedFuture(MastershipRole.MASTER);
222 } 222 }
223 // add to backup list 223 // add to backup list
224 if (addToBackup(deviceId, node)) { 224 if (addToBackup(deviceId, node)) {
225 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, 225 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId,
226 getNodes(deviceId))); 226 getNodes(deviceId)));
227 } 227 }
228 - return MastershipRole.STANDBY; 228 + return CompletableFuture.completedFuture(MastershipRole.STANDBY);
229 default: 229 default:
230 log.warn("unknown Mastership Role {}", role); 230 log.warn("unknown Mastership Role {}", role);
231 } 231 }
232 - return role; 232 + return CompletableFuture.completedFuture(role);
233 } 233 }
234 234
235 // add to backup if not there already, silently ignores null node 235 // add to backup if not there already, silently ignores null node
......
...@@ -130,19 +130,19 @@ public class SimpleMastershipStoreTest { ...@@ -130,19 +130,19 @@ public class SimpleMastershipStoreTest {
130 public void requestRole() { 130 public void requestRole() {
131 //NONE - become MASTER 131 //NONE - become MASTER
132 put(DID1, N1, false, false); 132 put(DID1, N1, false, false);
133 - assertEquals("wrong role", MASTER, sms.requestRole(DID1)); 133 + assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID1)));
134 134
135 //was STANDBY - become MASTER 135 //was STANDBY - become MASTER
136 put(DID2, N1, false, true); 136 put(DID2, N1, false, true);
137 - assertEquals("wrong role", MASTER, sms.requestRole(DID2)); 137 + assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID2)));
138 138
139 //other MASTER - stay STANDBY 139 //other MASTER - stay STANDBY
140 put(DID3, N2, true, false); 140 put(DID3, N2, true, false);
141 - assertEquals("wrong role", STANDBY, sms.requestRole(DID3)); 141 + assertEquals("wrong role", STANDBY, Futures.getUnchecked(sms.requestRole(DID3)));
142 142
143 //local (N1) is MASTER - stay MASTER 143 //local (N1) is MASTER - stay MASTER
144 put(DID4, N1, true, true); 144 put(DID4, N1, true, true);
145 - assertEquals("wrong role", MASTER, sms.requestRole(DID4)); 145 + assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID4)));
146 } 146 }
147 147
148 @Test 148 @Test
......
...@@ -30,6 +30,7 @@ import java.util.HashMap; ...@@ -30,6 +30,7 @@ import java.util.HashMap;
30 import java.util.List; 30 import java.util.List;
31 import java.util.Map; 31 import java.util.Map;
32 import java.util.Set; 32 import java.util.Set;
33 +import java.util.concurrent.CompletableFuture;
33 34
34 import org.junit.After; 35 import org.junit.After;
35 import org.junit.Before; 36 import org.junit.Before;
...@@ -496,8 +497,8 @@ public class LLDPLinkProviderTest { ...@@ -496,8 +497,8 @@ public class LLDPLinkProviderTest {
496 } 497 }
497 498
498 @Override 499 @Override
499 - public MastershipRole requestRoleFor(DeviceId deviceId) { 500 + public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
500 - return null; 501 + return CompletableFuture.completedFuture(null);
501 } 502 }
502 503
503 @Override 504 @Override
......