Madan Jampani
Committed by Gerrit Code Review

ONOS-2068: Refresh Leadership periodically from global map.

Change-Id: I50cff6546d79a275f4c026a4f3b2efe5d2eefd58
...@@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableList; ...@@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableList;
4 import com.google.common.collect.ImmutableMap; 4 import com.google.common.collect.ImmutableMap;
5 import com.google.common.collect.Iterables; 5 import com.google.common.collect.Iterables;
6 import com.google.common.collect.Lists; 6 import com.google.common.collect.Lists;
7 +import com.google.common.collect.MapDifference;
7 import com.google.common.collect.Maps; 8 import com.google.common.collect.Maps;
8 import com.google.common.collect.Sets; 9 import com.google.common.collect.Sets;
9 10
...@@ -89,7 +90,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -89,7 +90,7 @@ public class DistributedLeadershipManager implements LeadershipService {
89 private ScheduledExecutorService electionRunner; 90 private ScheduledExecutorService electionRunner;
90 private ScheduledExecutorService lockExecutor; 91 private ScheduledExecutorService lockExecutor;
91 private ScheduledExecutorService staleLeadershipPurgeExecutor; 92 private ScheduledExecutorService staleLeadershipPurgeExecutor;
92 - private ScheduledExecutorService leadershipStatusBroadcaster; 93 + private ScheduledExecutorService leadershipRefresher;
93 94
94 private ConsistentMap<String, NodeId> leaderMap; 95 private ConsistentMap<String, NodeId> leaderMap;
95 private ConsistentMap<String, List<NodeId>> candidateMap; 96 private ConsistentMap<String, List<NodeId>> candidateMap;
...@@ -106,7 +107,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -106,7 +107,7 @@ public class DistributedLeadershipManager implements LeadershipService {
106 // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS) 107 // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS)
107 private static final int WAIT_BEFORE_RETRY_MILLIS = 150; 108 private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
108 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2; 109 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
109 - private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2; 110 + private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
110 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2; 111 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
111 112
112 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false); 113 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
...@@ -135,8 +136,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -135,8 +136,8 @@ public class DistributedLeadershipManager implements LeadershipService {
135 4, groupedThreads("onos/store/leadership", "election-thread-%d")); 136 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
136 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor( 137 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
137 groupedThreads("onos/store/leadership", "stale-leadership-evictor")); 138 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
138 - leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor( 139 + leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
139 - groupedThreads("onos/store/leadership", "peer-updater")); 140 + groupedThreads("onos/store/leadership", "refresh-thread"));
140 clusterCommunicator.addSubscriber( 141 clusterCommunicator.addSubscriber(
141 LEADERSHIP_EVENT_MESSAGE_SUBJECT, 142 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
142 SERIALIZER::decode, 143 SERIALIZER::decode,
...@@ -148,8 +149,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -148,8 +149,8 @@ public class DistributedLeadershipManager implements LeadershipService {
148 electionRunner.scheduleWithFixedDelay( 149 electionRunner.scheduleWithFixedDelay(
149 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS); 150 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
150 151
151 - leadershipStatusBroadcaster.scheduleWithFixedDelay( 152 + leadershipRefresher.scheduleWithFixedDelay(
152 - this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS); 153 + this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
153 154
154 listenerRegistry = new ListenerRegistry<>(); 155 listenerRegistry = new ListenerRegistry<>();
155 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry); 156 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
...@@ -173,7 +174,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -173,7 +174,7 @@ public class DistributedLeadershipManager implements LeadershipService {
173 messageHandlingExecutor.shutdown(); 174 messageHandlingExecutor.shutdown();
174 lockExecutor.shutdown(); 175 lockExecutor.shutdown();
175 staleLeadershipPurgeExecutor.shutdown(); 176 staleLeadershipPurgeExecutor.shutdown();
176 - leadershipStatusBroadcaster.shutdown(); 177 + leadershipRefresher.shutdown();
177 178
178 log.info("Stopped"); 179 log.info("Stopped");
179 } 180 }
...@@ -458,6 +459,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -458,6 +459,7 @@ public class DistributedLeadershipManager implements LeadershipService {
458 leaderBoard.compute(topic, (k, currentLeadership) -> { 459 leaderBoard.compute(topic, (k, currentLeadership) -> {
459 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) { 460 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
460 updateAccepted.set(true); 461 updateAccepted.set(true);
462 + // FIXME: Removing entries from leaderboard is not safe and should be visited.
461 return null; 463 return null;
462 } 464 }
463 return currentLeadership; 465 return currentLeadership;
...@@ -579,18 +581,47 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -579,18 +581,47 @@ public class DistributedLeadershipManager implements LeadershipService {
579 } 581 }
580 } 582 }
581 583
582 - private void sendLeadershipStatus() { 584 + private void refreshLeaderBoard() {
583 try { 585 try {
584 - leaderBoard.forEach((path, leadership) -> { 586 + Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
585 - if (leadership.leader().equals(localNodeId)) { 587 + leaderMap.entrySet().forEach(entry -> {
586 - LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership); 588 + String path = entry.getKey();
587 - clusterCommunicator.broadcast(event, 589 + Versioned<NodeId> leader = entry.getValue();
588 - LEADERSHIP_EVENT_MESSAGE_SUBJECT, 590 + Leadership leadership = new Leadership(path,
589 - SERIALIZER::encode); 591 + leader.value(),
592 + leader.version(),
593 + leader.creationTime());
594 + newLeaderBoard.put(path, leadership);
595 + });
596 +
597 + // first take snapshot of current leader board.
598 + Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
599 +
600 + MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard);
601 +
602 + // evict stale leaders
603 + diff.entriesOnlyOnLeft().forEach((path, leadership) -> {
604 + log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
605 + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
606 + });
607 +
608 + // add missing leaders
609 + diff.entriesOnlyOnRight().forEach((path, leadership) -> {
610 + log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
611 + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
612 + });
613 +
614 + // add updated leaders
615 + diff.entriesDiffering().forEach((path, difference) -> {
616 + Leadership current = difference.leftValue();
617 + Leadership updated = difference.rightValue();
618 + if (current.epoch() < updated.epoch()) {
619 + log.debug("Updated {} in leaderboard.", updated);
620 + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated));
590 } 621 }
591 }); 622 });
592 } catch (Exception e) { 623 } catch (Exception e) {
593 - log.debug("Failed to send leadership updates", e); 624 + log.debug("Failed to refresh leader board", e);
594 } 625 }
595 } 626 }
596 627
......