Madan Jampani
Committed by Gerrit Code Review

Revert leaderboard refresh changes as they seem to interfere with withdrawl

Change-Id: Ia0851be4e5457271b5a0e65bde1863454981f6cb
...@@ -89,7 +89,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -89,7 +89,7 @@ public class DistributedLeadershipManager implements LeadershipService {
89 private ScheduledExecutorService electionRunner; 89 private ScheduledExecutorService electionRunner;
90 private ScheduledExecutorService lockExecutor; 90 private ScheduledExecutorService lockExecutor;
91 private ScheduledExecutorService staleLeadershipPurgeExecutor; 91 private ScheduledExecutorService staleLeadershipPurgeExecutor;
92 - private ScheduledExecutorService leadershipRefresher; 92 + private ScheduledExecutorService leadershipStatusBroadcaster;
93 93
94 private ConsistentMap<String, NodeId> leaderMap; 94 private ConsistentMap<String, NodeId> leaderMap;
95 private ConsistentMap<String, List<NodeId>> candidateMap; 95 private ConsistentMap<String, List<NodeId>> candidateMap;
...@@ -106,7 +106,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -106,7 +106,7 @@ public class DistributedLeadershipManager implements LeadershipService {
106 // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS) 106 // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS)
107 private static final int WAIT_BEFORE_RETRY_MILLIS = 150; 107 private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
108 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2; 108 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
109 - private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2; 109 + private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
110 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2; 110 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
111 111
112 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false); 112 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
...@@ -135,8 +135,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -135,8 +135,8 @@ public class DistributedLeadershipManager implements LeadershipService {
135 4, groupedThreads("onos/store/leadership", "election-thread-%d")); 135 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
136 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor( 136 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
137 groupedThreads("onos/store/leadership", "stale-leadership-evictor")); 137 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
138 - leadershipRefresher = Executors.newSingleThreadScheduledExecutor( 138 + leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
139 - groupedThreads("onos/store/leadership", "refresh-thread")); 139 + groupedThreads("onos/store/leadership", "peer-updater"));
140 clusterCommunicator.addSubscriber( 140 clusterCommunicator.addSubscriber(
141 LEADERSHIP_EVENT_MESSAGE_SUBJECT, 141 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
142 SERIALIZER::decode, 142 SERIALIZER::decode,
...@@ -148,8 +148,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -148,8 +148,8 @@ public class DistributedLeadershipManager implements LeadershipService {
148 electionRunner.scheduleWithFixedDelay( 148 electionRunner.scheduleWithFixedDelay(
149 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS); 149 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
150 150
151 - leadershipRefresher.scheduleWithFixedDelay( 151 + leadershipStatusBroadcaster.scheduleWithFixedDelay(
152 - this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS); 152 + this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
153 153
154 listenerRegistry = new ListenerRegistry<>(); 154 listenerRegistry = new ListenerRegistry<>();
155 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry); 155 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
...@@ -173,7 +173,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -173,7 +173,7 @@ public class DistributedLeadershipManager implements LeadershipService {
173 messageHandlingExecutor.shutdown(); 173 messageHandlingExecutor.shutdown();
174 lockExecutor.shutdown(); 174 lockExecutor.shutdown();
175 staleLeadershipPurgeExecutor.shutdown(); 175 staleLeadershipPurgeExecutor.shutdown();
176 - leadershipRefresher.shutdown(); 176 + leadershipStatusBroadcaster.shutdown();
177 177
178 log.info("Stopped"); 178 log.info("Stopped");
179 } 179 }
...@@ -458,7 +458,6 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -458,7 +458,6 @@ public class DistributedLeadershipManager implements LeadershipService {
458 leaderBoard.compute(topic, (k, currentLeadership) -> { 458 leaderBoard.compute(topic, (k, currentLeadership) -> {
459 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) { 459 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
460 updateAccepted.set(true); 460 updateAccepted.set(true);
461 - // FIXME: Removing entries from leaderboard is not safe and should be visited.
462 return null; 461 return null;
463 } 462 }
464 return currentLeadership; 463 return currentLeadership;
...@@ -580,36 +579,18 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -580,36 +579,18 @@ public class DistributedLeadershipManager implements LeadershipService {
580 } 579 }
581 } 580 }
582 581
583 - private void refreshLeaderBoard() { 582 + private void sendLeadershipStatus() {
584 try { 583 try {
585 - Map<String, Leadership> newLeaderBoard = Maps.newHashMap(); 584 + leaderBoard.forEach((path, leadership) -> {
586 - leaderMap.entrySet().forEach(entry -> { 585 + if (leadership.leader().equals(localNodeId)) {
587 - String path = entry.getKey(); 586 + LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
588 - Versioned<NodeId> leader = entry.getValue(); 587 + clusterCommunicator.broadcast(event,
589 - Leadership leadership = new Leadership(path, 588 + LEADERSHIP_EVENT_MESSAGE_SUBJECT,
590 - leader.value(), 589 + SERIALIZER::encode);
591 - leader.version(), 590 + }
592 - leader.creationTime());
593 - newLeaderBoard.put(path, leadership);
594 - });
595 -
596 - // first take snapshot of current leader board.
597 - Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
598 -
599 - // evict stale leaders
600 - Maps.difference(currentLeaderBoard, newLeaderBoard).entriesOnlyOnLeft().forEach((path, leadership) -> {
601 - log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
602 - onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
603 - });
604 -
605 - // add missing leaders
606 - Maps.difference(currentLeaderBoard, newLeaderBoard).entriesDiffering().forEach((path, difference) -> {
607 - Leadership leadership = difference.rightValue();
608 - log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
609 - onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
610 }); 591 });
611 } catch (Exception e) { 592 } catch (Exception e) {
612 - log.debug("Failed to refresh leader board", e); 593 + log.debug("Failed to send leadership updates", e);
613 } 594 }
614 } 595 }
615 596
......