alshabib

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 64 changed files with 1213 additions and 105 deletions
......@@ -56,7 +56,8 @@ public interface MastershipService {
Set<DeviceId> getDevicesOf(NodeId nodeId);
/**
* Returns the mastership term service for getting term information.
* Returns the mastership term service for getting read-only
* term information.
*
* @return the MastershipTermService for this mastership manager
*/
......
......@@ -64,4 +64,14 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
* @return the current master's ID and the term value for device, or null
*/
MastershipTerm getTermFor(DeviceId deviceId);
/**
* Revokes a controller instance's mastership over a device and hands
* over mastership to another controller instance.
*
* @param nodeId the controller instance identifier
* @param deviceId device to revoke mastership for
* @return a mastership event
*/
MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId);
}
......
......@@ -48,6 +48,7 @@ public interface DeviceStore extends Store<DeviceEvent, DeviceStoreDelegate> {
DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
DeviceDescription deviceDescription);
// TODO: We may need to enforce that ancillary cannot interfere this state
/**
* Removes the specified infrastructure device.
*
......@@ -60,22 +61,24 @@ public interface DeviceStore extends Store<DeviceEvent, DeviceStoreDelegate> {
* Updates the ports of the specified infrastructure device using the given
* list of port descriptions. The list is assumed to be comprehensive.
*
* @param providerId provider identifier
* @param deviceId device identifier
* @param portDescriptions list of port descriptions
* @return ready to send events describing what occurred; empty list if no change
*/
List<DeviceEvent> updatePorts(DeviceId deviceId,
List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
List<PortDescription> portDescriptions);
/**
* Updates the port status of the specified infrastructure device using the
* given port description.
*
* @param providerId provider identifier
* @param deviceId device identifier
* @param portDescription port description
* @return ready to send event describing what occurred; null if no change
*/
DeviceEvent updatePortStatus(DeviceId deviceId,
DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription);
/**
......
......@@ -35,10 +35,22 @@ public abstract class AbstractProviderRegistry<P extends Provider, S extends Pro
public synchronized S register(P provider) {
checkNotNull(provider, "Provider cannot be null");
checkState(!services.containsKey(provider.id()), "Provider %s already registered", provider.id());
// If the provider is a primary one, check for a conflict.
ProviderId pid = provider.id();
checkState(pid.isAncillary() || !providersByScheme.containsKey(pid.scheme()),
"A primary provider with id %s is already registered",
providersByScheme.get(pid.scheme()));
S service = createProviderService(provider);
services.put(provider.id(), service);
providers.put(provider.id(), provider);
// FIXME populate scheme look-up
// Register the provider by URI scheme only if it is not ancillary.
if (!pid.isAncillary()) {
providersByScheme.put(pid.scheme(), provider);
}
return service;
}
......
......@@ -11,6 +11,7 @@ public class ProviderId {
private final String scheme;
private final String id;
private final boolean ancillary;
/**
* Creates a new provider identifier from the specified string.
......@@ -21,8 +22,22 @@ public class ProviderId {
* @param id string identifier
*/
public ProviderId(String scheme, String id) {
this(scheme, id, false);
}
/**
* Creates a new provider identifier from the specified string.
* The providers are expected to follow the reverse DNS convention, e.g.
* {@code org.onlab.onos.provider.of.device}
*
* @param scheme device URI scheme to which this provider is bound, e.g. "of", "snmp"
* @param id string identifier
* @param ancillary ancillary provider indicator
*/
public ProviderId(String scheme, String id, boolean ancillary) {
this.scheme = scheme;
this.id = id;
this.ancillary = ancillary;
}
/**
......@@ -35,6 +50,15 @@ public class ProviderId {
}
/**
* Indicates whether the provider id belongs to an ancillary provider.
*
* @return true for ancillary; false for primary provider
*/
public boolean isAncillary() {
return ancillary;
}
/**
* Returns the device URI scheme specific id portion.
*
* @return id
......@@ -56,14 +80,16 @@ public class ProviderId {
if (obj instanceof ProviderId) {
final ProviderId other = (ProviderId) obj;
return Objects.equals(this.scheme, other.scheme) &&
Objects.equals(this.id, other.id);
Objects.equals(this.id, other.id) &&
this.ancillary == other.ancillary;
}
return false;
}
@Override
public String toString() {
return toStringHelper(this).add("scheme", scheme).add("id", id).toString();
return toStringHelper(this).add("scheme", scheme).add("id", id)
.add("ancillary", ancillary).toString();
}
}
......
package org.onlab.onos.cluster;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import com.google.common.testing.EqualsTester;
public class MastershipTermTest {
private static final NodeId N1 = new NodeId("foo");
private static final NodeId N2 = new NodeId("bar");
private static final MastershipTerm TERM1 = MastershipTerm.of(N1, 0);
private static final MastershipTerm TERM2 = MastershipTerm.of(N2, 1);
private static final MastershipTerm TERM3 = MastershipTerm.of(N2, 1);
private static final MastershipTerm TERM4 = MastershipTerm.of(N1, 1);
@Test
public void basics() {
assertEquals("incorrect term number", 0, TERM1.termNumber());
assertEquals("incorrect master", new NodeId("foo"), TERM1.master());
}
@Test
public void testEquality() {
new EqualsTester().addEqualityGroup(MastershipTerm.of(N1, 0), TERM1)
.addEqualityGroup(TERM2, TERM3)
.addEqualityGroup(TERM4);
}
}
......@@ -35,7 +35,7 @@ public class AbstractProviderRegistryTest {
assertThat("provider not found", registry.getProviders().contains(fooId));
assertEquals("incorrect provider", psFoo.provider(), pFoo);
ProviderId barId = new ProviderId("of", "bar");
ProviderId barId = new ProviderId("snmp", "bar");
TestProvider pBar = new TestProvider(barId);
TestProviderService psBar = registry.register(pBar);
assertEquals("incorrect provider count", 2, registry.getProviders().size());
......@@ -49,6 +49,16 @@ public class AbstractProviderRegistryTest {
assertThat("provider not found", registry.getProviders().contains(barId));
}
@Test
public void ancillaryProviders() {
TestProviderRegistry registry = new TestProviderRegistry();
TestProvider pFoo = new TestProvider(new ProviderId("of", "foo"));
TestProvider pBar = new TestProvider(new ProviderId("of", "bar", true));
registry.register(pFoo);
registry.register(pBar);
assertEquals("incorrect provider count", 2, registry.getProviders().size());
}
@Test(expected = IllegalStateException.class)
public void duplicateRegistration() {
TestProviderRegistry registry = new TestProviderRegistry();
......@@ -57,6 +67,15 @@ public class AbstractProviderRegistryTest {
registry.register(pFoo);
}
@Test(expected = IllegalStateException.class)
public void duplicateSchemeRegistration() {
TestProviderRegistry registry = new TestProviderRegistry();
TestProvider pFoo = new TestProvider(new ProviderId("of", "foo"));
TestProvider pBar = new TestProvider(new ProviderId("of", "bar"));
registry.register(pFoo);
registry.register(pBar);
}
@Test
public void voidUnregistration() {
TestProviderRegistry registry = new TestProviderRegistry();
......
......@@ -11,6 +11,8 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipAdminService;
import org.onlab.onos.cluster.MastershipEvent;
......@@ -52,9 +54,12 @@ implements MastershipService, MastershipAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private ClusterEventListener clusterListener = new InternalClusterEventListener();
@Activate
public void activate() {
eventDispatcher.addSink(MastershipEvent.class, listenerRegistry);
clusterService.addListener(clusterListener);
store.setDelegate(delegate);
log.info("Started");
}
......@@ -62,6 +67,7 @@ implements MastershipService, MastershipAdminService {
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(MastershipEvent.class);
clusterService.removeListener(clusterListener);
store.unsetDelegate(delegate);
log.info("Stopped");
}
......@@ -71,14 +77,18 @@ implements MastershipService, MastershipAdminService {
checkNotNull(nodeId, NODE_ID_NULL);
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(role, ROLE_NULL);
//TODO figure out appropriate action for non-MASTER roles, if we even set those
MastershipEvent event = null;
if (role.equals(MastershipRole.MASTER)) {
MastershipEvent event = store.setMaster(nodeId, deviceId);
event = store.setMaster(nodeId, deviceId);
} else {
event = store.unsetMaster(nodeId, deviceId);
}
if (event != null) {
post(event);
}
}
}
@Override
public MastershipRole getLocalRole(DeviceId deviceId) {
......@@ -88,8 +98,16 @@ implements MastershipService, MastershipAdminService {
@Override
public void relinquishMastership(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
// FIXME: add method to store to give up mastership and trigger new master selection process
MastershipRole role = getLocalRole(deviceId);
if (!role.equals(MastershipRole.MASTER)) {
return;
}
MastershipEvent event = store.unsetMaster(
clusterService.getLocalNode().id(), deviceId);
if (event != null) {
post(event);
}
}
@Override
......@@ -146,6 +164,26 @@ implements MastershipService, MastershipAdminService {
}
//callback for reacting to cluster events
private class InternalClusterEventListener implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
switch (event.type()) {
//FIXME: worry about addition when the time comes
case INSTANCE_ADDED:
case INSTANCE_ACTIVATED:
break;
case INSTANCE_REMOVED:
case INSTANCE_DEACTIVATED:
break;
default:
log.warn("unknown cluster event {}", event);
}
}
}
public class InternalDelegate implements MastershipStoreDelegate {
@Override
......
......@@ -16,6 +16,7 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
......@@ -76,6 +77,8 @@ public class DeviceManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
protected MastershipTermService termService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
......@@ -84,6 +87,7 @@ public class DeviceManager
store.setDelegate(delegate);
eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
mastershipService.addListener(mastershipListener);
termService = mastershipService.requestTermService();
log.info("Started");
}
......@@ -198,7 +202,7 @@ public class DeviceManager
log.info("Device {} connected", deviceId);
mastershipService.requestRoleFor(deviceId);
provider().roleChanged(event.subject(),
mastershipService.getLocalRole(deviceId));
mastershipService.requestRoleFor(deviceId));
post(event);
}
}
......@@ -208,8 +212,11 @@ public class DeviceManager
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
DeviceEvent event = store.markOffline(deviceId);
//we're no longer capable of mastership.
if (event != null) {
log.info("Device {} disconnected", deviceId);
mastershipService.relinquishMastership(deviceId);
post(event);
}
}
......@@ -221,8 +228,9 @@ public class DeviceManager
checkNotNull(portDescriptions,
"Port descriptions list cannot be null");
checkValidity();
List<DeviceEvent> events = store.updatePorts(deviceId,
portDescriptions);
this.provider().id();
List<DeviceEvent> events = store.updatePorts(this.provider().id(),
deviceId, portDescriptions);
for (DeviceEvent event : events) {
post(event);
}
......@@ -234,8 +242,8 @@ public class DeviceManager
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(portDescription, PORT_DESCRIPTION_NULL);
checkValidity();
DeviceEvent event = store.updatePortStatus(deviceId,
portDescription);
DeviceEvent event = store.updatePortStatus(this.provider().id(),
deviceId, portDescription);
if (event != null) {
log.info("Device {} port {} status changed", deviceId, event
.port().number());
......
......@@ -65,8 +65,8 @@ public class DefaultTopologyProvider extends AbstractProvider
private volatile boolean isStarted = false;
private TopologyProviderService providerService;
private DeviceListener deviceListener = new InnerDeviceListener();
private LinkListener linkListener = new InnerLinkListener();
private DeviceListener deviceListener = new InternalDeviceListener();
private LinkListener linkListener = new InternalLinkListener();
private EventAccumulator accumulator;
private ExecutorService executor;
......@@ -132,7 +132,7 @@ public class DefaultTopologyProvider extends AbstractProvider
}
// Callback for device events
private class InnerDeviceListener implements DeviceListener {
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceEvent.Type type = event.type();
......@@ -144,7 +144,7 @@ public class DefaultTopologyProvider extends AbstractProvider
}
// Callback for link events
private class InnerLinkListener implements LinkListener {
private class InternalLinkListener implements LinkListener {
@Override
public void event(LinkEvent event) {
accumulator.add(event);
......
......@@ -15,10 +15,11 @@ import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.trivial.impl.SimpleMastershipStore;
import org.onlab.onos.store.trivial.impl.SimpleMastershipStore;
import org.onlab.packet.IpPrefix;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.onlab.onos.net.MastershipRole.*;
/**
......@@ -65,7 +66,24 @@ public class MastershipManagerTest {
@Test
public void relinquishMastership() {
//TODO
//no backups - should turn to standby and no master for device
mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
assertEquals("wrong role:", MASTER, mgr.getLocalRole(DEV_MASTER));
mgr.relinquishMastership(DEV_MASTER);
assertNull("wrong master:", mgr.getMasterFor(DEV_OTHER));
assertEquals("wrong role:", STANDBY, mgr.getLocalRole(DEV_MASTER));
//not master, nothing should happen
mgr.setRole(NID_LOCAL, DEV_OTHER, STANDBY);
mgr.relinquishMastership(DEV_OTHER);
assertNull("wrong role:", mgr.getMasterFor(DEV_OTHER));
//provide NID_OTHER as backup and relinquish
mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
assertEquals("wrong master:", NID_LOCAL, mgr.getMasterFor(DEV_MASTER));
mgr.setRole(NID_OTHER, DEV_MASTER, STANDBY);
mgr.relinquishMastership(DEV_MASTER);
assertEquals("wrong master:", NID_OTHER, mgr.getMasterFor(DEV_MASTER));
}
@Test
......@@ -95,7 +113,6 @@ public class MastershipManagerTest {
mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
mgr.setRole(NID_LOCAL, DEV_OTHER, STANDBY);
assertEquals("should be one device:", 1, mgr.getDevicesOf(NID_LOCAL).size());
//hand both devices to NID_LOCAL
mgr.setRole(NID_LOCAL, DEV_OTHER, MASTER);
assertEquals("should be two devices:", 2, mgr.getDevicesOf(NID_LOCAL).size());
......
......@@ -27,7 +27,7 @@ import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.trivial.impl.SimpleDeviceStore;
import org.onlab.onos.store.trivial.impl.SimpleDeviceStore;
import java.util.ArrayList;
import java.util.Iterator;
......
......@@ -40,7 +40,7 @@ import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.trivial.impl.SimpleFlowRuleStore;
import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
......
......@@ -34,7 +34,7 @@ import org.onlab.onos.net.host.HostProviderService;
import org.onlab.onos.net.host.PortAddresses;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.trivial.impl.SimpleHostStore;
import org.onlab.onos.store.trivial.impl.SimpleHostStore;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
......
......@@ -23,7 +23,7 @@ import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.net.device.impl.DeviceManager;
import org.onlab.onos.net.trivial.impl.SimpleLinkStore;
import org.onlab.onos.store.trivial.impl.SimpleLinkStore;
import java.util.ArrayList;
import java.util.Iterator;
......
......@@ -24,7 +24,7 @@ import org.onlab.onos.net.topology.TopologyProvider;
import org.onlab.onos.net.topology.TopologyProviderRegistry;
import org.onlab.onos.net.topology.TopologyProviderService;
import org.onlab.onos.net.topology.TopologyService;
import org.onlab.onos.net.trivial.impl.SimpleTopologyStore;
import org.onlab.onos.store.trivial.impl.SimpleTopologyStore;
import java.util.ArrayList;
import java.util.List;
......
package org.onlab.onos.store.cluster.messaging;
import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_ADVERTISEMENT;
import java.util.Map;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.Timestamp;
import com.google.common.collect.ImmutableMap;
/**
* Anti-Entropy advertisement message.
* <p>
* Message to advertise the information this node holds.
*
* @param <ID> ID type
*/
public class AntiEntropyAdvertisement<ID> extends ClusterMessage {
private final NodeId sender;
private final ImmutableMap<ID, Timestamp> advertisement;
/**
* Creates anti-entropy advertisement message.
*
* @param sender sender of this message
* @param advertisement timestamp information of the data sender holds
*/
public AntiEntropyAdvertisement(NodeId sender, Map<ID, Timestamp> advertisement) {
super(AE_ADVERTISEMENT);
this.sender = sender;
this.advertisement = ImmutableMap.copyOf(advertisement);
}
public NodeId sender() {
return sender;
}
public ImmutableMap<ID, Timestamp> advertisement() {
return advertisement;
}
// Default constructor for serializer
protected AntiEntropyAdvertisement() {
super(AE_ADVERTISEMENT);
this.sender = null;
this.advertisement = null;
}
}
package org.onlab.onos.store.cluster.messaging;
import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_REPLY;
import java.util.Map;
import java.util.Set;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.device.impl.VersionedValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
/**
* Anti-Entropy reply message.
* <p>
* Message to send in reply to advertisement or another reply.
* Suggest to the sender about the more up-to-date data this node has,
* and request for more recent data that the receiver has.
*/
public class AntiEntropyReply<ID, V extends VersionedValue<?>> extends ClusterMessage {
private final NodeId sender;
private final ImmutableMap<ID, V> suggestion;
private final ImmutableSet<ID> request;
/**
* Creates a reply to anti-entropy message.
*
* @param sender sender of this message
* @param suggestion collection of more recent values, sender had
* @param request Collection of identifiers
*/
public AntiEntropyReply(NodeId sender,
Map<ID, V> suggestion,
Set<ID> request) {
super(AE_REPLY);
this.sender = sender;
this.suggestion = ImmutableMap.copyOf(suggestion);
this.request = ImmutableSet.copyOf(request);
}
public NodeId sender() {
return sender;
}
/**
* Returns collection of values, which the recipient of this reply is likely
* to be missing or has outdated version.
*
* @return
*/
public ImmutableMap<ID, V> suggestion() {
return suggestion;
}
/**
* Returns collection of identifier to request.
*
* @return collection of identifier to request
*/
public ImmutableSet<ID> request() {
return request;
}
/**
* Checks if reply contains any suggestion or request.
*
* @return true if nothing is suggested and requested
*/
public boolean isEmpty() {
return suggestion.isEmpty() && request.isEmpty();
}
// Default constructor for serializer
protected AntiEntropyReply() {
super(AE_REPLY);
this.sender = null;
this.suggestion = null;
this.request = null;
}
}
......@@ -15,6 +15,12 @@ public enum MessageSubject {
LEAVING_MEMBER,
/** Signifies a heart-beat message. */
ECHO
ECHO,
/** Anti-Entropy advertisement message. */
AE_ADVERTISEMENT,
/** Anti-Entropy reply message. */
AE_REPLY,
}
......
package org.onlab.onos.store.device.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
// TODO: Handle Port as part of these messages, or separate messages for Ports?
public class DeviceAntiEntropyAdvertisement
extends AntiEntropyAdvertisement<DeviceId> {
public DeviceAntiEntropyAdvertisement(NodeId sender,
Map<DeviceId, Timestamp> advertisement) {
super(sender, advertisement);
}
// May need to add ProviderID, etc.
public static DeviceAntiEntropyAdvertisement create(
NodeId self,
Collection<VersionedValue<Device>> localValues) {
Map<DeviceId, Timestamp> ads = new HashMap<>(localValues.size());
for (VersionedValue<Device> e : localValues) {
ads.put(e.entity().id(), e.timestamp());
}
return new DeviceAntiEntropyAdvertisement(self, ads);
}
// For serializer
protected DeviceAntiEntropyAdvertisement() {}
}
package org.onlab.onos.store.device.impl;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public class DeviceAntiEntropyReply
extends AntiEntropyReply<DeviceId, VersionedValue<Device>> {
public DeviceAntiEntropyReply(NodeId sender,
Map<DeviceId, VersionedValue<Device>> suggestion,
Set<DeviceId> request) {
super(sender, suggestion, request);
}
/**
* Creates a reply to Anti-Entropy advertisement.
*
* @param advertisement to respond to
* @param self node identifier representing local node
* @param localValues local values held on this node
* @return reply message
*/
public static DeviceAntiEntropyReply reply(
DeviceAntiEntropyAdvertisement advertisement,
NodeId self,
Collection<VersionedValue<Device>> localValues
) {
ImmutableMap<DeviceId, Timestamp> ads = advertisement.advertisement();
ImmutableMap.Builder<DeviceId, VersionedValue<Device>>
sug = ImmutableMap.builder();
Set<DeviceId> req = new HashSet<>(ads.keySet());
for (VersionedValue<Device> e : localValues) {
final DeviceId id = e.entity().id();
final Timestamp local = e.timestamp();
final Timestamp theirs = ads.get(id);
if (theirs == null) {
// they don't have it, suggest
sug.put(id, e);
// don't need theirs
req.remove(id);
} else if (local.compareTo(theirs) < 0) {
// they got older one, suggest
sug.put(id, e);
// don't need theirs
req.remove(id);
} else if (local.equals(theirs)) {
// same, don't need theirs
req.remove(id);
}
}
return new DeviceAntiEntropyReply(self, sug.build(), req);
}
/**
* Creates a reply to request for values held locally.
*
* @param requests message containing the request
* @param self node identifier representing local node
* @param localValues local valeds held on this node
* @return reply message
*/
public static DeviceAntiEntropyReply reply(
DeviceAntiEntropyReply requests,
NodeId self,
Map<DeviceId, VersionedValue<Device>> localValues
) {
Set<DeviceId> reqs = requests.request();
Map<DeviceId, VersionedValue<Device>> requested = new HashMap<>(reqs.size());
for (DeviceId id : reqs) {
final VersionedValue<Device> value = localValues.get(id);
if (value != null) {
requested.put(id, value);
}
}
Set<DeviceId> empty = ImmutableSet.of();
return new DeviceAntiEntropyReply(self, requested, empty);
}
// For serializer
protected DeviceAntiEntropyReply() {}
}
......@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
......@@ -59,8 +60,8 @@ public class OnosDistributedDeviceStore
public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
private ConcurrentHashMap<DeviceId, VersionedValue<Device>> devices;
private ConcurrentHashMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
......@@ -191,7 +192,7 @@ public class OnosDistributedDeviceStore
}
@Override
public List<DeviceEvent> updatePorts(DeviceId deviceId,
public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
List<PortDescription> portDescriptions) {
List<DeviceEvent> events = new ArrayList<>();
synchronized (this) {
......@@ -295,7 +296,7 @@ public class OnosDistributedDeviceStore
}
@Override
public DeviceEvent updatePortStatus(DeviceId deviceId,
public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) {
VersionedValue<Device> device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
......
package org.onlab.onos.store.device.impl;
import java.util.Objects;
import org.onlab.onos.store.Timestamp;
/**
......@@ -42,4 +44,35 @@ public class VersionedValue<T> {
public Timestamp timestamp() {
return timestamp;
}
@Override
public int hashCode() {
return Objects.hash(entity, timestamp, isUp);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
@SuppressWarnings("unchecked")
VersionedValue<T> that = (VersionedValue<T>) obj;
return Objects.equals(this.entity, that.entity) &&
Objects.equals(this.timestamp, that.timestamp) &&
Objects.equals(this.isUp, that.isUp);
}
// Default constructor for serializer
protected VersionedValue() {
this.entity = null;
this.isUp = false;
this.timestamp = null;
}
}
......
......@@ -123,6 +123,12 @@ implements MastershipStore {
return null;
}
@Override
public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
// TODO Auto-generated method stub
return null;
}
private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
super(cache);
......
......@@ -221,7 +221,7 @@ public class DistributedDeviceStore
}
@Override
public List<DeviceEvent> updatePorts(DeviceId deviceId,
public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
List<PortDescription> portDescriptions) {
List<DeviceEvent> events = new ArrayList<>();
synchronized (this) {
......@@ -319,7 +319,7 @@ public class DistributedDeviceStore
}
@Override
public DeviceEvent updatePortStatus(DeviceId deviceId,
public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) {
synchronized (this) {
Device device = devices.getUnchecked(deviceId).orNull();
......
......@@ -28,7 +28,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
/**
* Manages inventory of flow rules using trivial in-memory implementation.
* TEMPORARY: Manages inventory of flow rules using distributed store implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
@Component(immediate = true)
......
/**
* Implementation of flow store using Hazelcast distributed structures.
*/
package org.onlab.onos.store.flow.impl;
......@@ -39,8 +39,8 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
/**
* Manages inventory of end-station hosts using trivial in-memory
* implementation.
* TEMPORARY: Manages inventory of end-station hosts using distributed
* structures implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
@Component(immediate = true)
......
/**
* Implementation of host store using Hazelcast distributed structures.
*/
package org.onlab.onos.store.host.impl;
......@@ -28,7 +28,7 @@ import org.onlab.onos.store.AbstractStore;
import org.slf4j.Logger;
/**
* Manages inventory of topology snapshots using trivial in-memory
* TEMPORARY: Manages inventory of topology snapshots using distributed
* structures implementation.
*/
//FIXME: I LIE I AM NOT DISTRIBUTED
......
/**
* Implementation of topology store using Hazelcast distributed structures.
*/
package org.onlab.onos.store.topology.impl;
......@@ -201,7 +201,7 @@ public class DistributedDeviceStoreTest {
new DefaultPortDescription(P2, true)
);
List<DeviceEvent> events = deviceStore.updatePorts(DID1, pds);
List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
for (DeviceEvent event : events) {
......@@ -220,7 +220,7 @@ public class DistributedDeviceStoreTest {
new DefaultPortDescription(P3, true)
);
events = deviceStore.updatePorts(DID1, pds2);
events = deviceStore.updatePorts(PID, DID1, pds2);
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -243,7 +243,7 @@ public class DistributedDeviceStoreTest {
new DefaultPortDescription(P1, false),
new DefaultPortDescription(P2, true)
);
events = deviceStore.updatePorts(DID1, pds3);
events = deviceStore.updatePorts(PID, DID1, pds3);
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -268,9 +268,9 @@ public class DistributedDeviceStoreTest {
List<PortDescription> pds = Arrays.<PortDescription>asList(
new DefaultPortDescription(P1, true)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
DeviceEvent event = deviceStore.updatePortStatus(DID1,
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
new DefaultPortDescription(P1, false));
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
......@@ -286,7 +286,7 @@ public class DistributedDeviceStoreTest {
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, true)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
List<Port> ports = deviceStore.getPorts(DID1);
......@@ -309,7 +309,7 @@ public class DistributedDeviceStoreTest {
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, false)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
Port port1 = deviceStore.getPort(DID1, P1);
assertEquals(P1, port1.number());
......
package org.onlab.onos.store.serializers;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.onlab.util.KryoPool.FamilySerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.MapSerializer;
import com.google.common.collect.ImmutableMap;
/**
* Kryo Serializer for {@link ImmutableMap}.
*/
public class ImmutableMapSerializer extends FamilySerializer<ImmutableMap<?, ?>> {
private final MapSerializer mapSerializer = new MapSerializer();
public ImmutableMapSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, ImmutableMap<?, ?> object) {
// wrapping with unmodifiableMap proxy
// to avoid Kryo from writing only the reference marker of this instance,
// which will be embedded right before this method call.
kryo.writeObject(output, Collections.unmodifiableMap(object), mapSerializer);
}
@Override
public ImmutableMap<?, ?> read(Kryo kryo, Input input,
Class<ImmutableMap<?, ?>> type) {
Map<?, ?> map = kryo.readObject(input, HashMap.class, mapSerializer);
return ImmutableMap.copyOf(map);
}
@Override
public void registerFamilies(Kryo kryo) {
kryo.register(ImmutableMap.of().getClass(), this);
kryo.register(ImmutableMap.of(1, 2).getClass(), this);
kryo.register(ImmutableMap.of(1, 2, 3, 4).getClass(), this);
// TODO register required ImmutableMap variants
}
}
package org.onlab.onos.store.serializers;
import java.util.ArrayList;
import java.util.List;
import org.onlab.util.KryoPool.FamilySerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import com.google.common.collect.ImmutableSet;
/**
* Kryo Serializer for {@link ImmutableSet}.
*/
public class ImmutableSetSerializer extends FamilySerializer<ImmutableSet<?>> {
private final CollectionSerializer serializer = new CollectionSerializer();
public ImmutableSetSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, ImmutableSet<?> object) {
kryo.writeObject(output, object.asList(), serializer);
}
@Override
public ImmutableSet<?> read(Kryo kryo, Input input,
Class<ImmutableSet<?>> type) {
List<?> elms = kryo.readObject(input, ArrayList.class, serializer);
return ImmutableSet.copyOf(elms);
}
@Override
public void registerFamilies(Kryo kryo) {
kryo.register(ImmutableSet.of().getClass(), this);
kryo.register(ImmutableSet.of(1).getClass(), this);
kryo.register(ImmutableSet.of(1, 2).getClass(), this);
// TODO register required ImmutableSet variants
}
}
package org.onlab.onos.store.serializers;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -100,4 +101,14 @@ public class KryoSerializationManager implements KryoSerializationService {
return serializerPool.deserialize(bytes);
}
@Override
public void serialize(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
@Override
public <T> T deserialize(ByteBuffer buffer) {
return serializerPool.deserialize(buffer);
}
}
......
package org.onlab.onos.store.serializers;
import java.nio.ByteBuffer;
// TODO: To be replaced with SerializationService from IOLoop activity
/**
* Service to serialize Objects into byte array.
......@@ -16,6 +18,15 @@ public interface KryoSerializationService {
public byte[] serialize(final Object obj);
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @param buffer to write serialized bytes
*/
public void serialize(final Object obj, ByteBuffer buffer);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
......@@ -24,4 +35,12 @@ public interface KryoSerializationService {
*/
public <T> T deserialize(final byte[] bytes);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param buffer bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final ByteBuffer buffer);
}
......
package org.onlab.onos.store.serializers;
import org.onlab.onos.net.MastershipRole;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link org.onlab.onos.net.MastershipRole}.
*/
public class MastershipRoleSerializer extends Serializer<MastershipRole> {
@Override
public MastershipRole read(Kryo kryo, Input input, Class<MastershipRole> type) {
final String role = kryo.readObject(input, String.class);
return MastershipRole.valueOf(role);
}
@Override
public void write(Kryo kryo, Output output, MastershipRole object) {
kryo.writeObject(output, object.toString());
}
}
package org.onlab.onos.store.serializers;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link org.onlab.onos.cluster.MastershipTerm}.
*/
public class MastershipTermSerializer extends Serializer<MastershipTerm> {
@Override
public MastershipTerm read(Kryo kryo, Input input, Class<MastershipTerm> type) {
final NodeId node = new NodeId(kryo.readObject(input, String.class));
final int term = input.readInt();
return MastershipTerm.of(node, term);
}
@Override
public void write(Kryo kryo, Output output, MastershipTerm object) {
output.writeString(object.master().toString());
output.writeInt(object.termNumber());
}
}
package org.onlab.onos.store.serializers;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.PortNumber.portNumber;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.testing.EqualsTester;
import de.javakaffee.kryoserializers.URISerializer;
public class KryoSerializerTests {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
private static final PortNumber P1 = portNumber(1);
private static final PortNumber P2 = portNumber(2);
private static final ConnectPoint CP1 = new ConnectPoint(DID1, P1);
private static final ConnectPoint CP2 = new ConnectPoint(DID2, P2);
private static final String MFR = "whitebox";
private static final String HW = "1.1.x";
private static final String SW1 = "3.8.1";
private static final String SW2 = "3.9.5";
private static final String SN = "43311-12345";
private static final Device DEV1 = new DefaultDevice(PID, DID1, Device.Type.SWITCH, MFR, HW, SW1, SN);
private static KryoPool kryos;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
kryos = KryoPool.newBuilder()
.register(
ArrayList.class,
HashMap.class
)
.register(
Device.Type.class,
Link.Type.class
// ControllerNode.State.class,
// DefaultControllerNode.class,
// MastershipRole.class,
// Port.class,
// Element.class,
)
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableSet.class, new ImmutableSetSerializer())
.register(IpPrefix.class, new IpPrefixSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DefaultDevice.class)
.register(URI.class, new URISerializer())
.register(MastershipRole.class, new MastershipRoleSerializer())
.register(MastershipTerm.class, new MastershipTermSerializer())
.build();
}
@Before
public void setUp() throws Exception {
}
@After
public void tearDown() throws Exception {
// removing Kryo instance to use fresh Kryo on each tests
kryos.getKryo();
}
private static <T> void testSerialized(T original) {
ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
kryos.serialize(original, buffer);
buffer.flip();
T copy = kryos.deserialize(buffer);
new EqualsTester()
.addEqualityGroup(original, copy)
.testEquals();
}
@Test
public final void test() {
testSerialized(new ConnectPoint(DID1, P1));
testSerialized(new DefaultLink(PID, CP1, CP2, Link.Type.DIRECT));
testSerialized(new DefaultPort(DEV1, P1, true));
testSerialized(DID1);
testSerialized(ImmutableMap.of(DID1, DEV1, DID2, DEV1));
testSerialized(ImmutableMap.of(DID1, DEV1));
testSerialized(ImmutableMap.of());
testSerialized(ImmutableSet.of(DID1, DID2));
testSerialized(ImmutableSet.of(DID1));
testSerialized(ImmutableSet.of());
testSerialized(IpPrefix.valueOf("192.168.0.1/24"));
testSerialized(new LinkKey(CP1, CP2));
testSerialized(new NodeId("SomeNodeIdentifier"));
testSerialized(P1);
testSerialized(PID);
}
}
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import org.onlab.graph.AdjacencyListsGraph;
import org.onlab.onos.net.topology.TopologyEdge;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import org.onlab.onos.net.DeviceId;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
......@@ -143,7 +143,7 @@ public class SimpleDeviceStore
}
@Override
public List<DeviceEvent> updatePorts(DeviceId deviceId,
public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
List<PortDescription> portDescriptions) {
List<DeviceEvent> events = new ArrayList<>();
synchronized (this) {
......@@ -221,7 +221,7 @@ public class SimpleDeviceStore
}
@Override
public DeviceEvent updatePortStatus(DeviceId deviceId,
public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) {
synchronized (this) {
Device device = devices.get(deviceId);
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_ADDED;
import static org.onlab.onos.net.host.HostEvent.Type.HOST_MOVED;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -7,8 +7,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
......@@ -48,8 +46,10 @@ public class SimpleMastershipStore
new DefaultControllerNode(new NodeId("local"), LOCALHOST);
//devices mapped to their masters, to emulate multiple nodes
protected final ConcurrentMap<DeviceId, NodeId> masterMap =
new ConcurrentHashMap<>();
protected final Map<DeviceId, NodeId> masterMap = new HashMap<>();
//emulate backups with pile of nodes
protected final Set<NodeId> backups = new HashSet<>();
//terms
protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>();
@Activate
......@@ -64,26 +64,30 @@ public class SimpleMastershipStore
@Override
public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
NodeId node = masterMap.get(deviceId);
if (node == null) {
synchronized (this) {
switch (role) {
case MASTER:
return null;
case STANDBY:
masterMap.put(deviceId, nodeId);
termMap.get(deviceId).incrementAndGet();
backups.add(nodeId);
break;
case NONE:
masterMap.put(deviceId, nodeId);
termMap.put(deviceId, new AtomicInteger());
backups.add(nodeId);
break;
default:
log.warn("unknown Mastership Role {}", role);
return null;
}
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
}
if (node.equals(nodeId)) {
return null;
} else {
synchronized (this) {
masterMap.put(deviceId, nodeId);
termMap.get(deviceId).incrementAndGet();
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
}
}
}
@Override
public NodeId getMaster(DeviceId deviceId) {
......@@ -103,34 +107,111 @@ public class SimpleMastershipStore
@Override
public MastershipRole requestRole(DeviceId deviceId) {
return getRole(instance.id(), deviceId);
//query+possible reelection
NodeId node = instance.id();
MastershipRole role = getRole(node, deviceId);
switch (role) {
case MASTER:
break;
case STANDBY:
synchronized (this) {
//try to "re-elect", since we're really not distributed
NodeId rel = reelect(node);
if (rel == null) {
masterMap.put(deviceId, node);
termMap.put(deviceId, new AtomicInteger());
role = MastershipRole.MASTER;
}
backups.add(node);
}
break;
case NONE:
//first to get to it, say we are master
synchronized (this) {
masterMap.put(deviceId, node);
termMap.put(deviceId, new AtomicInteger());
backups.add(node);
role = MastershipRole.MASTER;
}
break;
default:
log.warn("unknown Mastership Role {}", role);
}
return role;
}
@Override
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
NodeId node = masterMap.get(deviceId);
//just query
NodeId current = masterMap.get(deviceId);
MastershipRole role;
if (node != null) {
if (node.equals(nodeId)) {
role = MastershipRole.MASTER;
} else {
if (current == null) {
if (backups.contains(nodeId)) {
role = MastershipRole.STANDBY;
} else {
role = MastershipRole.NONE;
}
} else {
//masterMap doesn't contain it.
if (current.equals(nodeId)) {
role = MastershipRole.MASTER;
masterMap.put(deviceId, nodeId);
} else {
role = MastershipRole.STANDBY;
}
}
return role;
}
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
if (masterMap.get(deviceId) == null) {
if ((masterMap.get(deviceId) == null) ||
(termMap.get(deviceId) == null)) {
return null;
}
return MastershipTerm.of(
masterMap.get(deviceId), termMap.get(deviceId).get());
}
@Override
public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
MastershipRole role = getRole(nodeId, deviceId);
synchronized (this) {
switch (role) {
case MASTER:
NodeId backup = reelect(nodeId);
if (backup == null) {
masterMap.remove(deviceId);
} else {
masterMap.put(deviceId, backup);
termMap.get(deviceId).incrementAndGet();
return new MastershipEvent(MASTER_CHANGED, deviceId, backup);
}
case STANDBY:
case NONE:
if (!termMap.containsKey(deviceId)) {
termMap.put(deviceId, new AtomicInteger());
}
backups.add(nodeId);
break;
default:
log.warn("unknown Mastership Role {}", role);
}
}
return null;
}
//dumbly selects next-available node that's not the current one
//emulate leader election
private NodeId reelect(NodeId nodeId) {
NodeId backup = null;
for (NodeId n : backups) {
if (!n.equals(nodeId)) {
backup = n;
break;
}
}
return backup;
}
}
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......
......@@ -2,4 +2,4 @@
* Implementations of in-memory stores suitable for unit testing and
* experimentation; not for production use.
*/
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import org.junit.Before;
import org.junit.Test;
......
/**
*
*/
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import static org.junit.Assert.*;
import static org.onlab.onos.net.Device.Type.SWITCH;
......@@ -182,7 +182,7 @@ public class SimpleDeviceStoreTest {
new DefaultPortDescription(P2, true)
);
List<DeviceEvent> events = deviceStore.updatePorts(DID1, pds);
List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
for (DeviceEvent event : events) {
......@@ -201,7 +201,7 @@ public class SimpleDeviceStoreTest {
new DefaultPortDescription(P3, true)
);
events = deviceStore.updatePorts(DID1, pds2);
events = deviceStore.updatePorts(PID, DID1, pds2);
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -224,7 +224,7 @@ public class SimpleDeviceStoreTest {
new DefaultPortDescription(P1, false),
new DefaultPortDescription(P2, true)
);
events = deviceStore.updatePorts(DID1, pds3);
events = deviceStore.updatePorts(PID, DID1, pds3);
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -249,9 +249,9 @@ public class SimpleDeviceStoreTest {
List<PortDescription> pds = Arrays.<PortDescription>asList(
new DefaultPortDescription(P1, true)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
DeviceEvent event = deviceStore.updatePortStatus(DID1,
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
new DefaultPortDescription(P1, false));
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
......@@ -267,7 +267,7 @@ public class SimpleDeviceStoreTest {
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, true)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
List<Port> ports = deviceStore.getPorts(DID1);
......@@ -290,7 +290,7 @@ public class SimpleDeviceStoreTest {
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, false)
);
deviceStore.updatePorts(DID1, pds);
deviceStore.updatePorts(PID, DID1, pds);
Port port1 = deviceStore.getPort(DID1, P1);
assertEquals(P1, port1.number());
......
package org.onlab.onos.net.trivial.impl;
package org.onlab.onos.store.trivial.impl;
import static org.junit.Assert.*;
import static org.onlab.onos.net.DeviceId.deviceId;
......
package org.onlab.onos.store.trivial.impl;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import com.google.common.collect.Sets;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.onlab.onos.net.MastershipRole.*;
import static org.onlab.onos.cluster.MastershipEvent.Type.*;
/**
* Test for the simple MastershipStore implementation.
*/
public class SimpleMastershipStoreTest {
private static final DeviceId DID1 = DeviceId.deviceId("of:01");
private static final DeviceId DID2 = DeviceId.deviceId("of:02");
private static final DeviceId DID3 = DeviceId.deviceId("of:03");
private static final DeviceId DID4 = DeviceId.deviceId("of:04");
private static final NodeId N1 = new NodeId("local");
private static final NodeId N2 = new NodeId("other");
private SimpleMastershipStore sms;
@Before
public void setUp() throws Exception {
sms = new SimpleMastershipStore();
sms.activate();
}
@After
public void tearDown() throws Exception {
sms.deactivate();
}
@Test
public void getRole() {
//special case, no backup or master
put(DID1, N1, false, false);
assertEquals("wrong role", NONE, sms.getRole(N1, DID1));
//backup exists but we aren't mapped
put(DID2, N1, false, true);
assertEquals("wrong role", STANDBY, sms.getRole(N1, DID2));
//N2 is master
put(DID3, N2, true, true);
assertEquals("wrong role", MASTER, sms.getRole(N2, DID3));
//N2 is master but N1 is only in backups set
put(DID4, N2, true, false);
assertEquals("wrong role", STANDBY, sms.getRole(N1, DID4));
}
@Test
public void getMaster() {
put(DID3, N2, true, true);
assertEquals("wrong role", MASTER, sms.getRole(N2, DID3));
assertEquals("wrong device", N2, sms.getMaster(DID3));
}
@Test
public void setMaster() {
put(DID1, N1, false, false);
assertEquals("wrong event", MASTER_CHANGED, sms.setMaster(N1, DID1).type());
assertEquals("wrong role", MASTER, sms.getRole(N1, DID1));
//set node that's already master - should be ignored
assertNull("wrong event", sms.setMaster(N1, DID1));
//set STANDBY to MASTER
put(DID2, N1, false, true);
assertEquals("wrong role", STANDBY, sms.getRole(N1, DID2));
assertEquals("wrong event", MASTER_CHANGED, sms.setMaster(N1, DID2).type());
assertEquals("wrong role", MASTER, sms.getRole(N1, DID2));
}
@Test
public void getDevices() {
Set<DeviceId> d = Sets.newHashSet(DID1, DID2);
put(DID1, N2, true, true);
put(DID2, N2, true, true);
put(DID3, N1, true, true);
assertTrue("wrong devices", d.equals(sms.getDevices(N2)));
}
@Test
public void getTermFor() {
put(DID1, N1, true, true);
assertEquals("wrong term", MastershipTerm.of(N1, 0), sms.getTermFor(DID1));
//switch to N2 and back - 2 term switches
sms.setMaster(N2, DID1);
sms.setMaster(N1, DID1);
assertEquals("wrong term", MastershipTerm.of(N1, 2), sms.getTermFor(DID1));
}
@Test
public void requestRole() {
//NONE - become MASTER
put(DID1, N1, false, false);
assertEquals("wrong role", MASTER, sms.requestRole(DID1));
//STANDBY without backup - become MASTER
put(DID2, N1, false, true);
assertEquals("wrong role", MASTER, sms.requestRole(DID2));
//STANDBY with backup - stay STANDBY
put(DID3, N2, false, true);
assertEquals("wrong role", STANDBY, sms.requestRole(DID3));
//local (N1) is MASTER - stay MASTER
put(DID4, N1, true, true);
assertEquals("wrong role", MASTER, sms.requestRole(DID4));
}
@Test
public void unsetMaster() {
//NONE - record backup but take no other action
put(DID1, N1, false, false);
sms.unsetMaster(N1, DID1);
assertTrue("not backed up", sms.backups.contains(N1));
sms.termMap.clear();
sms.unsetMaster(N1, DID1);
assertTrue("term not set", sms.termMap.containsKey(DID1));
//no backup, MASTER
put(DID1, N1, true, true);
assertNull("wrong event", sms.unsetMaster(N1, DID1));
assertNull("wrong node", sms.masterMap.get(DID1));
//backup, switch
sms.masterMap.clear();
put(DID1, N1, true, true);
put(DID2, N2, true, true);
assertEquals("wrong event", MASTER_CHANGED, sms.unsetMaster(N1, DID1).type());
}
//helper to populate master/backup structures
private void put(DeviceId dev, NodeId node, boolean store, boolean backup) {
if (store) {
sms.masterMap.put(dev, node);
}
if (backup) {
sms.backups.add(node);
}
sms.termMap.put(dev, new AtomicInteger());
}
}
......@@ -419,7 +419,7 @@
<group>
<title>Core Subsystems</title>
<packages>
org.onlab.onos.cluster.impl:org.onlab.onos.net.device.impl:org.onlab.onos.net.link.impl:org.onlab.onos.net.host.impl:org.onlab.onos.net.topology.impl:org.onlab.onos.net.packet.impl:org.onlab.onos.net.flow.impl:org.onlab.onos.net.trivial.*:org.onlab.onos.net.*.impl:org.onlab.onos.event.impl:org.onlab.onos.store.*
org.onlab.onos.cluster.impl:org.onlab.onos.net.device.impl:org.onlab.onos.net.link.impl:org.onlab.onos.net.host.impl:org.onlab.onos.net.topology.impl:org.onlab.onos.net.packet.impl:org.onlab.onos.net.flow.impl:org.onlab.onos.store.trivial.*:org.onlab.onos.net.*.impl:org.onlab.onos.event.impl:org.onlab.onos.store.*
</packages>
</group>
<group>
......
......@@ -6,22 +6,21 @@
export ONOS_ROOT=${ONOS_ROOT:-~/onos-next}
# Setup some environmental context for developers
export JAVA_HOME=$(/usr/libexec/java_home)
export JAVA_HOME=$(/usr/libexec/java_home -v 1.7)
export MAVEN=${MAVEN:-~/Applications/apache-maven-3.2.2}
export KARAF=${KARAF:-~/Applications/apache-karaf-3.0.1}
export KARAF_LOG=$KARAF/data/log/karaf.log
# Setup a path
export PS=":"
export PATH="$PATH:$ONOS_ROOT/tools/dev:$ONOS_ROOT/tools/build"
export PATH="$PATH:$ONOS_ROOT/tools/test/bin"
export PATH="$PATH:$ONOS_ROOT/tools/dev/bin:$ONOS_ROOT/tools/test/bin"
export PATH="$PATH:$ONOS_ROOT/tools/build"
export PATH="$PATH:$MAVEN/bin:$KARAF/bin"
export PATH="$PATH:."
# Convenience utility to warp to various ONOS source projects
# e.g. 'o api', 'o dev', 'o'
function o {
cd $(find $ONOS_ROOT/ -type d | egrep -v '\.git|target|src' | \
cd $(find $ONOS_ROOT/ -type d | egrep -v '\.git|target' | \
egrep "${1:-$ONOS_ROOT}" | head -n 1)
}
......@@ -30,11 +29,12 @@ alias mci='mvn clean install'
# Short-hand for ONOS build, package and test.
alias ob='onos-build'
alias obs='onos-build-selective'
alias op='onos-package'
alias ot='onos-test'
# Short-hand for tailing the ONOS (karaf) log
alias tl='$ONOS_ROOT/tools/dev/watchLog'
alias tl='$ONOS_ROOT/tools/dev/bin/onos-local-log'
alias tlo='tl | grep --colour=always org.onlab'
# Pretty-print JSON output
......
#!/bin/bash
#------------------------------------------------------------------------------
# Selectively builds only those projects that contained modified Java files.
#------------------------------------------------------------------------------
projects=$(find $ONOS_ROOT -name '*.java' \
-not -path '*/openflowj/*' -and -not -path '.git/*' \
-exec $ONOS_ROOT/tools/dev/bin/onos-build-selective-hook {} \; | \
sort -u | sed "s:$ONOS_ROOT::g" | tr '\n' ',' | \
sed 's:/,:,:g;s:,/:,:g;s:^/::g;s:,$::g')
[ -n "$projects" ] && cd $ONOS_ROOT && mvn --projects $projects ${@:-clean install}
\ No newline at end of file
#------------------------------------------------------------------------------
# Echoes project-level directory if a Java file within is newer than its
# class file counterpart
#------------------------------------------------------------------------------
javaFile=${1#*\/src\/*\/java/}
basename=${1/*\//}
[ $basename = "package-info.java" ] && exit 0
src=${1/$javaFile/}
project=${src/src*/}
classFile=${javaFile/.java/.class}
[ ${project}target/classes/$classFile -nt ${src}$javaFile -o \
${project}target/test-classes/$classFile -nt ${src}$javaFile ] \
|| echo ${src/src*/}
unset OC1 OC2 OC3 OC4 OC5 OC6 OC7 OC8 OC9 OCN ONOS_NIC
unset OC1 OC2 OC3 OC4 OC5 OC6 OC7 OC8 OC9 OCN ONOS_NIC ONOS_FEATURES
......
......@@ -239,12 +239,41 @@ public final class KryoPool {
Kryo kryo = new Kryo();
kryo.setRegistrationRequired(registrationRequired);
for (Pair<Class<?>, Serializer<?>> registry : registeredTypes) {
if (registry.getRight() == null) {
final Serializer<?> serializer = registry.getRight();
if (serializer == null) {
kryo.register(registry.getLeft());
} else {
kryo.register(registry.getLeft(), registry.getRight());
kryo.register(registry.getLeft(), serializer);
if (serializer instanceof FamilySerializer) {
FamilySerializer<?> fser = (FamilySerializer<?>) serializer;
fser.registerFamilies(kryo);
}
}
}
return kryo;
}
/**
* Serializer implementation, which required registration of family of Classes.
* @param <T> base type of this serializer.
*/
public abstract static class FamilySerializer<T> extends Serializer<T> {
public FamilySerializer(boolean acceptsNull) {
super(acceptsNull);
}
public FamilySerializer(boolean acceptsNull, boolean immutable) {
super(acceptsNull, immutable);
}
/**
* Registers other classes this Serializer supports.
*
* @param kryo instance to register classes to
*/
public void registerFamilies(Kryo kryo) {
}
}
}
......