Madan Jampani
Committed by Pavlin Radoslavov

LeadershipService API change: Using NodeId in place of ControllerNode.

Change-Id: I6f688506c3672977456fc6921b26e98be2239632
......@@ -167,7 +167,7 @@ public class SdnIp implements SdnIpService {
if (!event.subject().topic().equals(appId.name())) {
return; // Not our topic: ignore
}
if (!event.subject().leader().id().equals(
if (!event.subject().leader().equals(
localControllerNode.id())) {
return; // The event is not about this instance: ignore
}
......
......@@ -37,7 +37,7 @@ public class LeaderCommand extends AbstractShellCommand {
Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
print(FMT, "Topic", "Leader");
for (String topic : leaderBoard.keySet()) {
print(FMT, topic, leaderBoard.get(topic).leader().id());
print(FMT, topic, leaderBoard.get(topic).leader());
}
}
......
......@@ -10,10 +10,10 @@ import com.google.common.base.MoreObjects;
public class Leadership {
private final String topic;
private final ControllerNode leader;
private final NodeId leader;
private final long epoch;
public Leadership(String topic, ControllerNode leader, long epoch) {
public Leadership(String topic, NodeId leader, long epoch) {
this.topic = topic;
this.leader = leader;
this.epoch = epoch;
......@@ -28,10 +28,10 @@ public class Leadership {
}
/**
* The leader for this topic.
* The nodeId of leader for this topic.
* @return leader node.
*/
public ControllerNode leader() {
public NodeId leader() {
return leader;
}
......
......@@ -29,9 +29,9 @@ public interface LeadershipService {
/**
* Gets the most recent leader for the topic.
* @param path topic
* @return node who is the leader, null if so such topic exists.
* @return nodeId of the leader, null if so such topic exists.
*/
ControllerNode getLeader(String path);
NodeId getLeader(String path);
/**
* Joins the leadership contest.
......
......@@ -31,7 +31,6 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.Leadership;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
......@@ -100,11 +99,11 @@ public class HazelcastLeadershipService implements LeadershipService {
private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
listenerRegistry;
private final Map<String, Topic> topics = Maps.newConcurrentMap();
private ControllerNode localNode;
private NodeId localNodeId;
@Activate
protected void activate() {
localNode = clusterService.getLocalNode();
localNodeId = clusterService.getLocalNode().id();
listenerRegistry = new AbstractListenerRegistry<>();
eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
......@@ -124,7 +123,7 @@ public class HazelcastLeadershipService implements LeadershipService {
}
@Override
public ControllerNode getLeader(String path) {
public NodeId getLeader(String path) {
Topic topic = topics.get(path);
if (topic == null) {
return null;
......@@ -177,7 +176,7 @@ public class HazelcastLeadershipService implements LeadershipService {
private volatile long lastLeadershipUpdateMs = 0;
private ExecutorService leaderElectionExecutor;
private ControllerNode leader;
private NodeId leader;
private Lock leaderLock;
private Future<?> getLockFuture;
private Future<?> periodicProcessingFuture;
......@@ -198,7 +197,7 @@ public class HazelcastLeadershipService implements LeadershipService {
*
* @return the leader for the topic
*/
private ControllerNode leader() {
private NodeId leader() {
return leader;
}
......@@ -254,7 +253,7 @@ public class HazelcastLeadershipService implements LeadershipService {
public void onMessage(Message<byte[]> message) {
LeadershipEvent leadershipEvent =
SERIALIZER.decode(message.getMessageObject());
NodeId eventLeaderId = leadershipEvent.subject().leader().id();
NodeId eventLeaderId = leadershipEvent.subject().leader();
log.debug("Leadership Event: time = {} type = {} event = {}",
leadershipEvent.time(), leadershipEvent.type(),
......@@ -262,7 +261,7 @@ public class HazelcastLeadershipService implements LeadershipService {
if (!leadershipEvent.subject().topic().equals(topicName)) {
return; // Not our topic: ignore
}
if (eventLeaderId.equals(localNode.id())) {
if (eventLeaderId.equals(localNodeId)) {
return; // My own message: ignore
}
......@@ -276,7 +275,7 @@ public class HazelcastLeadershipService implements LeadershipService {
// leadership and run for re-election.
//
if ((leader != null) &&
leader.id().equals(localNode.id())) {
leader.equals(localNodeId)) {
getLockFuture.cancel(true);
} else {
// Just update the current leader
......@@ -288,7 +287,7 @@ public class HazelcastLeadershipService implements LeadershipService {
case LEADER_BOOTED:
// Remove the state for the current leader
if ((leader != null) &&
eventLeaderId.equals(leader.id())) {
eventLeaderId.equals(leader)) {
leader = null;
}
eventDispatcher.post(leadershipEvent);
......@@ -312,13 +311,13 @@ public class HazelcastLeadershipService implements LeadershipService {
synchronized (this) {
LeadershipEvent leadershipEvent;
if (leader != null) {
if (leader.id().equals(localNode.id())) {
if (leader.equals(localNodeId)) {
//
// Advertise ourselves as the leader
//
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
new Leadership(topicName, localNode, 0));
new Leadership(topicName, localNodeId, 0));
// Dispatch to all remote instances
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
} else {
......@@ -379,10 +378,10 @@ public class HazelcastLeadershipService implements LeadershipService {
// This instance is now the leader
//
log.info("Leader Elected for topic {}", topicName);
leader = localNode;
leader = localNodeId;
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(topicName, localNode, 0));
new Leadership(topicName, localNodeId, 0));
eventDispatcher.post(leadershipEvent);
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
}
......@@ -403,12 +402,12 @@ public class HazelcastLeadershipService implements LeadershipService {
// If we reach here, we should release the leadership
log.debug("Leader Lock Released for topic {}", topicName);
if ((leader != null) &&
leader.id().equals(localNode.id())) {
leader.equals(localNodeId)) {
leader = null;
}
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(topicName, localNode, 0));
new Leadership(topicName, localNodeId, 0));
eventDispatcher.post(leadershipEvent);
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
leaderLock.unlock();
......
......@@ -11,7 +11,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -19,11 +18,11 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.Leadership;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
......@@ -36,6 +35,7 @@ import org.onlab.onos.store.service.impl.DistributedLockManager;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
......@@ -77,7 +77,7 @@ public class LeadershipManager implements LeadershipService {
private final Map<String, Lock> openContests = Maps.newConcurrentMap();
private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
private ControllerNode localNode;
private NodeId localNodeId;
private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
......@@ -94,7 +94,7 @@ public class LeadershipManager implements LeadershipService {
@Activate
public void activate() {
localNode = clusterService.getLocalNode();
localNodeId = clusterService.getLocalNode().id();
addListener(peerAdvertiser);
addListener(leaderBoardUpdater);
......@@ -120,7 +120,7 @@ public class LeadershipManager implements LeadershipService {
@Override
public ControllerNode getLeader(String path) {
public NodeId getLeader(String path) {
synchronized (leaderBoard) {
Leadership leadership = leaderBoard.get(path);
if (leadership != null) {
......@@ -155,7 +155,7 @@ public class LeadershipManager implements LeadershipService {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(lock.path(), localNode, lock.epoch())));
new Leadership(lock.path(), localNodeId, lock.epoch())));
}
}
......@@ -201,7 +201,7 @@ public class LeadershipManager implements LeadershipService {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(lock.path(), localNode, lock.epoch())));
new Leadership(lock.path(), localNodeId, lock.epoch())));
return;
} else {
log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
......@@ -236,7 +236,7 @@ public class LeadershipManager implements LeadershipService {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
new Leadership(lock.path(), localNode, lock.epoch())));
new Leadership(lock.path(), localNodeId, lock.epoch())));
threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
} else {
// Check if this node already withdrew from the contest, in which case
......@@ -245,7 +245,7 @@ public class LeadershipManager implements LeadershipService {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(lock.path(), localNode, lock.epoch())));
new Leadership(lock.path(), localNodeId, lock.epoch())));
// Retry leadership after a brief wait.
threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
}
......@@ -270,11 +270,11 @@ public class LeadershipManager implements LeadershipService {
@Override
public void event(LeadershipEvent event) {
// publish events originating on this host.
if (event.subject().leader().equals(localNode)) {
if (event.subject().leader().equals(localNodeId)) {
try {
clusterCommunicator.broadcast(
new ClusterMessage(
localNode.id(),
localNodeId,
LEADERSHIP_UPDATES,
SERIALIZER.encode(event)));
} catch (IOException e) {
......
......@@ -15,12 +15,12 @@
*/
package org.onlab.onos.store.intent.impl;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Map;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -29,10 +29,10 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
import org.onlab.onos.event.AbstractListenerRegistry;
......@@ -50,12 +50,12 @@ import org.onlab.onos.store.serializers.StoreSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import java.util.Map;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
@Component(immediate = true)
@Service
......@@ -82,7 +82,7 @@ public class HazelcastIntentBatchQueue
private HazelcastInstance theInstance;
private ControllerNode localControllerNode;
private NodeId localControllerNodeId;
protected StoreSerializer serializer;
private IntentBatchDelegate delegate;
private InternalLeaderListener leaderListener = new InternalLeaderListener();
......@@ -98,7 +98,7 @@ public class HazelcastIntentBatchQueue
@Activate
public void activate() {
theInstance = storeService.getHazelcastInstance();
localControllerNode = clusterService.getLocalNode();
localControllerNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leaderListener);
serializer = new KryoSerializer() {
......@@ -254,7 +254,7 @@ public class HazelcastIntentBatchQueue
if (!topic.startsWith(TOPIC_BASE)) {
return; // Not our topic: ignore
}
if (!event.subject().leader().id().equals(localControllerNode.id())) {
if (!event.subject().leader().equals(localControllerNodeId)) {
// run for leadership
getQueue(getAppId(topic));
return; // The event is not about this instance: ignore
......
......@@ -10,12 +10,12 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.Leadership;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEvent.Type;
import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.cluster.NodeId;
/**
* A trivial implementation of the leadership service.
......@@ -35,8 +35,8 @@ public class SimpleLeadershipManager implements LeadershipService {
private Map<String, Boolean> elections = new ConcurrentHashMap<>();
@Override
public ControllerNode getLeader(String path) {
return elections.get(path) ? clusterService.getLocalNode() : null;
public NodeId getLeader(String path) {
return elections.get(path) ? clusterService.getLocalNode().id() : null;
}
@Override
......@@ -44,7 +44,7 @@ public class SimpleLeadershipManager implements LeadershipService {
elections.put(path, true);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_ELECTED,
new Leadership(path, clusterService.getLocalNode(), 0)));
new Leadership(path, clusterService.getLocalNode().id(), 0)));
}
}
......@@ -53,7 +53,7 @@ public class SimpleLeadershipManager implements LeadershipService {
elections.remove(path);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_BOOTED,
new Leadership(path, clusterService.getLocalNode(), 0)));
new Leadership(path, clusterService.getLocalNode().id(), 0)));
}
}
......