Yuta HIGUCHI

DatabaseManager: add periodic leader advertisements

Change-Id: I6e9244a06191fe0f2dd5eaed7e043e84d704bfcd
1 package org.onlab.onos.store.service.impl; 1 package org.onlab.onos.store.service.impl;
2 2
3 +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
4 +import static org.onlab.util.Tools.namedThreads;
3 import static org.slf4j.LoggerFactory.getLogger; 5 import static org.slf4j.LoggerFactory.getLogger;
4 6
5 import java.io.File; 7 import java.io.File;
...@@ -11,6 +13,7 @@ import java.util.Map; ...@@ -11,6 +13,7 @@ import java.util.Map;
11 import java.util.Set; 13 import java.util.Set;
12 import java.util.concurrent.CountDownLatch; 14 import java.util.concurrent.CountDownLatch;
13 import java.util.concurrent.ExecutionException; 15 import java.util.concurrent.ExecutionException;
16 +import java.util.concurrent.ScheduledExecutorService;
14 import java.util.concurrent.TimeUnit; 17 import java.util.concurrent.TimeUnit;
15 18
16 import net.kuujo.copycat.Copycat; 19 import net.kuujo.copycat.Copycat;
...@@ -105,6 +108,10 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -105,6 +108,10 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
105 108
106 private boolean autoAddMember = false; 109 private boolean autoAddMember = false;
107 110
111 + private ScheduledExecutorService executor;
112 +
113 + private volatile LeaderElectEvent myLeaderEvent = null;
114 +
108 @Activate 115 @Activate
109 public void activate() throws InterruptedException, ExecutionException { 116 public void activate() throws InterruptedException, ExecutionException {
110 117
...@@ -176,6 +183,11 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -176,6 +183,11 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
176 // of the Raft cluster. 183 // of the Raft cluster.
177 if (copycat != null) { 184 if (copycat != null) {
178 copycat.start().get(); 185 copycat.start().get();
186 +
187 + executor =
188 + newSingleThreadScheduledExecutor(namedThreads("db-heartbeat-%d"));
189 + executor.scheduleWithFixedDelay(new LeaderAdvertiser(), 5, 2, TimeUnit.SECONDS);
190 +
179 } 191 }
180 192
181 client.waitForLeader(); 193 client.waitForLeader();
...@@ -189,6 +201,9 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -189,6 +201,9 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
189 201
190 @Deactivate 202 @Deactivate
191 public void deactivate() { 203 public void deactivate() {
204 + if (executor != null) {
205 + executor.shutdownNow();
206 + }
192 clusterService.removeListener(clusterEventListener); 207 clusterService.removeListener(clusterEventListener);
193 // TODO: ClusterCommunicationService must support more than one 208 // TODO: ClusterCommunicationService must support more than one
194 // handler per message subject. 209 // handler per message subject.
...@@ -231,9 +246,9 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -231,9 +246,9 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
231 throw e; 246 throw e;
232 } else { 247 } else {
233 log.debug("Failed to listTables. Will retry...", e); 248 log.debug("Failed to listTables. Will retry...", e);
234 - retries++;
235 } 249 }
236 } 250 }
251 + retries++;
237 } while (true); 252 } while (true);
238 } 253 }
239 254
...@@ -395,17 +410,47 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -395,17 +410,47 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
395 } 410 }
396 } 411 }
397 412
413 + private final class LeaderAdvertiser implements Runnable {
414 +
415 + @Override
416 + public void run() {
417 + try {
418 + LeaderElectEvent event = myLeaderEvent;
419 + if (event != null) {
420 + log.trace("Broadcasting RAFT_LEADER_ELECTION_EVENT: {}", event);
421 + // This node just became the leader.
422 + clusterCommunicator.broadcastIncludeSelf(
423 + new ClusterMessage(
424 + clusterService.getLocalNode().id(),
425 + RAFT_LEADER_ELECTION_EVENT,
426 + ClusterMessagingProtocol.SERIALIZER.encode(event)));
427 + }
428 + } catch (Exception e) {
429 + log.debug("LeaderAdvertiser failed with exception", e);
430 + }
431 + }
432 +
433 + }
434 +
398 private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> { 435 private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
399 @Override 436 @Override
400 public void handle(LeaderElectEvent event) { 437 public void handle(LeaderElectEvent event) {
401 try { 438 try {
439 + log.debug("Received LeaderElectEvent: {}", event);
402 if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) { 440 if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
441 + log.debug("Broadcasting RAFT_LEADER_ELECTION_EVENT");
442 + myLeaderEvent = event;
403 // This node just became the leader. 443 // This node just became the leader.
404 clusterCommunicator.broadcastIncludeSelf( 444 clusterCommunicator.broadcastIncludeSelf(
405 new ClusterMessage( 445 new ClusterMessage(
406 clusterService.getLocalNode().id(), 446 clusterService.getLocalNode().id(),
407 RAFT_LEADER_ELECTION_EVENT, 447 RAFT_LEADER_ELECTION_EVENT,
408 ClusterMessagingProtocol.SERIALIZER.encode(event))); 448 ClusterMessagingProtocol.SERIALIZER.encode(event)));
449 + } else {
450 + if (myLeaderEvent != null) {
451 + log.debug("This node is no longer the Leader");
452 + }
453 + myLeaderEvent = null;
409 } 454 }
410 } catch (IOException e) { 455 } catch (IOException e) {
411 log.error("Failed to broadcast raft leadership change event", e); 456 log.error("Failed to broadcast raft leadership change event", e);
......