Madan Jampani
Committed by Gerrit Code Review

ONOS-2026: Address polling issue in LeaderElection

Change-Id: Ib5c94d932de6b2c3419b07a97d6fe91d5c588538
...@@ -84,7 +84,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -84,7 +84,8 @@ public class DistributedLeadershipManager implements LeadershipService {
84 84
85 private final Logger log = getLogger(getClass()); 85 private final Logger log = getLogger(getClass());
86 private ExecutorService messageHandlingExecutor; 86 private ExecutorService messageHandlingExecutor;
87 - private ScheduledExecutorService retryLeaderLockExecutor; 87 + private ScheduledExecutorService electionRunner;
88 + private ScheduledExecutorService lockExecutor;
88 private ScheduledExecutorService staleLeadershipPurgeExecutor; 89 private ScheduledExecutorService staleLeadershipPurgeExecutor;
89 private ScheduledExecutorService leadershipStatusBroadcaster; 90 private ScheduledExecutorService leadershipStatusBroadcaster;
90 91
...@@ -98,6 +99,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -98,6 +99,7 @@ public class DistributedLeadershipManager implements LeadershipService {
98 99
99 private NodeId localNodeId; 100 private NodeId localNodeId;
100 private Set<String> activeTopics = Sets.newConcurrentHashSet(); 101 private Set<String> activeTopics = Sets.newConcurrentHashSet();
102 + private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
101 103
102 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2; 104 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
103 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2; 105 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
...@@ -125,7 +127,9 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -125,7 +127,9 @@ public class DistributedLeadershipManager implements LeadershipService {
125 127
126 messageHandlingExecutor = Executors.newSingleThreadExecutor( 128 messageHandlingExecutor = Executors.newSingleThreadExecutor(
127 groupedThreads("onos/store/leadership", "message-handler")); 129 groupedThreads("onos/store/leadership", "message-handler"));
128 - retryLeaderLockExecutor = Executors.newScheduledThreadPool( 130 + electionRunner = Executors.newSingleThreadScheduledExecutor(
131 + groupedThreads("onos/store/leadership", "election-runner"));
132 + lockExecutor = Executors.newScheduledThreadPool(
129 4, groupedThreads("onos/store/leadership", "election-thread-%d")); 133 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
130 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor( 134 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
131 groupedThreads("onos/store/leadership", "stale-leadership-evictor")); 135 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
...@@ -139,6 +143,9 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -139,6 +143,9 @@ public class DistributedLeadershipManager implements LeadershipService {
139 143
140 clusterService.addListener(clusterEventListener); 144 clusterService.addListener(clusterEventListener);
141 145
146 + electionRunner.scheduleWithFixedDelay(
147 + this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
148 +
142 leadershipStatusBroadcaster.scheduleWithFixedDelay( 149 leadershipStatusBroadcaster.scheduleWithFixedDelay(
143 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS); 150 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
144 151
...@@ -160,8 +167,9 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -160,8 +167,9 @@ public class DistributedLeadershipManager implements LeadershipService {
160 eventDispatcher.removeSink(LeadershipEvent.class); 167 eventDispatcher.removeSink(LeadershipEvent.class);
161 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT); 168 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
162 169
170 + electionRunner.shutdown();
163 messageHandlingExecutor.shutdown(); 171 messageHandlingExecutor.shutdown();
164 - retryLeaderLockExecutor.shutdown(); 172 + lockExecutor.shutdown();
165 staleLeadershipPurgeExecutor.shutdown(); 173 staleLeadershipPurgeExecutor.shutdown();
166 leadershipStatusBroadcaster.shutdown(); 174 leadershipStatusBroadcaster.shutdown();
167 175
...@@ -236,7 +244,12 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -236,7 +244,12 @@ public class DistributedLeadershipManager implements LeadershipService {
236 candidates.creationTime()))); 244 candidates.creationTime())));
237 log.debug("In the leadership race for topic {} with candidates {}", path, candidates); 245 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
238 activeTopics.add(path); 246 activeTopics.add(path);
239 - tryLeaderLock(path, future); 247 + Leadership leadership = electLeader(path, candidates.value());
248 + if (leadership == null) {
249 + pendingFutures.put(path, future);
250 + } else {
251 + future.complete(leadership);
252 + }
240 } catch (ConsistentMapException e) { 253 } catch (ConsistentMapException e) {
241 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e); 254 log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
242 rerunForLeadership(path, future); 255 rerunForLeadership(path, future);
...@@ -315,7 +328,6 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -315,7 +328,6 @@ public class DistributedLeadershipManager implements LeadershipService {
315 localNodeId, 328 localNodeId,
316 leader.version(), 329 leader.version(),
317 leader.creationTime()))); 330 leader.creationTime())));
318 - retryLock(path, new CompletableFuture<>());
319 return true; 331 return true;
320 } 332 }
321 } 333 }
...@@ -355,50 +367,55 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -355,50 +367,55 @@ public class DistributedLeadershipManager implements LeadershipService {
355 return true; 367 return true;
356 } 368 }
357 369
358 - private void tryLeaderLock(String path, CompletableFuture<Leadership> future) { 370 + private Leadership electLeader(String path, List<NodeId> candidates) {
359 - if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) { 371 + Leadership currentLeadership = getLeadership(path);
360 - return; 372 + if (currentLeadership != null) {
361 - } 373 + return currentLeadership;
362 - try { 374 + } else {
363 - Versioned<List<NodeId>> candidates = candidateMap.get(path); 375 + NodeId topCandidate = candidates
364 - if (candidates != null) { 376 + .stream()
365 - List<NodeId> activeNodes = candidates.value() 377 + .filter(n -> clusterService.getState(n) == ACTIVE)
366 - .stream() 378 + .findFirst()
367 - .filter(n -> clusterService.getState(n) == ACTIVE) 379 + .orElse(null);
368 - .collect(Collectors.toList()); 380 + try {
369 - if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) { 381 + Versioned<NodeId> leader = localNodeId.equals(topCandidate)
370 - leaderLockAttempt(path, candidates.value(), future); 382 + ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
371 - } else { 383 + if (leader != null) {
372 - retryLock(path, future); 384 + Leadership newLeadership = new Leadership(path,
385 + leader.value(),
386 + leader.version(),
387 + leader.creationTime());
388 + publish(new LeadershipEvent(
389 + LeadershipEvent.Type.LEADER_ELECTED,
390 + newLeadership));
391 + return newLeadership;
373 } 392 }
374 - } else { 393 + } catch (Exception e) {
375 - throw new IllegalStateException("should not be here"); 394 + log.debug("Failed to elect leader for {}", path, e);
376 } 395 }
377 - } catch (Exception e) {
378 - log.debug("Failed to fetch candidate information for {}", path, e);
379 - retryLock(path, future);
380 } 396 }
397 + return null;
381 } 398 }
382 399
383 - private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) { 400 + private void electLeaders() {
384 try { 401 try {
385 - Versioned<NodeId> leader = leaderMap.computeIfAbsent(path, p -> localNodeId); 402 + candidateMap.entrySet().forEach(entry -> {
386 - if (Objects.equals(leader.value(), localNodeId)) { 403 + String path = entry.getKey();
387 - log.debug("Assumed leadership for {}", path); 404 + List<NodeId> candidates = entry.getValue().value();
388 - Leadership leadership = new Leadership(path, 405 + if (activeTopics.contains(path)) {
389 - leader.value(), 406 + lockExecutor.submit(() -> {
390 - leader.version(), 407 + Leadership leadership = electLeader(path, candidates);
391 - leader.creationTime()); 408 + if (leadership != null) {
392 - future.complete(leadership); 409 + CompletableFuture<Leadership> future = pendingFutures.remove(path);
393 - publish(new LeadershipEvent( 410 + if (future != null) {
394 - LeadershipEvent.Type.LEADER_ELECTED, 411 + future.complete(leadership);
395 - leadership)); 412 + }
396 - } else { 413 + }
397 - retryLock(path, future); 414 + });
398 - } 415 + }
416 + });
399 } catch (Exception e) { 417 } catch (Exception e) {
400 - log.debug("Attempt to acquire leadership lock for topic {} failed", path, e); 418 + log.debug("Failure electing leaders", e);
401 - retryLock(path, future);
402 } 419 }
403 } 420 }
404 421
...@@ -451,21 +468,14 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -451,21 +468,14 @@ public class DistributedLeadershipManager implements LeadershipService {
451 } 468 }
452 469
453 private void rerunForLeadership(String path, CompletableFuture<Leadership> future) { 470 private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
454 - retryLeaderLockExecutor.schedule( 471 + lockExecutor.schedule(
455 () -> doRunForLeadership(path, future), 472 () -> doRunForLeadership(path, future),
456 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC, 473 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
457 TimeUnit.SECONDS); 474 TimeUnit.SECONDS);
458 } 475 }
459 476
460 - private void retryLock(String path, CompletableFuture<Leadership> future) {
461 - retryLeaderLockExecutor.schedule(
462 - () -> tryLeaderLock(path, future),
463 - DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
464 - TimeUnit.SECONDS);
465 - }
466 -
467 private void retryWithdraw(String path, CompletableFuture<Void> future) { 477 private void retryWithdraw(String path, CompletableFuture<Void> future) {
468 - retryLeaderLockExecutor.schedule( 478 + lockExecutor.schedule(
469 () -> doWithdraw(path, future), 479 () -> doWithdraw(path, future),
470 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, 480 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
471 TimeUnit.SECONDS); 481 TimeUnit.SECONDS);
......