Madan Jampani
Committed by Pavlin Radoslavov

Initial cut at Leadership Manager

Change-Id: I658c6fca3dc6f686e0f7facc9e7b443679ebae1e

Change-Id: I293906add41ff4310e3584847d806345e0312703

Change-Id: I7fb13a72ba4aef10d7c2262b96e0df64efecfcef
1 +package org.onlab.onos.cluster;
2 +
3 +import java.util.Objects;
4 +
5 +import com.google.common.base.MoreObjects;
6 +
7 +/**
8 + * Abstract leadership concept.
9 + */
10 +public class Leadership {
11 +
12 + private final String topic;
13 + private final ControllerNode leader;
14 + private final long term;
15 +
16 + public Leadership(String topic, ControllerNode leader, long term) {
17 + this.topic = topic;
18 + this.leader = leader;
19 + this.term = term;
20 + }
21 +
22 + /**
23 + * The topic for which this leadership applies.
24 + * @return leadership topic.
25 + */
26 + public String topic() {
27 + return topic;
28 + }
29 +
30 + /**
31 + * The leader for this topic.
32 + * @return leader node.
33 + */
34 + public ControllerNode leader() {
35 + return leader;
36 + }
37 +
38 + /**
39 + * The term number associated with this leadership.
40 + * @return leadership term
41 + */
42 + public long term() {
43 + return term;
44 + }
45 +
46 + @Override
47 + public int hashCode() {
48 + return Objects.hash(topic, leader, term);
49 + }
50 +
51 + @Override
52 + public String toString() {
53 + return MoreObjects.toStringHelper(this.getClass())
54 + .add("topic", topic)
55 + .add("leader", leader)
56 + .add("term", term)
57 + .toString();
58 + }
59 +}
...\ No newline at end of file ...\ No newline at end of file
...@@ -15,44 +15,73 @@ ...@@ -15,44 +15,73 @@
15 */ 15 */
16 package org.onlab.onos.cluster; 16 package org.onlab.onos.cluster;
17 17
18 +import java.util.Objects;
19 +
18 import org.onlab.onos.event.AbstractEvent; 20 import org.onlab.onos.event.AbstractEvent;
19 21
22 +import com.google.common.base.MoreObjects;
23 +
20 /** 24 /**
21 * Describes leadership-related event. 25 * Describes leadership-related event.
22 */ 26 */
23 -public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, ControllerNode> { 27 +public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, Leadership> {
24 28
25 /** 29 /**
26 * Type of leadership-related events. 30 * Type of leadership-related events.
27 */ 31 */
28 public enum Type { 32 public enum Type {
29 /** 33 /**
30 - * Signifies that the leader has changed. The event subject is the 34 + * Signifies that the leader has been elected. The event subject is the
31 * new leader. 35 * new leader.
32 */ 36 */
33 - LEADER_CHANGED 37 + LEADER_ELECTED,
38 +
39 + /**
40 + * Signifies that the leader has been re-elected. The event subject is the
41 + * leader.
42 + */
43 + LEADER_REELECTED,
44 +
45 + /**
46 + * Signifies that the leader has been booted and lost leadership. The event subject is the
47 + * former leader.
48 + */
49 + LEADER_BOOTED
34 } 50 }
35 51
36 /** 52 /**
37 * Creates an event of a given type and for the specified instance and the 53 * Creates an event of a given type and for the specified instance and the
38 * current time. 54 * current time.
39 * 55 *
40 - * @param type leadership event type 56 + * @param type leadership event type
41 - * @param instance cluster device subject 57 + * @param leadership event subject
42 */ 58 */
43 - public LeadershipEvent(Type type, ControllerNode instance) { 59 + public LeadershipEvent(Type type, Leadership leadership) {
44 - super(type, instance); 60 + super(type, leadership);
45 } 61 }
46 62
47 /** 63 /**
48 - * Creates an event of a given type and for the specified device and time. 64 + * Creates an event of a given type and for the specified subject and time.
49 * 65 *
50 - * @param type device event type 66 + * @param type leadership event type
51 - * @param instance event device subject 67 + * @param leadership event subject
52 * @param time occurrence time 68 * @param time occurrence time
53 */ 69 */
54 - public LeadershipEvent(Type type, ControllerNode instance, long time) { 70 + public LeadershipEvent(Type type, Leadership leadership, long time) {
55 - super(type, instance, time); 71 + super(type, leadership, time);
56 } 72 }
57 73
74 + @Override
75 + public int hashCode() {
76 + return Objects.hash(type(), subject(), time());
77 + }
78 +
79 + @Override
80 + public String toString() {
81 + return MoreObjects.toStringHelper(this.getClass())
82 + .add("type", type())
83 + .add("subject", subject())
84 + .add("time", time())
85 + .toString();
86 + }
58 } 87 }
......
...@@ -21,4 +21,4 @@ import org.onlab.onos.event.EventListener; ...@@ -21,4 +21,4 @@ import org.onlab.onos.event.EventListener;
21 * Entity capable of receiving device leadership-related events. 21 * Entity capable of receiving device leadership-related events.
22 */ 22 */
23 public interface LeadershipEventListener extends EventListener<LeadershipEvent> { 23 public interface LeadershipEventListener extends EventListener<LeadershipEvent> {
24 -} 24 +}
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -16,29 +16,35 @@ ...@@ -16,29 +16,35 @@
16 package org.onlab.onos.cluster; 16 package org.onlab.onos.cluster;
17 17
18 /** 18 /**
19 - * Service for obtaining information about the leader election. 19 + * Service for leader election.
20 + * Leadership contests are organized around topics. An ONOS instance can join the
21 + * leadership race for a topic or withdraw from a race it has previously joined.
22 + * Once in the race, the instance can get asynchronously notified
23 + * of leadership election results.
20 */ 24 */
21 public interface LeadershipService { 25 public interface LeadershipService {
22 26
23 /** 27 /**
24 - * Returns the current leader controller node. 28 + * Joins the leadership contest.
25 - * 29 + * @param path topic for which this controller node wishes to be a leader.
26 - * @return current leader controller node
27 */ 30 */
28 - ControllerNode getLeader(); 31 + void runForLeadership(String path);
29 32
30 /** 33 /**
31 - * Adds the specified leadership event listener. 34 + * Withdraws from a leadership contest.
32 - * 35 + * @param path topic for which this controller node no longer wishes to be a leader.
33 - * @param listener the leadership listener 36 + */
37 + void withdraw(String path);
38 +
39 + /**
40 + * Registers an event listener to be notified of leadership events.
41 + * @param listener listener that will be asynchronously notified of leadership events.
34 */ 42 */
35 void addListener(LeadershipEventListener listener); 43 void addListener(LeadershipEventListener listener);
36 44
37 /** 45 /**
38 - * Removes the specified leadership event listener. 46 + * Unregisters an event listener for leadership events.
39 - * 47 + * @param listener listener to be removed.
40 - * @param listener the leadership listener
41 */ 48 */
42 void removeListener(LeadershipEventListener listener); 49 void removeListener(LeadershipEventListener listener);
43 - 50 +}
44 -}
...\ No newline at end of file ...\ No newline at end of file
......
1 package org.onlab.onos.store.service; 1 package org.onlab.onos.store.service;
2 2
3 -import java.util.concurrent.Future; 3 +import java.util.concurrent.CompletableFuture;
4 4
5 /** 5 /**
6 * A lock is a tool for controlling access to a shared resource by multiple processes. 6 * A lock is a tool for controlling access to a shared resource by multiple processes.
...@@ -43,7 +43,7 @@ public interface Lock { ...@@ -43,7 +43,7 @@ public interface Lock {
43 * will be reserved before it becomes available for others. 43 * will be reserved before it becomes available for others.
44 * @return Future that can be used for blocking until lock is acquired. 44 * @return Future that can be used for blocking until lock is acquired.
45 */ 45 */
46 - Future<Void> lockAsync(int leaseDurationMillis); 46 + CompletableFuture<Void> lockAsync(int leaseDurationMillis);
47 47
48 /** 48 /**
49 * Acquires the lock only if it is free at the time of invocation. 49 * Acquires the lock only if it is free at the time of invocation.
......
1 +package org.onlab.onos.store.cluster.impl;
2 +
3 +import static com.google.common.base.Preconditions.checkArgument;
4 +import static com.google.common.base.Verify.verifyNotNull;
5 +import static org.onlab.util.Tools.namedThreads;
6 +import static org.slf4j.LoggerFactory.getLogger;
7 +
8 +import java.util.Map;
9 +import java.util.Set;
10 +import java.util.concurrent.Executors;
11 +import java.util.concurrent.ScheduledExecutorService;
12 +import java.util.concurrent.TimeUnit;
13 +
14 +import org.apache.felix.scr.annotations.Activate;
15 +import org.apache.felix.scr.annotations.Component;
16 +import org.apache.felix.scr.annotations.Deactivate;
17 +import org.apache.felix.scr.annotations.Reference;
18 +import org.apache.felix.scr.annotations.ReferenceCardinality;
19 +import org.apache.felix.scr.annotations.Service;
20 +import org.onlab.onos.cluster.ClusterService;
21 +import org.onlab.onos.cluster.ControllerNode;
22 +import org.onlab.onos.cluster.Leadership;
23 +import org.onlab.onos.cluster.LeadershipEvent;
24 +import org.onlab.onos.cluster.LeadershipEventListener;
25 +import org.onlab.onos.cluster.LeadershipService;
26 +import org.onlab.onos.store.service.Lock;
27 +import org.onlab.onos.store.service.LockService;
28 +import org.onlab.onos.store.service.impl.DistributedLockManager;
29 +import org.slf4j.Logger;
30 +
31 +import com.google.common.collect.Maps;
32 +import com.google.common.collect.Sets;
33 +
34 +/**
35 + * Distributed implementation of LeadershipService that is based on the primitives exposed by
36 + * LockService.
37 + */
38 +@Component(immediate = true)
39 +@Service
40 +public class LeadershipManager implements LeadershipService {
41 +
42 + private final Logger log = getLogger(getClass());
43 +
44 + // TODO: Remove this dependency
45 + private static final int TERM_DURATION_MS =
46 + DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
47 +
48 + // TODO: Appropriate Thread pool sizing.
49 + private static final ScheduledExecutorService THREAD_POOL =
50 + Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
51 +
52 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 + private ClusterService clusterService;
54 +
55 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
56 + private LockService lockService;
57 +
58 + private Map<String, Lock> openContests = Maps.newHashMap();
59 + private Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
60 + private ControllerNode localNode;
61 +
62 + @Activate
63 + public void activate() {
64 + localNode = clusterService.getLocalNode();
65 + log.info("Started.");
66 + }
67 +
68 + @Deactivate
69 + public void deactivate() {
70 + THREAD_POOL.shutdown();
71 + log.info("Stopped.");
72 + }
73 +
74 + @Override
75 + public void runForLeadership(String path) {
76 + checkArgument(path != null);
77 + if (openContests.containsKey(path)) {
78 + log.info("Already in the leadership contest for {}", path);
79 + return;
80 + } else {
81 + Lock lock = lockService.create(path);
82 + openContests.put(path, lock);
83 + tryAcquireLeadership(path);
84 + }
85 + }
86 +
87 + @Override
88 + public void withdraw(String path) {
89 + checkArgument(path != null);
90 + Lock lock = openContests.remove(path);
91 +
92 + if (lock != null && lock.isLocked()) {
93 + lock.unlock();
94 + notifyListeners(
95 + new LeadershipEvent(
96 + LeadershipEvent.Type.LEADER_BOOTED,
97 + new Leadership(lock.path(), localNode, 0)));
98 + // FIXME: Should set the correct term information.
99 + }
100 + }
101 +
102 + @Override
103 + public void addListener(LeadershipEventListener listener) {
104 + checkArgument(listener != null);
105 + listeners.add(listener);
106 + }
107 +
108 + @Override
109 + public void removeListener(LeadershipEventListener listener) {
110 + checkArgument(listener != null);
111 + listeners.remove(listener);
112 + }
113 +
114 + private void notifyListeners(LeadershipEvent event) {
115 + for (LeadershipEventListener listener : listeners) {
116 + listener.event(event);
117 + }
118 + }
119 +
120 + private void tryAcquireLeadership(String path) {
121 + Lock lock = openContests.get(path);
122 + verifyNotNull(lock, "Lock should not be null");
123 + lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
124 + if (error == null) {
125 + THREAD_POOL.schedule(
126 + new RelectionTask(lock),
127 + TERM_DURATION_MS / 2,
128 + TimeUnit.MILLISECONDS);
129 + notifyListeners(
130 + new LeadershipEvent(
131 + LeadershipEvent.Type.LEADER_ELECTED,
132 + new Leadership(lock.path(), localNode, 0)));
133 + } else {
134 + log.error("Failed to acquire lock for {}", path, error);
135 + // retry
136 + tryAcquireLeadership(path);
137 + }
138 + });
139 + }
140 +
141 + private class RelectionTask implements Runnable {
142 +
143 + private final Lock lock;
144 +
145 + public RelectionTask(Lock lock) {
146 + this.lock = lock;
147 + }
148 +
149 + @Override
150 + public void run() {
151 + if (lock.extendExpiration(TERM_DURATION_MS)) {
152 + notifyListeners(
153 + new LeadershipEvent(
154 + LeadershipEvent.Type.LEADER_REELECTED,
155 + new Leadership(lock.path(), localNode, 0)));
156 + THREAD_POOL.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
157 + } else {
158 + if (openContests.containsKey(lock.path())) {
159 + notifyListeners(
160 + new LeadershipEvent(
161 + LeadershipEvent.Type.LEADER_BOOTED,
162 + new Leadership(lock.path(), localNode, 0)));
163 + tryAcquireLeadership(lock.path());
164 + }
165 + }
166 + }
167 + }
168 +}
...\ No newline at end of file ...\ No newline at end of file
...@@ -6,7 +6,6 @@ import java.nio.charset.StandardCharsets; ...@@ -6,7 +6,6 @@ import java.nio.charset.StandardCharsets;
6 import java.util.UUID; 6 import java.util.UUID;
7 import java.util.concurrent.CompletableFuture; 7 import java.util.concurrent.CompletableFuture;
8 import java.util.concurrent.ExecutionException; 8 import java.util.concurrent.ExecutionException;
9 -import java.util.concurrent.Future;
10 import java.util.concurrent.TimeUnit; 9 import java.util.concurrent.TimeUnit;
11 import java.util.concurrent.TimeoutException; 10 import java.util.concurrent.TimeoutException;
12 import java.util.concurrent.atomic.AtomicBoolean; 11 import java.util.concurrent.atomic.AtomicBoolean;
...@@ -62,7 +61,7 @@ public class DistributedLock implements Lock { ...@@ -62,7 +61,7 @@ public class DistributedLock implements Lock {
62 } 61 }
63 62
64 @Override 63 @Override
65 - public Future<Void> lockAsync(int leaseDurationMillis) { 64 + public CompletableFuture<Void> lockAsync(int leaseDurationMillis) {
66 if (isLocked() || tryLock(leaseDurationMillis)) { 65 if (isLocked() || tryLock(leaseDurationMillis)) {
67 return CompletableFuture.<Void>completedFuture(null); 66 return CompletableFuture.<Void>completedFuture(null);
68 } 67 }
......