Pavlin Radoslavov

Added SDN-IP Leader election mechanism based on the Global Lock service.

NOTE: This is a short-term solution until the Leadership Service is
implemented.

Change-Id: Id1f21c06e67e08efb444ebec4e45eeca360e613d
...@@ -69,7 +69,7 @@ public class IntentSynchronizer { ...@@ -69,7 +69,7 @@ public class IntentSynchronizer {
69 69
70 bgpIntentsSynchronizerExecutor = Executors.newSingleThreadExecutor( 70 bgpIntentsSynchronizerExecutor = Executors.newSingleThreadExecutor(
71 new ThreadFactoryBuilder() 71 new ThreadFactoryBuilder()
72 - .setNameFormat("bgp-intents-synchronizer-%d").build()); 72 + .setNameFormat("sdnip-intents-synchronizer-%d").build());
73 } 73 }
74 74
75 /** 75 /**
......
...@@ -114,7 +114,8 @@ public class Router implements RouteListener { ...@@ -114,7 +114,8 @@ public class Router implements RouteListener {
114 HashMultimap.<Ip4Address, RouteEntry>create()); 114 HashMultimap.<Ip4Address, RouteEntry>create());
115 115
116 bgpUpdatesExecutor = Executors.newSingleThreadExecutor( 116 bgpUpdatesExecutor = Executors.newSingleThreadExecutor(
117 - new ThreadFactoryBuilder().setNameFormat("bgp-updates-%d").build()); 117 + new ThreadFactoryBuilder()
118 + .setNameFormat("sdnip-bgp-updates-%d").build());
118 } 119 }
119 120
120 /** 121 /**
......
...@@ -18,6 +18,8 @@ package org.onlab.onos.sdnip; ...@@ -18,6 +18,8 @@ package org.onlab.onos.sdnip;
18 import static org.slf4j.LoggerFactory.getLogger; 18 import static org.slf4j.LoggerFactory.getLogger;
19 19
20 import java.util.Collection; 20 import java.util.Collection;
21 +import java.util.concurrent.ExecutorService;
22 +import java.util.concurrent.Executors;
21 23
22 import org.apache.felix.scr.annotations.Activate; 24 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component; 25 import org.apache.felix.scr.annotations.Component;
...@@ -33,8 +35,12 @@ import org.onlab.onos.sdnip.bgp.BgpRouteEntry; ...@@ -33,8 +35,12 @@ import org.onlab.onos.sdnip.bgp.BgpRouteEntry;
33 import org.onlab.onos.sdnip.bgp.BgpSession; 35 import org.onlab.onos.sdnip.bgp.BgpSession;
34 import org.onlab.onos.sdnip.bgp.BgpSessionManager; 36 import org.onlab.onos.sdnip.bgp.BgpSessionManager;
35 import org.onlab.onos.sdnip.config.SdnIpConfigReader; 37 import org.onlab.onos.sdnip.config.SdnIpConfigReader;
38 +import org.onlab.onos.store.service.Lock;
39 +import org.onlab.onos.store.service.LockService;
36 import org.slf4j.Logger; 40 import org.slf4j.Logger;
37 41
42 +import com.google.common.util.concurrent.ThreadFactoryBuilder;
43 +
38 /** 44 /**
39 * Component for the SDN-IP peering application. 45 * Component for the SDN-IP peering application.
40 */ 46 */
...@@ -43,6 +49,9 @@ import org.slf4j.Logger; ...@@ -43,6 +49,9 @@ import org.slf4j.Logger;
43 public class SdnIp implements SdnIpService { 49 public class SdnIp implements SdnIpService {
44 50
45 private static final String SDN_IP_APP = "org.onlab.onos.sdnip"; 51 private static final String SDN_IP_APP = "org.onlab.onos.sdnip";
52 + // NOTE: Must be 5s for now
53 + private static final int LEASE_DURATION_MS = 5 * 1000;
54 + private static final int LEASE_EXTEND_RETRY_MAX = 3;
46 55
47 private final Logger log = getLogger(getClass()); 56 private final Logger log = getLogger(getClass());
48 57
...@@ -55,15 +64,23 @@ public class SdnIp implements SdnIpService { ...@@ -55,15 +64,23 @@ public class SdnIp implements SdnIpService {
55 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
56 protected HostService hostService; 65 protected HostService hostService;
57 66
67 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 + protected LockService lockService;
69 +
58 private IntentSynchronizer intentSynchronizer; 70 private IntentSynchronizer intentSynchronizer;
59 private SdnIpConfigReader config; 71 private SdnIpConfigReader config;
60 private PeerConnectivityManager peerConnectivity; 72 private PeerConnectivityManager peerConnectivity;
61 private Router router; 73 private Router router;
62 private BgpSessionManager bgpSessionManager; 74 private BgpSessionManager bgpSessionManager;
63 75
76 + private ExecutorService leaderElectionExecutor;
77 + private Lock leaderLock;
78 + private volatile boolean isShutdown = true;
79 +
64 @Activate 80 @Activate
65 protected void activate() { 81 protected void activate() {
66 - log.debug("SDN-IP started"); 82 + log.info("SDN-IP started");
83 + isShutdown = false;
67 84
68 ApplicationId appId = coreService.registerApplication(SDN_IP_APP); 85 ApplicationId appId = coreService.registerApplication(SDN_IP_APP);
69 config = new SdnIpConfigReader(); 86 config = new SdnIpConfigReader();
...@@ -83,9 +100,20 @@ public class SdnIp implements SdnIpService { ...@@ -83,9 +100,20 @@ public class SdnIp implements SdnIpService {
83 interfaceService); 100 interfaceService);
84 router.start(); 101 router.start();
85 102
103 + leaderLock = lockService.create(SDN_IP_APP + "/sdnIpLeaderLock");
104 + leaderElectionExecutor = Executors.newSingleThreadExecutor(
105 + new ThreadFactoryBuilder()
106 + .setNameFormat("sdnip-leader-election-%d").build());
107 + leaderElectionExecutor.execute(new Runnable() {
108 + @Override
109 + public void run() {
110 + doLeaderElectionThread();
111 + }
112 + });
113 +
86 // Manually set the instance as the leader to allow testing 114 // Manually set the instance as the leader to allow testing
87 // TODO change this when we get a leader election 115 // TODO change this when we get a leader election
88 - intentSynchronizer.leaderChanged(true); 116 + // intentSynchronizer.leaderChanged(true);
89 117
90 bgpSessionManager = new BgpSessionManager(router); 118 bgpSessionManager = new BgpSessionManager(router);
91 // TODO: the local BGP listen port number should be configurable 119 // TODO: the local BGP listen port number should be configurable
...@@ -96,11 +124,16 @@ public class SdnIp implements SdnIpService { ...@@ -96,11 +124,16 @@ public class SdnIp implements SdnIpService {
96 124
97 @Deactivate 125 @Deactivate
98 protected void deactivate() { 126 protected void deactivate() {
127 + isShutdown = true;
128 +
99 bgpSessionManager.stop(); 129 bgpSessionManager.stop();
100 router.stop(); 130 router.stop();
101 peerConnectivity.stop(); 131 peerConnectivity.stop();
102 intentSynchronizer.stop(); 132 intentSynchronizer.stop();
103 133
134 + // Stop the thread(s)
135 + leaderElectionExecutor.shutdownNow();
136 +
104 log.info("Stopped"); 137 log.info("Stopped");
105 } 138 }
106 139
...@@ -127,4 +160,66 @@ public class SdnIp implements SdnIpService { ...@@ -127,4 +160,66 @@ public class SdnIp implements SdnIpService {
127 static String dpidToUri(String dpid) { 160 static String dpidToUri(String dpid) {
128 return "of:" + dpid.replace(":", ""); 161 return "of:" + dpid.replace(":", "");
129 } 162 }
163 +
164 + /**
165 + * Performs the leader election.
166 + */
167 + private void doLeaderElectionThread() {
168 +
169 + //
170 + // Try to acquire the lock and keep extending it until the instance
171 + // is shutdown.
172 + //
173 + while (!isShutdown) {
174 + log.debug("SDN-IP Leader Election begin");
175 +
176 + // Block until it becomes the leader
177 + leaderLock.lock(LEASE_DURATION_MS);
178 +
179 + // This instance is the leader
180 + log.info("SDN-IP Leader Elected");
181 + intentSynchronizer.leaderChanged(true);
182 +
183 + // Keep extending the expiration until shutdown
184 + int extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX - 1;
185 +
186 + //
187 + // Keep periodically extending the lock expiration.
188 + // If there are multiple back-to-back failures to extend (with
189 + // extra sleep time between retrials), then release the lock.
190 + //
191 + while (!isShutdown) {
192 + try {
193 + Thread.sleep(LEASE_DURATION_MS / LEASE_EXTEND_RETRY_MAX);
194 + if (leaderLock.extendExpiration(LEASE_DURATION_MS)) {
195 + log.trace("SDN-IP Leader Extended");
196 + extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX;
197 + } else {
198 + log.debug("SDN-IP Leader Cannot Extend Election");
199 + if (!leaderLock.isLocked()) {
200 + log.debug("SDN-IP Leader Lock Lost");
201 + intentSynchronizer.leaderChanged(false);
202 + break; // Try again to get the lock
203 + }
204 + extensionFailedCountdown--;
205 + if (extensionFailedCountdown <= 0) {
206 + // Failed too many times to extend.
207 + // Release the lock.
208 + log.debug("SDN-IP Leader Lock Released");
209 + intentSynchronizer.leaderChanged(false);
210 + leaderLock.unlock();
211 + break; // Try again to get the lock
212 + }
213 + }
214 + } catch (InterruptedException e) {
215 + // Thread interrupted. Time to shutdown
216 + log.debug("SDN-IP Leader Interrupted");
217 + }
218 + }
219 + }
220 +
221 + // If we reach here, the instance was shutdown
222 + intentSynchronizer.leaderChanged(false);
223 + leaderLock.unlock();
224 + }
130 } 225 }
......