Pavlin Radoslavov

Added Hazelcast-based Leadership implementation that is needed/used by SDN-IP.

This is a drop-in replacement until the generic ONOS Leadership service
is robust enough.

Change-Id: I72a84331dd948f98707eb59844dab425aa9d5c08
...@@ -85,6 +85,12 @@ ...@@ -85,6 +85,12 @@
85 </dependency> 85 </dependency>
86 86
87 <dependency> 87 <dependency>
88 + <groupId>org.onlab.onos</groupId>
89 + <artifactId>onos-core-dist</artifactId>
90 + <version>${project.version}</version>
91 + </dependency>
92 +
93 + <dependency>
88 <groupId>org.apache.karaf.shell</groupId> 94 <groupId>org.apache.karaf.shell</groupId>
89 <artifactId>org.apache.karaf.shell.console</artifactId> 95 <artifactId>org.apache.karaf.shell.console</artifactId>
90 </dependency> 96 </dependency>
......
1 +<?xml version="1.0" encoding="UTF-8"?>
2 +<!--
3 + ~ Copyright 2014 Open Networking Laboratory
4 + ~
5 + ~ Licensed under the Apache License, Version 2.0 (the "License");
6 + ~ you may not use this file except in compliance with the License.
7 + ~ You may obtain a copy of the License at
8 + ~
9 + ~ http://www.apache.org/licenses/LICENSE-2.0
10 + ~
11 + ~ Unless required by applicable law or agreed to in writing, software
12 + ~ distributed under the License is distributed on an "AS IS" BASIS,
13 + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 + ~ See the License for the specific language governing permissions and
15 + ~ limitations under the License.
16 + -->
17 +<project xmlns="http://maven.apache.org/POM/4.0.0"
18 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
19 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
20 + <modelVersion>4.0.0</modelVersion>
21 +
22 + <parent>
23 + <groupId>org.onlab.onos</groupId>
24 + <artifactId>onos-apps</artifactId>
25 + <version>1.0.0-SNAPSHOT</version>
26 + <relativePath>../pom.xml</relativePath>
27 + </parent>
28 +
29 + <artifactId>onos-app-sdnip</artifactId>
30 + <packaging>bundle</packaging>
31 +
32 + <description>SDN-IP peering application</description>
33 +
34 + <dependencies>
35 + <dependency>
36 + <groupId>com.fasterxml.jackson.core</groupId>
37 + <artifactId>jackson-databind</artifactId>
38 + </dependency>
39 + <dependency>
40 + <groupId>com.fasterxml.jackson.core</groupId>
41 + <artifactId>jackson-annotations</artifactId>
42 + <version>2.4.2</version>
43 + <scope>provided</scope>
44 + </dependency>
45 +
46 + <dependency>
47 + <groupId>com.google.guava</groupId>
48 + <artifactId>guava</artifactId>
49 + </dependency>
50 +
51 + <dependency>
52 + <groupId>org.apache.commons</groupId>
53 + <artifactId>commons-collections4</artifactId>
54 + <version>4.0</version>
55 + </dependency>
56 +
57 + <dependency>
58 + <groupId>org.onlab.onos</groupId>
59 + <artifactId>onlab-thirdparty</artifactId>
60 + </dependency>
61 +
62 + <dependency>
63 + <groupId>org.onlab.onos</groupId>
64 + <artifactId>onlab-misc</artifactId>
65 + </dependency>
66 +
67 + <dependency>
68 + <groupId>org.onlab.onos</groupId>
69 + <artifactId>onlab-junit</artifactId>
70 + <scope>test</scope>
71 + </dependency>
72 +
73 + <dependency>
74 + <groupId>org.onlab.onos</groupId>
75 + <artifactId>onos-api</artifactId>
76 + <version>${project.version}</version>
77 + <scope>test</scope>
78 + <classifier>tests</classifier>
79 + </dependency>
80 +
81 + <dependency>
82 + <groupId>org.onlab.onos</groupId>
83 + <artifactId>onos-cli</artifactId>
84 + <version>${project.version}</version>
85 + </dependency>
86 +
87 + <dependency>
88 + <groupId>org.apache.karaf.shell</groupId>
89 + <artifactId>org.apache.karaf.shell.console</artifactId>
90 + </dependency>
91 +
92 + <dependency>
93 + <groupId>org.osgi</groupId>
94 + <artifactId>org.osgi.core</artifactId>
95 + </dependency>
96 +
97 + <dependency>
98 + <groupId>org.easymock</groupId>
99 + <artifactId>easymock</artifactId>
100 + <scope>test</scope>
101 + </dependency>
102 + </dependencies>
103 +
104 +</project>
...@@ -29,15 +29,17 @@ import org.onlab.onos.cluster.ClusterService; ...@@ -29,15 +29,17 @@ import org.onlab.onos.cluster.ClusterService;
29 import org.onlab.onos.cluster.ControllerNode; 29 import org.onlab.onos.cluster.ControllerNode;
30 import org.onlab.onos.cluster.LeadershipEvent; 30 import org.onlab.onos.cluster.LeadershipEvent;
31 import org.onlab.onos.cluster.LeadershipEventListener; 31 import org.onlab.onos.cluster.LeadershipEventListener;
32 -import org.onlab.onos.cluster.LeadershipService; 32 +// import org.onlab.onos.cluster.LeadershipService;
33 import org.onlab.onos.core.ApplicationId; 33 import org.onlab.onos.core.ApplicationId;
34 import org.onlab.onos.core.CoreService; 34 import org.onlab.onos.core.CoreService;
35 +import org.onlab.onos.event.EventDeliveryService;
35 import org.onlab.onos.net.host.HostService; 36 import org.onlab.onos.net.host.HostService;
36 import org.onlab.onos.net.intent.IntentService; 37 import org.onlab.onos.net.intent.IntentService;
37 import org.onlab.onos.sdnip.bgp.BgpRouteEntry; 38 import org.onlab.onos.sdnip.bgp.BgpRouteEntry;
38 import org.onlab.onos.sdnip.bgp.BgpSession; 39 import org.onlab.onos.sdnip.bgp.BgpSession;
39 import org.onlab.onos.sdnip.bgp.BgpSessionManager; 40 import org.onlab.onos.sdnip.bgp.BgpSessionManager;
40 import org.onlab.onos.sdnip.config.SdnIpConfigReader; 41 import org.onlab.onos.sdnip.config.SdnIpConfigReader;
42 +import org.onlab.onos.store.hz.StoreService;
41 43
42 import org.slf4j.Logger; 44 import org.slf4j.Logger;
43 45
...@@ -68,7 +70,13 @@ public class SdnIp implements SdnIpService { ...@@ -68,7 +70,13 @@ public class SdnIp implements SdnIpService {
68 protected ClusterService clusterService; 70 protected ClusterService clusterService;
69 71
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 - protected LeadershipService leadershipService; 73 + protected StoreService storeService;
74 +
75 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 + protected EventDeliveryService eventDispatcher;
77 +
78 + // @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 + protected SdnIpLeadershipService leadershipService;
72 80
73 private IntentSynchronizer intentSynchronizer; 81 private IntentSynchronizer intentSynchronizer;
74 private SdnIpConfigReader config; 82 private SdnIpConfigReader config;
...@@ -77,7 +85,7 @@ public class SdnIp implements SdnIpService { ...@@ -77,7 +85,7 @@ public class SdnIp implements SdnIpService {
77 private BgpSessionManager bgpSessionManager; 85 private BgpSessionManager bgpSessionManager;
78 private LeadershipEventListener leadershipEventListener = 86 private LeadershipEventListener leadershipEventListener =
79 new InnerLeadershipEventListener(); 87 new InnerLeadershipEventListener();
80 - ApplicationId appId; 88 + private ApplicationId appId;
81 private ControllerNode localControllerNode; 89 private ControllerNode localControllerNode;
82 90
83 @Activate 91 @Activate
...@@ -106,6 +114,10 @@ public class SdnIp implements SdnIpService { ...@@ -106,6 +114,10 @@ public class SdnIp implements SdnIpService {
106 interfaceService, hostService); 114 interfaceService, hostService);
107 router.start(); 115 router.start();
108 116
117 + leadershipService = new SdnIpLeadershipService(clusterService,
118 + storeService,
119 + eventDispatcher);
120 + leadershipService.start();
109 leadershipService.addListener(leadershipEventListener); 121 leadershipService.addListener(leadershipEventListener);
110 leadershipService.runForLeadership(appId.name()); 122 leadershipService.runForLeadership(appId.name());
111 123
...@@ -126,6 +138,7 @@ public class SdnIp implements SdnIpService { ...@@ -126,6 +138,7 @@ public class SdnIp implements SdnIpService {
126 138
127 leadershipService.withdraw(appId.name()); 139 leadershipService.withdraw(appId.name());
128 leadershipService.removeListener(leadershipEventListener); 140 leadershipService.removeListener(leadershipEventListener);
141 + leadershipService.stop();
129 142
130 log.info("SDN-IP Stopped"); 143 log.info("SDN-IP Stopped");
131 } 144 }
......
1 +/*
2 + * Copyright 2014 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onlab.onos.sdnip;
17 +
18 +import java.util.Map;
19 +import java.util.concurrent.ExecutorService;
20 +import java.util.concurrent.Executors;
21 +import java.util.concurrent.Future;
22 +import java.util.concurrent.locks.Lock;
23 +
24 +import org.onlab.onos.cluster.ClusterService;
25 +import org.onlab.onos.cluster.ControllerNode;
26 +import org.onlab.onos.cluster.Leadership;
27 +import org.onlab.onos.cluster.LeadershipEvent;
28 +import org.onlab.onos.cluster.LeadershipEventListener;
29 +import org.onlab.onos.cluster.LeadershipService;
30 +import org.onlab.onos.cluster.NodeId;
31 +import org.onlab.onos.event.AbstractListenerRegistry;
32 +import org.onlab.onos.event.EventDeliveryService;
33 +import org.onlab.onos.store.hz.StoreService;
34 +import org.onlab.onos.store.serializers.KryoNamespaces;
35 +import org.onlab.onos.store.serializers.KryoSerializer;
36 +import org.onlab.util.KryoNamespace;
37 +import static org.onlab.util.Tools.namedThreads;
38 +
39 +import org.slf4j.Logger;
40 +import org.slf4j.LoggerFactory;
41 +import com.google.common.collect.Maps;
42 +import com.hazelcast.config.TopicConfig;
43 +import com.hazelcast.core.ITopic;
44 +import com.hazelcast.core.Message;
45 +import com.hazelcast.core.MessageListener;
46 +
47 +import static com.google.common.base.Preconditions.checkArgument;
48 +
49 +/**
50 + * Distributed implementation of LeadershipService that is based on Hazelcast.
51 + * <p>
52 + * The election is eventually consistent: if the Hazelcast cluster partitions
53 + * and the partition later heals, there can be a short window of time
54 + * until the leaders in each partition discover each other. If this happens,
55 + * the leaders release the leadership and run for election again.
56 + * </p>
57 + * <p>
58 + * The leader election is based on Hazelcast's Global Lock, which is strongly
59 + * consistent. In addition, each leader periodically advertises events
60 + * (using a Hazelcast Topic) that it is the elected leader. Those events are
61 + * used for two purposes: (1) Discover multi-leader collisions (in case of
62 + * healed Hazelcast partitions), and (2) Inform all listeners who is
63 + * the current leader (e.g., for informational purpose).
64 + * </p>
65 + */
66 +public class SdnIpLeadershipService implements LeadershipService {
67 + private static final Logger log =
68 + LoggerFactory.getLogger(SdnIpLeadershipService.class);
69 +
70 + private static final KryoSerializer SERIALIZER = new KryoSerializer() {
71 + @Override
72 + protected void setupKryoPool() {
73 + serializerPool = KryoNamespace.newBuilder()
74 + .register(KryoNamespaces.API)
75 + .build()
76 + .populate(1);
77 + }
78 + };
79 +
80 + private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
81 + private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
82 +
83 + private ClusterService clusterService;
84 + private StoreService storeService;
85 + private EventDeliveryService eventDispatcher;
86 +
87 + private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
88 + listenerRegistry;
89 + private final Map<String, Topic> topics = Maps.newConcurrentMap();
90 + private ControllerNode localNode;
91 +
92 + /**
93 + * Constructor.
94 + *
95 + * @param clusterService the cluster service to use
96 + * @param storeService the store service to use
97 + * @param eventDispatcher the event dispatcher to use
98 + */
99 + SdnIpLeadershipService(ClusterService clusterService,
100 + StoreService storeService,
101 + EventDeliveryService eventDispatcher) {
102 + this.clusterService = clusterService;
103 + this.storeService = storeService;
104 + this.eventDispatcher = eventDispatcher;
105 + }
106 +
107 + /**
108 + * Starts operation.
109 + */
110 + void start() {
111 + localNode = clusterService.getLocalNode();
112 + listenerRegistry = new AbstractListenerRegistry<>();
113 + eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
114 +
115 + log.info("SDN-IP Leadership Service started");
116 + }
117 +
118 + /**
119 + * Stops operation.
120 + */
121 + void stop() {
122 + eventDispatcher.removeSink(LeadershipEvent.class);
123 +
124 + for (Topic topic : topics.values()) {
125 + topic.stop();
126 + }
127 + topics.clear();
128 +
129 + log.info("SDN-IP Leadership Service stopped");
130 + }
131 +
132 + @Override
133 + public ControllerNode getLeader(String path) {
134 + Topic topic = topics.get(path);
135 + if (topic == null) {
136 + return null;
137 + }
138 + return topic.leader();
139 + }
140 +
141 + @Override
142 + public void runForLeadership(String path) {
143 + checkArgument(path != null);
144 + Topic topic = new Topic(path);
145 + Topic oldTopic = topics.putIfAbsent(path, topic);
146 + if (oldTopic == null) {
147 + topic.start();
148 + }
149 + }
150 +
151 + @Override
152 + public void withdraw(String path) {
153 + checkArgument(path != null);
154 + Topic topic = topics.get(path);
155 + if (topic != null) {
156 + topic.stop();
157 + topics.remove(path, topic);
158 + }
159 + }
160 +
161 + @Override
162 + public void addListener(LeadershipEventListener listener) {
163 + listenerRegistry.addListener(listener);
164 + }
165 +
166 + @Override
167 + public void removeListener(LeadershipEventListener listener) {
168 + listenerRegistry.removeListener(listener);
169 + }
170 +
171 + /**
172 + * Class for keeping per-topic information.
173 + */
174 + private final class Topic implements MessageListener<byte[]> {
175 + private final String topicName;
176 + private volatile boolean isShutdown = true;
177 + private volatile long lastLeadershipUpdateMs = 0;
178 + private ExecutorService leaderElectionExecutor;
179 +
180 + private ControllerNode leader;
181 + private Lock leaderLock;
182 + private Future<?> getLockFuture;
183 + private Future<?> periodicProcessingFuture;
184 + private ITopic<byte[]> leaderTopic;
185 + private String leaderTopicRegistrationId;
186 +
187 + /**
188 + * Constructor.
189 + *
190 + * @param topicName the topic name
191 + */
192 + private Topic(String topicName) {
193 + this.topicName = topicName;
194 + }
195 +
196 + /**
197 + * Gets the leader for the topic.
198 + *
199 + * @return the leader for the topic
200 + */
201 + private ControllerNode leader() {
202 + return leader;
203 + }
204 +
205 + /**
206 + * Starts leadership election for the topic.
207 + */
208 + private void start() {
209 + isShutdown = false;
210 + String lockHzId = "LeadershipService/" + topicName + "/lock";
211 + String topicHzId = "LeadershipService/" + topicName + "/topic";
212 + leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
213 +
214 + String threadPoolName =
215 + "sdnip-leader-election-" + topicName + "-%d";
216 + leaderElectionExecutor = Executors.newScheduledThreadPool(2,
217 + namedThreads(threadPoolName));
218 +
219 + TopicConfig topicConfig = new TopicConfig();
220 + topicConfig.setGlobalOrderingEnabled(true);
221 + topicConfig.setName(topicHzId);
222 + storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
223 +
224 + leaderTopic =
225 + storeService.getHazelcastInstance().getTopic(topicHzId);
226 + leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
227 +
228 + getLockFuture = leaderElectionExecutor.submit(new Runnable() {
229 + @Override
230 + public void run() {
231 + doLeaderElectionThread();
232 + }
233 + });
234 + periodicProcessingFuture =
235 + leaderElectionExecutor.submit(new Runnable() {
236 + @Override
237 + public void run() {
238 + doPeriodicProcessing();
239 + }
240 + });
241 + }
242 +
243 + /**
244 + * Stops leadership election for the topic.
245 + */
246 + private void stop() {
247 + isShutdown = true;
248 + leaderTopic.removeMessageListener(leaderTopicRegistrationId);
249 + // getLockFuture.cancel(true);
250 + // periodicProcessingFuture.cancel(true);
251 + leaderElectionExecutor.shutdownNow();
252 + }
253 +
254 + @Override
255 + public void onMessage(Message<byte[]> message) {
256 + LeadershipEvent leadershipEvent =
257 + SERIALIZER.decode(message.getMessageObject());
258 + NodeId eventLeaderId = leadershipEvent.subject().leader().id();
259 +
260 + log.debug("SDN-IP Leadership Event: time = {} type = {} event = {}",
261 + leadershipEvent.time(), leadershipEvent.type(),
262 + leadershipEvent);
263 + if (!leadershipEvent.subject().topic().equals(topicName)) {
264 + return; // Not our topic: ignore
265 + }
266 + if (eventLeaderId.equals(localNode.id())) {
267 + return; // My own message: ignore
268 + }
269 +
270 + synchronized (this) {
271 + switch (leadershipEvent.type()) {
272 + case LEADER_ELECTED:
273 + // FALLTHROUGH
274 + case LEADER_REELECTED:
275 + //
276 + // Another leader: if we are also a leader, then give up
277 + // leadership and run for re-election.
278 + //
279 + if ((leader != null) &&
280 + leader.id().equals(localNode.id())) {
281 + getLockFuture.cancel(true);
282 + } else {
283 + // Just update the current leader
284 + leader = leadershipEvent.subject().leader();
285 + lastLeadershipUpdateMs = System.currentTimeMillis();
286 + }
287 + eventDispatcher.post(leadershipEvent);
288 + break;
289 + case LEADER_BOOTED:
290 + // Remove the state for the current leader
291 + if ((leader != null) &&
292 + eventLeaderId.equals(leader.id())) {
293 + leader = null;
294 + }
295 + eventDispatcher.post(leadershipEvent);
296 + break;
297 + default:
298 + break;
299 + }
300 + }
301 + }
302 +
303 + private void doPeriodicProcessing() {
304 +
305 + while (!isShutdown) {
306 +
307 + //
308 + // Periodic tasks:
309 + // (a) Advertise ourselves as the leader
310 + // OR
311 + // (b) Expire a stale (remote) leader
312 + //
313 + synchronized (this) {
314 + LeadershipEvent leadershipEvent;
315 + if (leader != null) {
316 + if (leader.id().equals(localNode.id())) {
317 + //
318 + // Advertise ourselves as the leader
319 + //
320 + leadershipEvent = new LeadershipEvent(
321 + LeadershipEvent.Type.LEADER_REELECTED,
322 + new Leadership(topicName, localNode, 0));
323 + // Dispatch to all remote instances
324 + leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
325 + } else {
326 + //
327 + // Test if time to expire a stale leader
328 + //
329 + long delta = System.currentTimeMillis() -
330 + lastLeadershipUpdateMs;
331 + if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
332 + leadershipEvent = new LeadershipEvent(
333 + LeadershipEvent.Type.LEADER_BOOTED,
334 + new Leadership(topicName, leader, 0));
335 + // Dispatch only to the local listener(s)
336 + eventDispatcher.post(leadershipEvent);
337 + leader = null;
338 + }
339 + }
340 + }
341 + }
342 +
343 + // Sleep before re-advertising
344 + try {
345 + Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
346 + } catch (InterruptedException e) {
347 + log.debug("SDN-IP Leader Election periodic thread interrupted");
348 + }
349 + }
350 + }
351 +
352 + /**
353 + * Performs the leader election by using Hazelcast.
354 + */
355 + private void doLeaderElectionThread() {
356 +
357 + while (!isShutdown) {
358 + LeadershipEvent leadershipEvent;
359 + //
360 + // Try to acquire the lock and keep it until the instance is
361 + // shut down.
362 + //
363 + log.debug("SDN-IP Leader Election begin for topic {}",
364 + topicName);
365 + try {
366 + // Block until it becomes the leader
367 + leaderLock.lockInterruptibly();
368 + } catch (InterruptedException e) {
369 + //
370 + // Thread interrupted. Either shutdown or run for
371 + // re-election.
372 + //
373 + log.debug("SDN-IP Election interrupted for topic {}",
374 + topicName);
375 + continue;
376 + }
377 +
378 + synchronized (this) {
379 + //
380 + // This instance is now the leader
381 + //
382 + log.info("SDN-IP Leader Elected for topic {}", topicName);
383 + leader = localNode;
384 + leadershipEvent = new LeadershipEvent(
385 + LeadershipEvent.Type.LEADER_ELECTED,
386 + new Leadership(topicName, localNode, 0));
387 + eventDispatcher.post(leadershipEvent);
388 + leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
389 + }
390 +
391 + try {
392 + // Sleep forever until interrupted
393 + Thread.sleep(Long.MAX_VALUE);
394 + } catch (InterruptedException e) {
395 + //
396 + // Thread interrupted. Either shutdown or run for
397 + // re-election.
398 + //
399 + log.debug("SDN-IP Leader Interrupted for topic {}",
400 + topicName);
401 + }
402 +
403 + synchronized (this) {
404 + // If we reach here, we should release the leadership
405 + log.debug("SDN-IP Leader Lock Released for topic {}",
406 + topicName);
407 + if ((leader != null) &&
408 + leader.id().equals(localNode.id())) {
409 + leader = null;
410 + }
411 + leadershipEvent = new LeadershipEvent(
412 + LeadershipEvent.Type.LEADER_BOOTED,
413 + new Leadership(topicName, localNode, 0));
414 + eventDispatcher.post(leadershipEvent);
415 + leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
416 + leaderLock.unlock();
417 + }
418 + }
419 + }
420 + }
421 +}