Madan Jampani
Committed by Brian O'Connor

Updated DistributedLeadershipManager to use ConsistentMap notifications

Change-Id: Ice4e9b295f4216fee13144ec631904f34bdf7b2b
...@@ -15,7 +15,6 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -15,7 +15,6 @@ import org.apache.felix.scr.annotations.Deactivate;
15 import org.apache.felix.scr.annotations.Reference; 15 import org.apache.felix.scr.annotations.Reference;
16 import org.apache.felix.scr.annotations.ReferenceCardinality; 16 import org.apache.felix.scr.annotations.ReferenceCardinality;
17 import org.apache.felix.scr.annotations.Service; 17 import org.apache.felix.scr.annotations.Service;
18 -import org.onlab.util.KryoNamespace;
19 import org.onosproject.cluster.ClusterEvent; 18 import org.onosproject.cluster.ClusterEvent;
20 import org.onosproject.cluster.ClusterEvent.Type; 19 import org.onosproject.cluster.ClusterEvent.Type;
21 import org.onosproject.cluster.ClusterEventListener; 20 import org.onosproject.cluster.ClusterEventListener;
...@@ -28,10 +27,10 @@ import org.onosproject.cluster.NodeId; ...@@ -28,10 +27,10 @@ import org.onosproject.cluster.NodeId;
28 import org.onosproject.event.ListenerRegistry; 27 import org.onosproject.event.ListenerRegistry;
29 import org.onosproject.event.EventDeliveryService; 28 import org.onosproject.event.EventDeliveryService;
30 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 29 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
31 -import org.onosproject.store.cluster.messaging.MessageSubject;
32 import org.onosproject.store.serializers.KryoNamespaces; 30 import org.onosproject.store.serializers.KryoNamespaces;
33 import org.onosproject.store.service.ConsistentMap; 31 import org.onosproject.store.service.ConsistentMap;
34 import org.onosproject.store.service.ConsistentMapException; 32 import org.onosproject.store.service.ConsistentMapException;
33 +import org.onosproject.store.service.MapEvent;
35 import org.onosproject.store.service.Serializer; 34 import org.onosproject.store.service.Serializer;
36 import org.onosproject.store.service.StorageService; 35 import org.onosproject.store.service.StorageService;
37 import org.onosproject.store.service.Versioned; 36 import org.onosproject.store.service.Versioned;
...@@ -46,7 +45,6 @@ import java.util.Set; ...@@ -46,7 +45,6 @@ import java.util.Set;
46 import java.util.List; 45 import java.util.List;
47 import java.util.concurrent.CancellationException; 46 import java.util.concurrent.CancellationException;
48 import java.util.concurrent.CompletableFuture; 47 import java.util.concurrent.CompletableFuture;
49 -import java.util.concurrent.ExecutorService;
50 import java.util.concurrent.Executors; 48 import java.util.concurrent.Executors;
51 import java.util.concurrent.ScheduledExecutorService; 49 import java.util.concurrent.ScheduledExecutorService;
52 import java.util.concurrent.TimeUnit; 50 import java.util.concurrent.TimeUnit;
...@@ -82,11 +80,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -82,11 +80,7 @@ public class DistributedLeadershipManager implements LeadershipService {
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected EventDeliveryService eventDispatcher; 81 protected EventDeliveryService eventDispatcher;
84 82
85 - private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
86 - new MessageSubject("distributed-leadership-manager-events");
87 -
88 private final Logger log = getLogger(getClass()); 83 private final Logger log = getLogger(getClass());
89 - private ExecutorService messageHandlingExecutor;
90 private ScheduledExecutorService electionRunner; 84 private ScheduledExecutorService electionRunner;
91 private ScheduledExecutorService lockExecutor; 85 private ScheduledExecutorService lockExecutor;
92 private ScheduledExecutorService staleLeadershipPurgeExecutor; 86 private ScheduledExecutorService staleLeadershipPurgeExecutor;
...@@ -104,7 +98,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -104,7 +98,7 @@ public class DistributedLeadershipManager implements LeadershipService {
104 private Set<String> activeTopics = Sets.newConcurrentHashSet(); 98 private Set<String> activeTopics = Sets.newConcurrentHashSet();
105 private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap(); 99 private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
106 100
107 - // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS) 101 + // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
108 private static final int WAIT_BEFORE_RETRY_MILLIS = 150; 102 private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
109 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2; 103 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
110 private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2; 104 private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
...@@ -112,8 +106,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -112,8 +106,7 @@ public class DistributedLeadershipManager implements LeadershipService {
112 106
113 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false); 107 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
114 108
115 - private static final Serializer SERIALIZER = Serializer.using( 109 + private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
116 - new KryoNamespace.Builder().register(KryoNamespaces.API).build());
117 110
118 @Activate 111 @Activate
119 public void activate() { 112 public void activate() {
...@@ -126,10 +119,38 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -126,10 +119,38 @@ public class DistributedLeadershipManager implements LeadershipService {
126 .withSerializer(SERIALIZER) 119 .withSerializer(SERIALIZER)
127 .withPartitionsDisabled().build(); 120 .withPartitionsDisabled().build();
128 121
122 + leaderMap.addListener(event -> {
123 + log.debug("Received {}", event);
124 + LeadershipEvent.Type leadershipEventType = null;
125 + if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
126 + leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED;
127 + } else if (event.type() == MapEvent.Type.REMOVE) {
128 + leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED;
129 + }
130 + onLeadershipEvent(new LeadershipEvent(
131 + leadershipEventType,
132 + new Leadership(event.key(),
133 + event.value().value(),
134 + event.value().version(),
135 + event.value().creationTime())));
136 + });
137 +
138 + candidateMap.addListener(event -> {
139 + log.debug("Received {}", event);
140 + if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) {
141 + log.error("Entries must not be removed from candidate map");
142 + return;
143 + }
144 + onLeadershipEvent(new LeadershipEvent(
145 + LeadershipEvent.Type.CANDIDATES_CHANGED,
146 + new Leadership(event.key(),
147 + event.value().value(),
148 + event.value().version(),
149 + event.value().creationTime())));
150 + });
151 +
129 localNodeId = clusterService.getLocalNode().id(); 152 localNodeId = clusterService.getLocalNode().id();
130 153
131 - messageHandlingExecutor = Executors.newSingleThreadExecutor(
132 - groupedThreads("onos/store/leadership", "message-handler"));
133 electionRunner = Executors.newSingleThreadScheduledExecutor( 154 electionRunner = Executors.newSingleThreadScheduledExecutor(
134 groupedThreads("onos/store/leadership", "election-runner")); 155 groupedThreads("onos/store/leadership", "election-runner"));
135 lockExecutor = Executors.newScheduledThreadPool( 156 lockExecutor = Executors.newScheduledThreadPool(
...@@ -138,11 +159,6 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -138,11 +159,6 @@ public class DistributedLeadershipManager implements LeadershipService {
138 groupedThreads("onos/store/leadership", "stale-leadership-evictor")); 159 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
139 leadershipRefresher = Executors.newSingleThreadScheduledExecutor( 160 leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
140 groupedThreads("onos/store/leadership", "refresh-thread")); 161 groupedThreads("onos/store/leadership", "refresh-thread"));
141 - clusterCommunicator.addSubscriber(
142 - LEADERSHIP_EVENT_MESSAGE_SUBJECT,
143 - SERIALIZER::decode,
144 - this::onLeadershipEvent,
145 - messageHandlingExecutor);
146 162
147 clusterService.addListener(clusterEventListener); 163 clusterService.addListener(clusterEventListener);
148 164
...@@ -168,10 +184,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -168,10 +184,8 @@ public class DistributedLeadershipManager implements LeadershipService {
168 184
169 clusterService.removeListener(clusterEventListener); 185 clusterService.removeListener(clusterEventListener);
170 eventDispatcher.removeSink(LeadershipEvent.class); 186 eventDispatcher.removeSink(LeadershipEvent.class);
171 - clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
172 187
173 electionRunner.shutdown(); 188 electionRunner.shutdown();
174 - messageHandlingExecutor.shutdown();
175 lockExecutor.shutdown(); 189 lockExecutor.shutdown();
176 staleLeadershipPurgeExecutor.shutdown(); 190 staleLeadershipPurgeExecutor.shutdown();
177 leadershipRefresher.shutdown(); 191 leadershipRefresher.shutdown();
...@@ -239,12 +253,6 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -239,12 +253,6 @@ public class DistributedLeadershipManager implements LeadershipService {
239 return newList; 253 return newList;
240 } 254 }
241 }); 255 });
242 - publish(new LeadershipEvent(
243 - LeadershipEvent.Type.CANDIDATES_CHANGED,
244 - new Leadership(path,
245 - candidates.value(),
246 - candidates.version(),
247 - candidates.creationTime())));
248 log.debug("In the leadership race for topic {} with candidates {}", path, candidates); 256 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
249 activeTopics.add(path); 257 activeTopics.add(path);
250 Leadership leadership = electLeader(path, candidates.value()); 258 Leadership leadership = electLeader(path, candidates.value());
...@@ -273,41 +281,14 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -273,41 +281,14 @@ public class DistributedLeadershipManager implements LeadershipService {
273 future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path))); 281 future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
274 } 282 }
275 try { 283 try {
276 - Versioned<NodeId> leader = leaderMap.get(path); 284 + leaderMap.computeIf(path,
277 - if (leader != null && Objects.equals(leader.value(), localNodeId)) { 285 + localNodeId::equals,
278 - if (leaderMap.remove(path, leader.version())) { 286 + (topic, leader) -> null);
279 - log.debug("Gave up leadership for {}", path); 287 + candidateMap.computeIf(path,
280 - future.complete(null); 288 + candidates -> candidates != null && candidates.contains(localNodeId),
281 - publish(new LeadershipEvent( 289 + (topic, candidates) -> candidates.stream()
282 - LeadershipEvent.Type.LEADER_BOOTED, 290 + .filter(nodeId -> !localNodeId.equals(nodeId))
283 - new Leadership(path, 291 + .collect(Collectors.toList()));
284 - localNodeId,
285 - leader.version(),
286 - leader.creationTime())));
287 - }
288 - }
289 - // else we are not the current leader, can still be a candidate.
290 - Versioned<List<NodeId>> candidates = candidateMap.get(path);
291 - List<NodeId> candidateList = candidates != null
292 - ? Lists.newArrayList(candidates.value())
293 - : Lists.newArrayList();
294 - if (!candidateList.remove(localNodeId)) {
295 - future.complete(null);
296 - return;
297 - }
298 - if (candidateMap.replace(path, candidates.version(), candidateList)) {
299 - Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
300 - future.complete(null);
301 - publish(new LeadershipEvent(
302 - LeadershipEvent.Type.CANDIDATES_CHANGED,
303 - new Leadership(path,
304 - newCandidates.value(),
305 - newCandidates.version(),
306 - newCandidates.creationTime())));
307 - } else {
308 - log.debug("Failed to withdraw from candidates list for {}. Will retry", path);
309 - retryWithdraw(path, future);
310 - }
311 } catch (Exception e) { 292 } catch (Exception e) {
312 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e); 293 log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
313 retryWithdraw(path, future); 294 retryWithdraw(path, future);
...@@ -321,19 +302,9 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -321,19 +302,9 @@ public class DistributedLeadershipManager implements LeadershipService {
321 } 302 }
322 303
323 try { 304 try {
324 - Versioned<NodeId> leader = leaderMap.get(path); 305 + return leaderMap.computeIf(path,
325 - if (leader != null && Objects.equals(leader.value(), localNodeId)) { 306 + localNodeId::equals,
326 - if (leaderMap.remove(path, leader.version())) { 307 + (topic, leader) -> null) == null;
327 - log.debug("Stepped down from leadership for {}", path);
328 - publish(new LeadershipEvent(
329 - LeadershipEvent.Type.LEADER_BOOTED,
330 - new Leadership(path,
331 - localNodeId,
332 - leader.version(),
333 - leader.creationTime())));
334 - return true;
335 - }
336 - }
337 } catch (Exception e) { 308 } catch (Exception e) {
338 log.warn("Error executing stepdown for {}", path, e); 309 log.warn("Error executing stepdown for {}", path, e);
339 } 310 }
...@@ -352,7 +323,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -352,7 +323,7 @@ public class DistributedLeadershipManager implements LeadershipService {
352 323
353 @Override 324 @Override
354 public boolean makeTopCandidate(String path, NodeId nodeId) { 325 public boolean makeTopCandidate(String path, NodeId nodeId) {
355 - Versioned<List<NodeId>> newCandidates = candidateMap.computeIf(path, 326 + Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path,
356 candidates -> candidates != null && 327 candidates -> candidates != null &&
357 candidates.contains(nodeId) && 328 candidates.contains(nodeId) &&
358 !nodeId.equals(Iterables.getFirst(candidates, null)), 329 !nodeId.equals(Iterables.getFirst(candidates, null)),
...@@ -362,13 +333,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -362,13 +333,8 @@ public class DistributedLeadershipManager implements LeadershipService {
362 candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add); 333 candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
363 return updatedCandidates; 334 return updatedCandidates;
364 }); 335 });
365 - publish(new LeadershipEvent( 336 + List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList();
366 - LeadershipEvent.Type.CANDIDATES_CHANGED, 337 + return candidates.size() > 0 && nodeId.equals(candidates.get(0));
367 - new Leadership(path,
368 - newCandidates.value(),
369 - newCandidates.version(),
370 - newCandidates.creationTime())));
371 - return true;
372 } 338 }
373 339
374 private Leadership electLeader(String path, List<NodeId> candidates) { 340 private Leadership electLeader(String path, List<NodeId> candidates) {
...@@ -389,9 +355,6 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -389,9 +355,6 @@ public class DistributedLeadershipManager implements LeadershipService {
389 leader.value(), 355 leader.value(),
390 leader.version(), 356 leader.version(),
391 leader.creationTime()); 357 leader.creationTime());
392 - publish(new LeadershipEvent(
393 - LeadershipEvent.Type.LEADER_ELECTED,
394 - newLeadership));
395 return newLeadership; 358 return newLeadership;
396 } 359 }
397 } catch (Exception e) { 360 } catch (Exception e) {
...@@ -432,11 +395,6 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -432,11 +395,6 @@ public class DistributedLeadershipManager implements LeadershipService {
432 } 395 }
433 } 396 }
434 397
435 - private void publish(LeadershipEvent event) {
436 - onLeadershipEvent(event);
437 - clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
438 - }
439 -
440 private void onLeadershipEvent(LeadershipEvent leadershipEvent) { 398 private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
441 log.trace("Leadership Event: time = {} type = {} event = {}", 399 log.trace("Leadership Event: time = {} type = {} event = {}",
442 leadershipEvent.time(), leadershipEvent.type(), 400 leadershipEvent.time(), leadershipEvent.type(),
...@@ -517,15 +475,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -517,15 +475,8 @@ public class DistributedLeadershipManager implements LeadershipService {
517 .forEach(entry -> { 475 .forEach(entry -> {
518 String path = entry.getKey(); 476 String path = entry.getKey();
519 NodeId nodeId = entry.getValue().value(); 477 NodeId nodeId = entry.getValue().value();
520 - long epoch = entry.getValue().version();
521 - long creationTime = entry.getValue().creationTime();
522 try { 478 try {
523 - if (leaderMap.remove(path, epoch)) { 479 + leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
524 - log.debug("Purged stale lock held by {} for {}", nodeId, path);
525 - publish(new LeadershipEvent(
526 - LeadershipEvent.Type.LEADER_BOOTED,
527 - new Leadership(path, nodeId, epoch, creationTime)));
528 - }
529 } catch (Exception e) { 480 } catch (Exception e) {
530 log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e); 481 log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
531 rerunPurge.set(true); 482 rerunPurge.set(true);
...@@ -548,21 +499,15 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -548,21 +499,15 @@ public class DistributedLeadershipManager implements LeadershipService {
548 Sets.difference(Sets.newHashSet(candidatesList), 499 Sets.difference(Sets.newHashSet(candidatesList),
549 Sets.newHashSet(activeCandidatesList)); 500 Sets.newHashSet(activeCandidatesList));
550 try { 501 try {
551 - if (candidateMap.replace(path, entry.getValue().version(), activeCandidatesList)) { 502 + candidateMap.computeIf(path,
552 - log.info("Evicted inactive candidates {} from " 503 + c -> c.stream()
553 - + "candidate list for {}", removedCandidates, path); 504 + .filter(n -> clusterService.getState(n) == INACTIVE)
554 - Versioned<List<NodeId>> updatedCandidates = candidateMap.get(path); 505 + .count() > 0,
555 - publish(new LeadershipEvent( 506 + (topic, c) -> c.stream()
556 - LeadershipEvent.Type.CANDIDATES_CHANGED, 507 + .filter(n -> clusterService.getState(n) == ACTIVE)
557 - new Leadership(path, 508 + .filter(n -> !localNodeId.equals(n) ||
558 - updatedCandidates.value(), 509 + activeTopics.contains(path))
559 - updatedCandidates.version(), 510 + .collect(Collectors.toList()));
560 - updatedCandidates.creationTime())));
561 - } else {
562 - // Conflicting update detected. Rerun purge to make sure
563 - // inactive candidates are evicted.
564 - rerunPurge.set(true);
565 - }
566 } catch (Exception e) { 511 } catch (Exception e) {
567 log.debug("Failed to evict inactive candidates {} from " 512 log.debug("Failed to evict inactive candidates {} from "
568 + "candidate list for {}", removedCandidates, path, e); 513 + "candidate list for {}", removedCandidates, path, e);
......