Ayaka Koshibe

mastership manager listens to cluster events

Change-Id: I925967ef8ada88b9116d292d623d2f0f4cda3a8d
......@@ -56,7 +56,8 @@ public interface MastershipService {
Set<DeviceId> getDevicesOf(NodeId nodeId);
/**
* Returns the mastership term service for getting term information.
* Returns the mastership term service for getting read-only
* term information.
*
* @return the MastershipTermService for this mastership manager
*/
......
package org.onlab.onos.cluster;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import com.google.common.testing.EqualsTester;
public class MastershipTermTest {
private static final NodeId N1 = new NodeId("foo");
private static final NodeId N2 = new NodeId("bar");
private static final MastershipTerm TERM1 = MastershipTerm.of(N1, 0);
private static final MastershipTerm TERM2 = MastershipTerm.of(N2, 1);
private static final MastershipTerm TERM3 = MastershipTerm.of(N2, 1);
private static final MastershipTerm TERM4 = MastershipTerm.of(N1, 1);
@Test
public void basics() {
assertEquals("incorrect term number", 0, TERM1.termNumber());
assertEquals("incorrect master", new NodeId("foo"), TERM1.master());
}
@Test
public void testEquality() {
new EqualsTester().addEqualityGroup(MastershipTerm.of(N1, 0), TERM1)
.addEqualityGroup(TERM2, TERM3)
.addEqualityGroup(TERM4);
}
}
......@@ -6,6 +6,8 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipAdminService;
import org.onlab.onos.cluster.MastershipEvent;
......@@ -49,15 +51,19 @@ public class MastershipManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private ClusterEventListener clusterListener = new InternalClusterEventListener();
@Activate
public void activate() {
eventDispatcher.addSink(MastershipEvent.class, listenerRegistry);
clusterService.addListener(clusterListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(MastershipEvent.class);
clusterService.removeListener(clusterListener);
log.info("Stopped");
}
......@@ -141,4 +147,27 @@ public class MastershipManager
}
//callback for reacting to cluster events
private class InternalClusterEventListener implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
switch (event.type()) {
//FIXME: worry about addition when the time comes
case INSTANCE_ADDED:
case INSTANCE_ACTIVATED:
break;
case INSTANCE_REMOVED:
case INSTANCE_DEACTIVATED:
for (DeviceId d : getDevicesOf(event.subject().id())) {
//this method should be an admin iface?
relinquishMastership(d);
}
break;
default:
log.warn("unknown cluster event {}", event);
}
}
}
}
......
......@@ -10,6 +10,7 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.Device;
......@@ -74,11 +75,14 @@ public class DeviceManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
protected MastershipTermService termService;
@Activate
public void activate() {
store.setDelegate(delegate);
eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
mastershipService.addListener(mastershipListener);
termService = mastershipService.requestTermService();
log.info("Started");
}
......
......@@ -65,8 +65,8 @@ public class DefaultTopologyProvider extends AbstractProvider
private volatile boolean isStarted = false;
private TopologyProviderService providerService;
private DeviceListener deviceListener = new InnerDeviceListener();
private LinkListener linkListener = new InnerLinkListener();
private DeviceListener deviceListener = new InternalDeviceListener();
private LinkListener linkListener = new InternalLinkListener();
private EventAccumulator accumulator;
private ExecutorService executor;
......@@ -132,7 +132,7 @@ public class DefaultTopologyProvider extends AbstractProvider
}
// Callback for device events
private class InnerDeviceListener implements DeviceListener {
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceEvent.Type type = event.type();
......@@ -144,7 +144,7 @@ public class DefaultTopologyProvider extends AbstractProvider
}
// Callback for link events
private class InnerLinkListener implements LinkListener {
private class InternalLinkListener implements LinkListener {
@Override
public void event(LinkEvent event) {
accumulator.add(event);
......
......@@ -7,8 +7,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
......@@ -48,9 +46,9 @@ public class SimpleMastershipStore
new DefaultControllerNode(new NodeId("local"), LOCALHOST);
//devices mapped to their masters, to emulate multiple nodes
protected final ConcurrentMap<DeviceId, NodeId> masterMap =
new ConcurrentHashMap<>();
protected final Map<DeviceId, NodeId> masterMap = new HashMap<>();
protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>();
protected final Set<NodeId> masters = new HashSet<>();
@Activate
public void activate() {
......