Madan Jampani

Refresh candidate board from source on each election round + Disbale east-west s…

…ynchronization of candidate board

Change-Id: Ie796e0ff0bdd2da834f70f24e98725a309e97787
......@@ -106,7 +106,6 @@ public class DistributedLeadershipManager implements LeadershipService {
private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
private static final int LEADER_CANDIDATE_POS = 0;
private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
......@@ -303,7 +302,7 @@ public class DistributedLeadershipManager implements LeadershipService {
newCandidates.version(),
newCandidates.creationTime())));
} else {
log.warn("Failed to withdraw from candidates list. Will retry");
log.warn("Failed to withdraw from candidates list for {}. Will retry", path);
retryWithdraw(path, future);
}
} catch (Exception e) {
......@@ -403,10 +402,11 @@ public class DistributedLeadershipManager implements LeadershipService {
try {
candidateMap.entrySet().forEach(entry -> {
String path = entry.getKey();
List<NodeId> candidates = entry.getValue().value();
Versioned<List<NodeId>> candidates = entry.getValue();
// for active topics, check if this node can become a leader (if it isn't already)
if (activeTopics.contains(path)) {
lockExecutor.submit(() -> {
Leadership leadership = electLeader(path, candidates);
Leadership leadership = electLeader(path, candidates.value());
if (leadership != null) {
CompletableFuture<Leadership> future = pendingFutures.remove(path);
if (future != null) {
......@@ -415,6 +415,14 @@ public class DistributedLeadershipManager implements LeadershipService {
}
});
}
// Raise a CANDIDATES_CHANGED event to force refresh local candidate board
// and also to update local listeners.
// Don't worry about duplicate events as they will be suppressed.
onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
new Leadership(path,
candidates.value(),
candidates.version(),
candidates.creationTime())));
});
} catch (Exception e) {
log.debug("Failure electing leaders", e);
......@@ -579,12 +587,6 @@ public class DistributedLeadershipManager implements LeadershipService {
SERIALIZER::encode);
}
});
candidateBoard.forEach((path, leadership) -> {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
clusterCommunicator.broadcast(event,
LEADERSHIP_EVENT_MESSAGE_SUBJECT,
SERIALIZER::encode);
});
} catch (Exception e) {
log.debug("Failed to send leadership updates", e);
}
......