tom

Merge remote-tracking branch 'origin/master'

Showing 74 changed files with 1199 additions and 215 deletions
......@@ -102,7 +102,7 @@ private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
// System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
}
sleep(1000);
//sleep(1000);
log.info("measuring async sender");
Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
......@@ -111,7 +111,7 @@ private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
context.stop();
}
sleep(1000);
sleep(10000);
}
public static void stop() {
......
package org.onlab.onos.cli;
import com.google.common.collect.Lists;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.net.DeviceId;
import java.util.Collections;
......
package org.onlab.onos.cli.net;
import java.util.HashSet;
import java.util.Set;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cli.AbstractShellCommand;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.MultiPointToSinglePointIntent;
import org.onlab.packet.Ethernet;
/**
* Installs point-to-point connectivity intents.
*/
@Command(scope = "onos", name = "add-multi-to-single-intent",
description = "Installs point-to-point connectivity intent")
public class AddMultiPointToSinglePointIntentCommand extends AbstractShellCommand {
@Argument(index = 0, name = "ingressDevices",
description = "Ingress Device/Port Description",
required = true, multiValued = true)
String[] deviceStrings = null;
private static long id = 0x7070001;
@Override
protected void execute() {
IntentService service = get(IntentService.class);
if (deviceStrings.length < 2) {
return;
}
String egressDeviceString = deviceStrings[deviceStrings.length - 1];
DeviceId egressDeviceId = DeviceId.deviceId(getDeviceId(egressDeviceString));
PortNumber egressPortNumber =
PortNumber.portNumber(getPortNumber(egressDeviceString));
ConnectPoint egress = new ConnectPoint(egressDeviceId, egressPortNumber);
Set<ConnectPoint> ingressPoints = new HashSet<>();
for (int index = 0; index < deviceStrings.length - 1; index++) {
String ingressDeviceString = deviceStrings[index];
DeviceId ingressDeviceId = DeviceId.deviceId(getDeviceId(ingressDeviceString));
PortNumber ingressPortNumber =
PortNumber.portNumber(getPortNumber(ingressDeviceString));
ConnectPoint ingress = new ConnectPoint(ingressDeviceId, ingressPortNumber);
ingressPoints.add(ingress);
}
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
Intent intent =
new MultiPointToSinglePointIntent(new IntentId(id++),
selector,
treatment,
ingressPoints,
egress);
service.submit(intent);
}
/**
* Extracts the port number portion of the ConnectPoint.
*
* @param deviceString string representing the device/port
* @return port number as a string, empty string if the port is not found
*/
private String getPortNumber(String deviceString) {
int slash = deviceString.indexOf('/');
if (slash <= 0) {
return "";
}
return deviceString.substring(slash + 1, deviceString.length());
}
/**
* Extracts the device ID portion of the ConnectPoint.
*
* @param deviceString string representing the device/port
* @return device ID string
*/
private String getDeviceId(String deviceString) {
int slash = deviceString.indexOf('/');
if (slash <= 0) {
return "";
}
return deviceString.substring(0, slash);
}
}
......@@ -3,8 +3,8 @@ package org.onlab.onos.cli.net;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cli.AbstractShellCommand;
import org.onlab.onos.cluster.MastershipAdminService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipAdminService;
import org.onlab.onos.net.MastershipRole;
import static org.onlab.onos.net.DeviceId.deviceId;
......
package org.onlab.onos.cli.net;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cli.AbstractShellCommand;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentEvent;
import org.onlab.onos.net.intent.IntentEvent.Type;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentListener;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.PointToPointIntent;
import org.onlab.packet.Ethernet;
import org.onlab.packet.MacAddress;
/**
* Installs point-to-point connectivity intents.
*/
@Command(scope = "onos", name = "push-test-intents",
description = "Installs random intents to test throughput")
public class IntentPushTestCommand extends AbstractShellCommand
implements IntentListener {
@Argument(index = 0, name = "ingressDevice",
description = "Ingress Device/Port Description",
required = true, multiValued = false)
String ingressDeviceString = null;
@Argument(index = 1, name = "egressDevice",
description = "Egress Device/Port Description",
required = true, multiValued = false)
String egressDeviceString = null;
@Argument(index = 2, name = "count",
description = "Number of intents to push",
required = true, multiValued = false)
String countString = null;
private static long id = 0x7870001;
private IntentService service;
private CountDownLatch latch;
private long start, end;
@Override
protected void execute() {
service = get(IntentService.class);
DeviceId ingressDeviceId = DeviceId.deviceId(getDeviceId(ingressDeviceString));
PortNumber ingressPortNumber =
PortNumber.portNumber(getPortNumber(ingressDeviceString));
ConnectPoint ingress = new ConnectPoint(ingressDeviceId, ingressPortNumber);
DeviceId egressDeviceId = DeviceId.deviceId(getDeviceId(egressDeviceString));
PortNumber egressPortNumber =
PortNumber.portNumber(getPortNumber(egressDeviceString));
ConnectPoint egress = new ConnectPoint(egressDeviceId, egressPortNumber);
TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4);
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
int count = Integer.parseInt(countString);
service.addListener(this);
latch = new CountDownLatch(count);
start = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
TrafficSelector s = selector
.matchEthSrc(MacAddress.valueOf(i))
.build();
Intent intent =
new PointToPointIntent(new IntentId(id++),
s,
treatment,
ingress,
egress);
service.submit(intent);
}
try {
latch.await(5, TimeUnit.SECONDS);
printResults(count);
} catch (InterruptedException e) {
print(e.toString());
}
service.removeListener(this);
}
private void printResults(int count) {
long delta = end - start;
print("Time to install %d intents: %d ms", count, delta);
}
/**
* Extracts the port number portion of the ConnectPoint.
*
* @param deviceString string representing the device/port
* @return port number as a string, empty string if the port is not found
*/
private String getPortNumber(String deviceString) {
int slash = deviceString.indexOf('/');
if (slash <= 0) {
return "";
}
return deviceString.substring(slash + 1, deviceString.length());
}
/**
* Extracts the device ID portion of the ConnectPoint.
*
* @param deviceString string representing the device/port
* @return device ID string
*/
private String getDeviceId(String deviceString) {
int slash = deviceString.indexOf('/');
if (slash <= 0) {
return "";
}
return deviceString.substring(0, slash);
}
@Override
public void event(IntentEvent event) {
if (event.type() == Type.INSTALLED) {
end = event.time();
if (latch != null) {
latch.countDown();
} else {
log.warn("install event latch is null");
}
}
}
}
......@@ -82,6 +82,19 @@
<ref component-id="connectPointCompleter"/>
</completers>
</command>
<command>
<action class="org.onlab.onos.cli.net.AddMultiPointToSinglePointIntentCommand"/>
<completers>
<ref component-id="connectPointCompleter"/>
</completers>
</command>
<command>
<action class="org.onlab.onos.cli.net.IntentPushTestCommand"/>
<completers>
<ref component-id="connectPointCompleter"/>
<ref component-id="connectPointCompleter"/>
</completers>
</command>
<command>
<action class="org.onlab.onos.cli.net.ClustersListCommand"/>
......
package org.onlab.onos.cluster;
package org.onlab.onos.mastership;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
......
package org.onlab.onos.cluster;
package org.onlab.onos.mastership;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.AbstractEvent;
import org.onlab.onos.net.DeviceId;
......
package org.onlab.onos.cluster;
package org.onlab.onos.mastership;
import org.onlab.onos.event.EventListener;
......
package org.onlab.onos.cluster;
package org.onlab.onos.mastership;
import java.util.Set;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
......
package org.onlab.onos.cluster;
package org.onlab.onos.mastership;
import java.util.Set;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.store.Store;
......
package org.onlab.onos.cluster;
package org.onlab.onos.mastership;
import java.util.Objects;
import org.onlab.onos.cluster.NodeId;
public final class MastershipTerm {
private final NodeId master;
......
/**
* Implementation of device store using Hazelcast distributed structures.
* Set of abstractions for dealing with controller mastership related topics.
*/
package org.onlab.onos.store.device.impl;
package org.onlab.onos.mastership;
......
package org.onlab.onos.store;
package org.onlab.onos.net.device;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.DeviceId;
//TODO: Consider renaming to DeviceClockProviderService?
/**
* Interface for feeding term information to a logical clock service
* that vends per device timestamps.
*/
public interface ClockProviderService {
public interface DeviceClockProviderService {
/**
* Updates the mastership term for the specified deviceId.
......
package org.onlab.onos.store;
package org.onlab.onos.net.device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
// TODO: Consider renaming to DeviceClockService?
/**
* Interface for a logical clock service that vends per device timestamps.
*/
public interface ClockService {
public interface DeviceClockService {
/**
* Returns a new timestamp for the specified deviceId.
......
package org.onlab.onos.net.intent;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import com.google.common.base.MoreObjects;
/**
* Abstraction of a connectivity intent that is implemented by a set of path
* segments.
*/
public class LinkCollectionIntent extends ConnectivityIntent implements InstallableIntent {
private final Set<Link> links;
/**
* Creates a new point-to-point intent with the supplied ingress/egress
* ports and using the specified explicit path.
*
* @param id intent identifier
* @param selector traffic match
* @param treatment action
* @param links traversed links
* @throws NullPointerException {@code path} is null
*/
public LinkCollectionIntent(IntentId id,
TrafficSelector selector,
TrafficTreatment treatment,
Set<Link> links) {
super(id, selector, treatment);
this.links = links;
}
protected LinkCollectionIntent() {
super();
this.links = null;
}
@Override
public Collection<Link> requiredLinks() {
return links;
}
public Set<Link> links() {
return links;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
LinkCollectionIntent that = (LinkCollectionIntent) o;
return Objects.equals(this.links, that.links);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), links);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("id", id())
.add("match", selector())
.add("action", treatment())
.add("links", links())
.toString();
}
}
......@@ -5,6 +5,7 @@ import java.util.Set;
import org.onlab.onos.cluster.NodeId;
// TODO: remove IOExceptions?
/**
* Service for assisting communications between controller cluster nodes.
*/
......
package org.onlab.onos.cluster;
package org.onlab.onos.mastership;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
......
package org.onlab.onos.cluster;
package org.onlab.onos.mastership;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.onlab.onos.cluster.NodeId;
import com.google.common.testing.EqualsTester;
......
......@@ -4,6 +4,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -14,17 +15,18 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipAdminService;
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.cluster.MastershipStore;
import org.onlab.onos.cluster.MastershipStoreDelegate;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.mastership.MastershipAdminService;
import org.onlab.onos.mastership.MastershipEvent;
import org.onlab.onos.mastership.MastershipListener;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.mastership.MastershipStore;
import org.onlab.onos.mastership.MastershipStoreDelegate;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.mastership.MastershipTermService;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.slf4j.Logger;
......@@ -164,21 +166,68 @@ implements MastershipService, MastershipAdminService {
//callback for reacting to cluster events
private class InternalClusterEventListener implements ClusterEventListener {
// A notion of a local maximum cluster size, used to tie-break.
// Think of a better way to do this.
private AtomicInteger clusterSize;
InternalClusterEventListener() {
clusterSize = new AtomicInteger(0);
}
@Override
public void event(ClusterEvent event) {
switch (event.type()) {
//FIXME: worry about addition when the time comes
case INSTANCE_ADDED:
case INSTANCE_ACTIVATED:
break;
clusterSize.incrementAndGet();
log.info("instance {} added/activated", event.subject());
break;
case INSTANCE_REMOVED:
case INSTANCE_DEACTIVATED:
ControllerNode node = event.subject();
if (node.equals(clusterService.getLocalNode())) {
//If we are in smaller cluster, relinquish and return
for (DeviceId device : getDevicesOf(node.id())) {
if (!isInMajority()) {
//own DeviceManager should catch event and tell switch
store.relinquishRole(node.id(), device);
}
}
log.info("broke off from cluster, relinquished devices");
break;
}
// if we are the larger one and the removed node(s) are brain dead,
// force relinquish on behalf of disabled node.
// check network channel to do this?
for (DeviceId device : getDevicesOf(node.id())) {
//some things to check:
// 1. we didn't break off as well while we're at it
// 2. others don't pile in and try too - maybe a lock
if (isInMajority()) {
store.relinquishRole(node.id(), device);
}
}
clusterSize.decrementAndGet();
log.info("instance {} removed/deactivated", event.subject());
break;
default:
log.warn("unknown cluster event {}", event);
}
}
private boolean isInMajority() {
if (clusterService.getNodes().size() > (clusterSize.intValue() / 2)) {
return true;
}
//else {
//FIXME: break tie for equal-sized clusters, can we use hz's functions?
// }
return false;
}
}
public class InternalDelegate implements MastershipStoreDelegate {
......
......@@ -13,20 +13,22 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipListener;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.mastership.MastershipEvent;
import org.onlab.onos.mastership.MastershipListener;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.mastership.MastershipTermService;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DeviceAdminService;
import org.onlab.onos.net.device.DeviceClockProviderService;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceListener;
......@@ -39,7 +41,6 @@ import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProviderRegistry;
import org.onlab.onos.net.provider.AbstractProviderService;
import org.onlab.onos.store.ClockProviderService;
import org.slf4j.Logger;
/**
......@@ -81,7 +82,7 @@ public class DeviceManager
protected MastershipTermService termService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockProviderService clockProviderService;
protected DeviceClockProviderService deviceClockProviderService;
@Activate
public void activate() {
......@@ -216,7 +217,7 @@ public class DeviceManager
return;
}
// tell clock provider if this instance is the master
clockProviderService.setMastershipTerm(deviceId, term);
deviceClockProviderService.setMastershipTerm(deviceId, term);
DeviceEvent event = store.createOrUpdateDevice(provider().id(),
deviceId, deviceDescription);
......@@ -257,12 +258,12 @@ public class DeviceManager
// temporarily request for Master Role and mark offline.
if (!mastershipService.getLocalRole(deviceId).equals(MastershipRole.MASTER)) {
log.debug("Device {} disconnected, but I am not the master", deviceId);
//let go of any role anyways
//let go of ability to be backup
mastershipService.relinquishMastership(deviceId);
return;
}
DeviceEvent event = store.markOffline(deviceId);
//we're no longer capable of being master or a candidate.
//relinquish master role and ability to be backup.
mastershipService.relinquishMastership(deviceId);
if (event != null) {
......@@ -325,23 +326,31 @@ public class DeviceManager
@Override
public void event(MastershipEvent event) {
final DeviceId did = event.subject();
if (isAvailable(did)) {
final NodeId myNodeId = clusterService.getLocalNode().id();
if (myNodeId.equals(event.master())) {
MastershipTerm term = termService.getMastershipTerm(did);
if (term.master().equals(myNodeId)) {
// only set the new term if I am the master
clockProviderService.setMastershipTerm(did, term);
}
applyRole(did, MastershipRole.MASTER);
} else {
applyRole(did, MastershipRole.STANDBY);
final NodeId myNodeId = clusterService.getLocalNode().id();
if (myNodeId.equals(event.master())) {
MastershipTerm term = termService.getMastershipTerm(did);
if (term.master().equals(myNodeId)) {
// only set the new term if I am the master
deviceClockProviderService.setMastershipTerm(did, term);
}
// FIXME: we should check that the device is connected on our end.
// currently, this is not straight forward as the actual switch
// implementation is hidden from the registry.
if (!isAvailable(did)) {
//flag the device as online. Is there a better way to do this?
Device device = getDevice(did);
store.createOrUpdateDevice(device.providerId(), did,
new DefaultDeviceDescription(
did.uri(), device.type(), device.manufacturer(),
device.hwVersion(), device.swVersion(),
device.serialNumber()));
}
applyRole(did, MastershipRole.MASTER);
} else {
//device dead to node, give up
mastershipService.relinquishMastership(did);
applyRole(did, MastershipRole.STANDBY);
}
}
......
package org.onlab.onos.net.intent.impl;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.CoreService;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.IntentExtensionService;
import org.onlab.onos.net.intent.IntentInstaller;
import org.onlab.onos.net.intent.LinkCollectionIntent;
import org.onlab.onos.net.intent.PathIntent;
import org.slf4j.Logger;
import com.google.common.collect.Lists;
import static org.onlab.onos.net.flow.DefaultTrafficTreatment.builder;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Installer for {@link org.onlab.onos.net.intent.LinkCollectionIntent}
* path segment intents.
*/
@Component(immediate = true)
public class LinkCollectionIntentInstaller implements IntentInstaller<LinkCollectionIntent> {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentExtensionService intentManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private ApplicationId appId;
@Activate
public void activate() {
appId = coreService.registerApplication("org.onlab.onos.net.intent");
intentManager.registerInstaller(LinkCollectionIntent.class, this);
}
@Deactivate
public void deactivate() {
intentManager.unregisterInstaller(PathIntent.class);
}
/**
* Apply a list of FlowRules.
*
* @param rules rules to apply
*/
private Future<CompletedBatchOperation> applyBatch(List<FlowRuleBatchEntry> rules) {
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
return flowRuleService.applyBatch(batch);
}
@Override
public Future<CompletedBatchOperation> install(LinkCollectionIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
for (Link link : intent.links()) {
TrafficTreatment treatment = builder()
.setOutput(link.src().port()).build();
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
}
return applyBatch(rules);
}
@Override
public Future<CompletedBatchOperation> uninstall(LinkCollectionIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
for (Link link : intent.links()) {
TrafficTreatment treatment = builder()
.setOutput(link.src().port()).build();
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
}
return applyBatch(rules);
}
}
package org.onlab.onos.net.intent.impl;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.Path;
import org.onlab.onos.net.intent.IdGenerator;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
import org.onlab.onos.net.intent.IntentExtensionService;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.LinkCollectionIntent;
import org.onlab.onos.net.intent.MultiPointToSinglePointIntent;
import org.onlab.onos.net.intent.PointToPointIntent;
import org.onlab.onos.net.topology.PathService;
/**
* An intent compiler for
* {@link org.onlab.onos.net.intent.MultiPointToSinglePointIntent}.
*/
@Component(immediate = true)
public class MultiPointToSinglePointIntentCompiler
implements IntentCompiler<MultiPointToSinglePointIntent> {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentExtensionService intentManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PathService pathService;
private IdGenerator<IntentId> intentIdGenerator;
@Activate
public void activate() {
IdBlockAllocator idBlockAllocator = new DummyIdBlockAllocator();
intentIdGenerator = new IdBlockAllocatorBasedIntentIdGenerator(idBlockAllocator);
intentManager.registerCompiler(MultiPointToSinglePointIntent.class, this);
}
@Deactivate
public void deactivate() {
intentManager.unregisterCompiler(PointToPointIntent.class);
}
@Override
public List<Intent> compile(MultiPointToSinglePointIntent intent) {
Set<Link> links = new HashSet<>();
for (ConnectPoint ingressPoint : intent.ingressPoints()) {
Path path = getPath(ingressPoint, intent.egressPoint());
links.addAll(path.links());
}
Intent result = new LinkCollectionIntent(intentIdGenerator.getNewId(),
intent.selector(), intent.treatment(),
links);
return Arrays.asList(result);
}
/**
* Computes a path between two ConnectPoints.
*
* @param one start of the path
* @param two end of the path
* @return Path between the two
* @throws org.onlab.onos.net.intent.impl.PathNotFoundException if a path cannot be found
*/
private Path getPath(ConnectPoint one, ConnectPoint two) {
Set<Path> paths = pathService.getPaths(one.deviceId(), two.deviceId());
if (paths.isEmpty()) {
throw new PathNotFoundException("No path from " + one + " to " + two);
}
// TODO: let's be more intelligent about this eventually
return paths.iterator().next();
}
}
......@@ -10,14 +10,16 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.ControllerNode.State;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipService;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.mastership.MastershipTermService;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.trivial.impl.SimpleMastershipStore;
import org.onlab.packet.IpPrefix;
import com.google.common.collect.Sets;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.onlab.onos.net.MastershipRole.*;
......@@ -143,7 +145,7 @@ public class MastershipManagerTest {
@Override
public Set<ControllerNode> getNodes() {
return null;
return Sets.newHashSet();
}
@Override
......
......@@ -10,13 +10,13 @@ import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipServiceAdapter;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.MastershipTermService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.cluster.ControllerNode.State;
import org.onlab.onos.event.Event;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.mastership.MastershipServiceAdapter;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.mastership.MastershipTermService;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
......@@ -25,6 +25,7 @@ import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.device.DeviceAdminService;
import org.onlab.onos.net.device.DeviceClockProviderService;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceListener;
......@@ -35,7 +36,6 @@ import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.ClockProviderService;
import org.onlab.onos.store.trivial.impl.SimpleDeviceStore;
import org.onlab.packet.IpPrefix;
......@@ -88,7 +88,7 @@ public class DeviceManagerTest {
mgr.eventDispatcher = new TestEventDispatcher();
mgr.mastershipService = new TestMastershipService();
mgr.clusterService = new TestClusterService();
mgr.clockProviderService = new TestClockProviderService();
mgr.deviceClockProviderService = new TestClockProviderService();
mgr.activate();
service.addListener(listener);
......@@ -336,7 +336,7 @@ public class DeviceManagerTest {
}
private final class TestClockProviderService implements
ClockProviderService {
DeviceClockProviderService {
@Override
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
......
package org.onlab.onos.net.intent;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.Test;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instruction;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
public class TestLinkCollectionIntent {
private static class MockSelector implements TrafficSelector {
@Override
public Set<Criterion> criteria() {
return new HashSet<Criterion>();
}
}
private static class MockTreatment implements TrafficTreatment {
@Override
public List<Instruction> instructions() {
return new ArrayList<>();
}
}
@Test
public void testComparison() {
TrafficSelector selector = new MockSelector();
TrafficTreatment treatment = new MockTreatment();
Set<Link> links = new HashSet<>();
LinkCollectionIntent i1 = new LinkCollectionIntent(new IntentId(12),
selector, treatment, links);
LinkCollectionIntent i2 = new LinkCollectionIntent(new IntentId(12),
selector, treatment, links);
assertThat(i1.equals(i2), is(true));
}
}
......@@ -62,6 +62,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
......
package org.onlab.onos.store.cluster.messaging;
// FIXME: not used any more? remove
/**
* Service for encoding &amp; decoding intra-cluster message payload.
*/
......
......@@ -11,6 +11,7 @@ import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//FIXME: not used any more? remove
/**
* Factory for parsing messages sent between cluster members.
*/
......
/**
* Implementation of the cluster messaging mechanism.
*/
package org.onlab.onos.store.cluster.messaging.impl;
\ No newline at end of file
package org.onlab.onos.store.cluster.messaging.impl;
......
......@@ -10,7 +10,8 @@ import com.google.common.base.MoreObjects;
/**
* Wrapper class to store Timestamped value.
* @param <T>
*
* @param <T> Timestamped value type
*/
public final class Timestamped<T> {
......
package org.onlab.onos.store.device.impl.peermsg;
package org.onlab.onos.store.device.impl;
import static com.google.common.base.Preconditions.checkNotNull;
......
package org.onlab.onos.store.device.impl.peermsg;
package org.onlab.onos.store.device.impl;
import static com.google.common.base.Preconditions.checkNotNull;
......
......@@ -10,12 +10,12 @@ import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.ClockProviderService;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.net.device.DeviceClockProviderService;
import org.onlab.onos.net.device.DeviceClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.impl.MastershipBasedTimestamp;
import org.slf4j.Logger;
/**
......@@ -23,7 +23,7 @@ import org.slf4j.Logger;
*/
@Component(immediate = true)
@Service
public class DeviceClockManager implements ClockService, ClockProviderService {
public class DeviceClockManager implements DeviceClockService, DeviceClockProviderService {
private final Logger log = getLogger(getClass());
......
......@@ -58,7 +58,7 @@ class DeviceDescriptions {
*
* @param newDesc new DeviceDescription
*/
public synchronized void putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
public void putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
Timestamped<DeviceDescription> oldOne = deviceDesc;
Timestamped<DeviceDescription> newOne = newDesc;
if (oldOne != null) {
......@@ -76,7 +76,7 @@ class DeviceDescriptions {
*
* @param newDesc new PortDescription
*/
public synchronized void putPortDesc(Timestamped<PortDescription> newDesc) {
public void putPortDesc(Timestamped<PortDescription> newDesc) {
Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
Timestamped<PortDescription> newOne = newDesc;
if (oldOne != null) {
......
package org.onlab.onos.store.device.impl.peermsg;
package org.onlab.onos.store.device.impl;
import java.util.Objects;
......@@ -51,4 +51,4 @@ public final class DeviceFragmentId {
this.providerId = null;
this.deviceId = null;
}
}
\ No newline at end of file
}
......
package org.onlab.onos.store.device.impl;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
......@@ -24,6 +25,7 @@ import org.onlab.onos.net.Device.Type;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceClockService;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceStore;
......@@ -31,16 +33,12 @@ import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.device.impl.peermsg.DeviceAntiEntropyAdvertisement;
import org.onlab.onos.store.device.impl.peermsg.DeviceFragmentId;
import org.onlab.onos.store.device.impl.peermsg.PortFragmentId;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.util.KryoPool;
......@@ -110,7 +108,7 @@ public class GossipDeviceStore
private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
protected DeviceClockService deviceClockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
......@@ -118,7 +116,7 @@ public class GossipDeviceStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
......@@ -206,14 +204,19 @@ public class GossipDeviceStore
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId,
DeviceDescription deviceDescription) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
final DeviceEvent event;
final Timestamped<DeviceDescription> mergedDesc;
synchronized (getDeviceDescriptions(deviceId)) {
event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
mergedDesc = getDeviceDescriptions(deviceId).get(providerId).getDeviceDesc();
}
if (event != null) {
log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a device update topology event for providerId: "
+ providerId + " and deviceId: " + deviceId, e);
......@@ -317,8 +320,8 @@ public class GossipDeviceStore
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
Timestamp timestamp = clockService.getTimestamp(deviceId);
DeviceEvent event = markOfflineInternal(deviceId, timestamp);
final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
if (event != null) {
log.info("Notifying peers of a device offline topology event for deviceId: {}",
deviceId);
......@@ -390,17 +393,33 @@ public class GossipDeviceStore
public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
DeviceId deviceId,
List<PortDescription> portDescriptions) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
Timestamped<List<PortDescription>> timestampedPortDescriptions =
new Timestamped<>(portDescriptions, newTimestamp);
List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
final Timestamped<List<PortDescription>> timestampedInput
= new Timestamped<>(portDescriptions, newTimestamp);
final List<DeviceEvent> events;
final Timestamped<List<PortDescription>> merged;
synchronized (getDeviceDescriptions(deviceId)) {
events = updatePortsInternal(providerId, deviceId, timestampedInput);
final DeviceDescriptions descs = getDeviceDescriptions(deviceId).get(providerId);
List<PortDescription> mergedList =
FluentIterable.from(portDescriptions)
.transform(new Function<PortDescription, PortDescription>() {
@Override
public PortDescription apply(PortDescription input) {
// lookup merged port description
return descs.getPortDesc(input.portNumber()).value();
}
}).toList();
merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
}
if (!events.isEmpty()) {
log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
} catch (IOException e) {
log.error("Failed to notify peers of a port update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
......@@ -527,16 +546,25 @@ public class GossipDeviceStore
}
@Override
public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
DeviceId deviceId,
PortDescription portDescription) {
final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
final Timestamped<PortDescription> deltaDesc
= new Timestamped<>(portDescription, newTimestamp);
final DeviceEvent event;
final Timestamped<PortDescription> mergedDesc;
synchronized (getDeviceDescriptions(deviceId)) {
event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
mergedDesc = getDeviceDescriptions(deviceId).get(providerId)
.getPortDesc(portDescription.portNumber());
}
if (event != null) {
log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a port status update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
......@@ -615,7 +643,7 @@ public class GossipDeviceStore
@Override
public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
Timestamp timestamp = clockService.getTimestamp(deviceId);
Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
if (event != null) {
log.info("Notifying peers of a device removed topology event for deviceId: {}",
......@@ -684,7 +712,7 @@ public class GossipDeviceStore
* @return Device instance
*/
private Device composeDevice(DeviceId deviceId,
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
Map<ProviderId, DeviceDescriptions> providerDescs) {
checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
......
......@@ -20,4 +20,4 @@ public final class InitDeviceDescs
public DeviceDescriptions get() throws ConcurrentException {
return new DeviceDescriptions(deviceDesc);
}
}
\ No newline at end of file
}
......
package org.onlab.onos.store.device.impl.peermsg;
package org.onlab.onos.store.device.impl;
import java.util.Objects;
......@@ -58,4 +58,4 @@ public final class PortFragmentId {
this.deviceId = null;
this.portNumber = null;
}
}
\ No newline at end of file
}
......
package org.onlab.onos.store.common.impl;
package org.onlab.onos.store.impl;
import static com.google.common.base.Preconditions.checkArgument;
......
......@@ -29,6 +29,7 @@ import org.onlab.onos.net.SparseAnnotations;
import org.onlab.onos.net.Link.Type;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.Provided;
import org.onlab.onos.net.device.DeviceClockService;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.link.LinkEvent;
......@@ -36,7 +37,6 @@ import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
......@@ -100,7 +100,7 @@ public class GossipLinkStore
private final Map<LinkKey, Timestamp> removedLinks = Maps.newHashMap();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
protected DeviceClockService deviceClockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
......@@ -223,7 +223,7 @@ public class GossipLinkStore
LinkDescription linkDescription) {
DeviceId dstDeviceId = linkDescription.dst().deviceId();
Timestamp newTimestamp = clockService.getTimestamp(dstDeviceId);
Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
......@@ -344,7 +344,7 @@ public class GossipLinkStore
final LinkKey key = new LinkKey(src, dst);
DeviceId dstDeviceId = dst.deviceId();
Timestamp timestamp = clockService.getTimestamp(dstDeviceId);
Timestamp timestamp = deviceClockService.getTimestamp(dstDeviceId);
LinkEvent event = removeLinkInternal(key, timestamp);
......
......@@ -46,4 +46,4 @@ public class InternalLinkRemovedEvent {
linkKey = null;
timestamp = null;
}
}
\ No newline at end of file
}
......
package org.onlab.onos.store.serializers;
import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.MastershipBasedTimestamp;
import org.onlab.util.KryoPool;
public final class DistributedStoreSerializers {
......
package org.onlab.onos.store.serializers;
import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.impl.MastershipBasedTimestamp;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
......
package org.onlab.onos.store.cluster.impl;
package org.onlab.onos.store.cluster.messaging.impl;
import org.junit.After;
import org.junit.Before;
......@@ -6,8 +6,7 @@ import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
import org.onlab.onos.store.cluster.messaging.impl.MessageSerializer;
import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
import org.onlab.netty.NettyMessagingService;
import org.onlab.packet.IpPrefix;
......
......@@ -6,6 +6,7 @@ import java.nio.ByteBuffer;
import org.junit.Test;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.impl.MastershipBasedTimestamp;
import org.onlab.util.KryoPool;
import com.google.common.testing.EqualsTester;
......
package org.onlab.onos.store.device.impl.peermsg;
package org.onlab.onos.store.device.impl;
import static org.onlab.onos.net.DeviceId.deviceId;
......
package org.onlab.onos.store.device.impl;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import static org.onlab.onos.net.Device.Type.SWITCH;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.onlab.onos.cluster.ControllerNode.State.*;
import static org.onlab.onos.net.DefaultAnnotations.union;
import static java.util.Arrays.asList;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -14,6 +19,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.easymock.Capture;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
......@@ -25,8 +31,8 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.ControllerNode.State;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.Annotations;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.Device;
......@@ -36,13 +42,13 @@ import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.SparseAnnotations;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.device.DeviceClockService;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceStore;
import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
......@@ -90,14 +96,25 @@ public class GossipDeviceStoreTest {
.set("B4", "b4")
.build();
private static final NodeId MYSELF = new NodeId("myself");
// local node
private static final NodeId NID1 = new NodeId("local");
private static final ControllerNode ONOS1 =
new DefaultControllerNode(NID1, IpPrefix.valueOf("127.0.0.1"));
// remote node
private static final NodeId NID2 = new NodeId("remote");
private static final ControllerNode ONOS2 =
new DefaultControllerNode(NID2, IpPrefix.valueOf("127.0.0.2"));
private static final List<SparseAnnotations> NO_ANNOTATION = Collections.<SparseAnnotations>emptyList();
private TestGossipDeviceStore testGossipDeviceStore;
private GossipDeviceStore gossipDeviceStore;
private DeviceStore deviceStore;
private DeviceClockManager deviceClockManager;
private ClockService clockService;
private DeviceClockService deviceClockService;
private ClusterCommunicationService clusterCommunicator;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
......@@ -111,17 +128,24 @@ public class GossipDeviceStoreTest {
public void setUp() throws Exception {
deviceClockManager = new DeviceClockManager();
deviceClockManager.activate();
clockService = deviceClockManager;
deviceClockService = deviceClockManager;
deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(NID1, 1));
deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(NID1, 2));
ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
anyObject(ClusterMessageHandler.class));
expectLastCall().anyTimes();
replay(clusterCommunicator);
ClusterService clusterService = new TestClusterService();
gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
testGossipDeviceStore = new TestGossipDeviceStore(deviceClockService, clusterService, clusterCommunicator);
gossipDeviceStore = testGossipDeviceStore;
gossipDeviceStore.activate();
deviceStore = gossipDeviceStore;
verify(clusterCommunicator);
reset(clusterCommunicator);
}
@After
......@@ -135,7 +159,16 @@ public class GossipDeviceStoreTest {
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN, annotations);
reset(clusterCommunicator);
try {
expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
.andReturn(true).anyTimes();
} catch (IOException e) {
fail("Should never reach here");
}
replay(clusterCommunicator);
deviceStore.createOrUpdateDevice(PID, deviceId, description);
verify(clusterCommunicator);
}
private void putDeviceAncillary(DeviceId deviceId, String swVersion,
......@@ -163,9 +196,9 @@ public class GossipDeviceStoreTest {
* @param annotations
*/
private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
DefaultAnnotations expected = DefaultAnnotations.builder().build();
SparseAnnotations expected = DefaultAnnotations.builder().build();
for (SparseAnnotations a : annotations) {
expected = DefaultAnnotations.merge(expected, a);
expected = DefaultAnnotations.union(expected, a);
}
assertEquals(expected.keys(), actual.keys());
for (String key : expected.keys()) {
......@@ -173,6 +206,36 @@ public class GossipDeviceStoreTest {
}
}
private static void assertDeviceDescriptionEquals(DeviceDescription expected,
DeviceDescription actual) {
if (expected == actual) {
return;
}
assertEquals(expected.deviceURI(), actual.deviceURI());
assertEquals(expected.hwVersion(), actual.hwVersion());
assertEquals(expected.manufacturer(), actual.manufacturer());
assertEquals(expected.serialNumber(), actual.serialNumber());
assertEquals(expected.swVersion(), actual.swVersion());
assertAnnotationsEquals(actual.annotations(), expected.annotations());
}
private static void assertDeviceDescriptionEquals(DeviceDescription expected,
List<SparseAnnotations> expectedAnnotations,
DeviceDescription actual) {
if (expected == actual) {
return;
}
assertEquals(expected.deviceURI(), actual.deviceURI());
assertEquals(expected.hwVersion(), actual.hwVersion());
assertEquals(expected.manufacturer(), actual.manufacturer());
assertEquals(expected.serialNumber(), actual.serialNumber());
assertEquals(expected.swVersion(), actual.swVersion());
assertAnnotationsEquals(actual.annotations(),
expectedAnnotations.toArray(new SparseAnnotations[0]));
}
@Test
public final void testGetDeviceCount() {
assertEquals("initialy empty", 0, deviceStore.getDeviceCount());
......@@ -215,56 +278,123 @@ public class GossipDeviceStoreTest {
assertNull("DID2 shouldn't be there", deviceStore.getDevice(DID2));
}
private void assertInternalDeviceEvent(NodeId sender,
DeviceId deviceId,
ProviderId providerId,
DeviceDescription expectedDesc,
Capture<ClusterMessage> actualMsg) {
assertTrue(actualMsg.hasCaptured());
assertEquals(sender, actualMsg.getValue().sender());
assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
actualMsg.getValue().subject());
InternalDeviceEvent addEvent
= testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
assertEquals(deviceId, addEvent.deviceId());
assertEquals(providerId, addEvent.providerId());
assertDeviceDescriptionEquals(expectedDesc, addEvent.deviceDescription().value());
}
private void assertInternalDeviceEvent(NodeId sender,
DeviceId deviceId,
ProviderId providerId,
DeviceDescription expectedDesc,
List<SparseAnnotations> expectedAnnotations,
Capture<ClusterMessage> actualMsg) {
assertTrue(actualMsg.hasCaptured());
assertEquals(sender, actualMsg.getValue().sender());
assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
actualMsg.getValue().subject());
InternalDeviceEvent addEvent
= testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
assertEquals(deviceId, addEvent.deviceId());
assertEquals(providerId, addEvent.providerId());
assertDeviceDescriptionEquals(expectedDesc, expectedAnnotations, addEvent.deviceDescription().value());
}
@Test
public final void testCreateOrUpdateDevice() {
public final void testCreateOrUpdateDevice() throws IOException {
DeviceDescription description =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN);
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
verify(clusterCommunicator);
assertInternalDeviceEvent(NID1, DID1, PID, description, bcast);
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN);
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
assertDevice(DID1, SW2, event2.subject());
verify(clusterCommunicator);
assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
reset(clusterCommunicator);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
}
@Test
public final void testCreateOrUpdateDeviceAncillary() {
public final void testCreateOrUpdateDeviceAncillary() throws IOException {
// add
DeviceDescription description =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN, A2);
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(PIDA, event.subject().providerId());
assertAnnotationsEquals(event.subject().annotations(), A2);
assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
assertInternalDeviceEvent(NID1, DID1, PIDA, description, bcast);
// update from primary
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN, A1);
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
assertDevice(DID1, SW2, event2.subject());
assertEquals(PID, event2.subject().providerId());
assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
assertTrue(deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
// no-op update from primary
resetCommunicatorExpectingNoBroadcast(bcast);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
verify(clusterCommunicator);
assertFalse("no broadcast expected", bcast.hasCaptured());
// For now, Ancillary is ignored once primary appears
resetCommunicatorExpectingNoBroadcast(bcast);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
verify(clusterCommunicator);
assertFalse("no broadcast expected", bcast.hasCaptured());
// But, Ancillary annotations will be in effect
DeviceDescription description3 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN, A2_2);
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
assertEquals(DEVICE_UPDATED, event3.type());
// basic information will be the one from Primary
......@@ -273,6 +403,11 @@ public class GossipDeviceStoreTest {
// but annotation from Ancillary will be merged
assertAnnotationsEquals(event3.subject().annotations(), A1, A2, A2_2);
assertTrue(deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
// note: only annotation from PIDA is sent over the wire
assertInternalDeviceEvent(NID1, DID1, PIDA, description3,
asList(union(A2, A2_2)), bcast);
}
......@@ -282,14 +417,24 @@ public class GossipDeviceStoreTest {
putDevice(DID1, SW1);
assertTrue(deviceStore.isAvailable(DID1));
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event = deviceStore.markOffline(DID1);
assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
assertDevice(DID1, SW1, event.subject());
assertFalse(deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
// TODO: verify broadcast message
assertTrue(bcast.hasCaptured());
resetCommunicatorExpectingNoBroadcast(bcast);
DeviceEvent event2 = deviceStore.markOffline(DID1);
assertNull("No change, no event", event2);
}
verify(clusterCommunicator);
assertFalse(bcast.hasCaptured());
}
@Test
public final void testUpdatePorts() {
......@@ -298,8 +443,13 @@ public class GossipDeviceStoreTest {
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, true)
);
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
verify(clusterCommunicator);
// TODO: verify broadcast message
assertTrue(bcast.hasCaptured());
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
for (DeviceEvent event : events) {
......@@ -318,7 +468,12 @@ public class GossipDeviceStoreTest {
new DefaultPortDescription(P3, true)
);
resetCommunicatorExpectingSingleBroadcast(bcast);
events = deviceStore.updatePorts(PID, DID1, pds2);
verify(clusterCommunicator);
// TODO: verify broadcast message
assertTrue(bcast.hasCaptured());
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -341,7 +496,12 @@ public class GossipDeviceStoreTest {
new DefaultPortDescription(P1, false),
new DefaultPortDescription(P2, true)
);
resetCommunicatorExpectingSingleBroadcast(bcast);
events = deviceStore.updatePorts(PID, DID1, pds3);
verify(clusterCommunicator);
// TODO: verify broadcast message
assertTrue(bcast.hasCaptured());
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -357,7 +517,6 @@ public class GossipDeviceStoreTest {
fail("Unknown port number encountered: " + num);
}
}
}
@Test
......@@ -368,16 +527,22 @@ public class GossipDeviceStoreTest {
);
deviceStore.updatePorts(PID, DID1, pds);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
new DefaultPortDescription(P1, false));
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
final DefaultPortDescription desc = new DefaultPortDescription(P1, false);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc);
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(P1, event.port().number());
assertFalse("Port is disabled", event.port().isEnabled());
verify(clusterCommunicator);
assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, bcast);
assertTrue(bcast.hasCaptured());
}
@Test
public final void testUpdatePortStatusAncillary() {
public final void testUpdatePortStatusAncillary() throws IOException {
putDeviceAncillary(DID1, SW1);
putDevice(DID1, SW1);
List<PortDescription> pds = Arrays.<PortDescription>asList(
......@@ -385,36 +550,106 @@ public class GossipDeviceStoreTest {
);
deviceStore.updatePorts(PID, DID1, pds);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
new DefaultPortDescription(P1, false, A1_2));
Capture<ClusterMessage> bcast = new Capture<>();
// update port from primary
resetCommunicatorExpectingSingleBroadcast(bcast);
final DefaultPortDescription desc1 = new DefaultPortDescription(P1, false, A1_2);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc1);
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(P1, event.port().number());
assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
assertFalse("Port is disabled", event.port().isEnabled());
DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1,
new DefaultPortDescription(P1, true));
verify(clusterCommunicator);
assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), bcast);
assertTrue(bcast.hasCaptured());
// update port from ancillary with no attributes
resetCommunicatorExpectingNoBroadcast(bcast);
final DefaultPortDescription desc2 = new DefaultPortDescription(P1, true);
DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, desc2);
assertNull("Ancillary is ignored if primary exists", event2);
verify(clusterCommunicator);
assertFalse(bcast.hasCaptured());
// but, Ancillary annotation update will be notified
DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1,
new DefaultPortDescription(P1, true, A2));
resetCommunicatorExpectingSingleBroadcast(bcast);
final DefaultPortDescription desc3 = new DefaultPortDescription(P1, true, A2);
DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, desc3);
assertEquals(PORT_UPDATED, event3.type());
assertDevice(DID1, SW1, event3.subject());
assertEquals(P1, event3.port().number());
assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
assertFalse("Port is disabled", event3.port().isEnabled());
verify(clusterCommunicator);
assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), bcast);
assertTrue(bcast.hasCaptured());
// port only reported from Ancillary will be notified as down
DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1,
new DefaultPortDescription(P2, true));
resetCommunicatorExpectingSingleBroadcast(bcast);
final DefaultPortDescription desc4 = new DefaultPortDescription(P2, true);
DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, desc4);
assertEquals(PORT_ADDED, event4.type());
assertDevice(DID1, SW1, event4.subject());
assertEquals(P2, event4.port().number());
assertAnnotationsEquals(event4.port().annotations());
assertFalse("Port is disabled if not given from primary provider",
event4.port().isEnabled());
verify(clusterCommunicator);
// TODO: verify broadcast message content
assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, bcast);
assertTrue(bcast.hasCaptured());
}
private void assertInternalPortStatusEvent(NodeId sender, DeviceId did,
ProviderId pid, DefaultPortDescription expectedDesc,
List<SparseAnnotations> expectedAnnotations, Capture<ClusterMessage> actualMsg) {
assertTrue(actualMsg.hasCaptured());
assertEquals(sender, actualMsg.getValue().sender());
assertEquals(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
actualMsg.getValue().subject());
InternalPortStatusEvent addEvent
= testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
assertEquals(did, addEvent.deviceId());
assertEquals(pid, addEvent.providerId());
assertPortDescriptionEquals(expectedDesc, expectedAnnotations,
addEvent.portDescription().value());
}
private void assertPortDescriptionEquals(
PortDescription expectedDesc,
List<SparseAnnotations> expectedAnnotations,
PortDescription actual) {
assertEquals(expectedDesc.portNumber(), actual.portNumber());
assertEquals(expectedDesc.isEnabled(), actual.isEnabled());
assertAnnotationsEquals(actual.annotations(),
expectedAnnotations.toArray(new SparseAnnotations[0]));
}
private void resetCommunicatorExpectingNoBroadcast(
Capture<ClusterMessage> bcast) {
bcast.reset();
reset(clusterCommunicator);
replay(clusterCommunicator);
}
private void resetCommunicatorExpectingSingleBroadcast(
Capture<ClusterMessage> bcast) {
bcast.reset();
reset(clusterCommunicator);
try {
expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
} catch (IOException e) {
fail("Should never reach here");
}
replay(clusterCommunicator);
}
@Test
......@@ -476,12 +711,19 @@ public class GossipDeviceStoreTest {
assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event = deviceStore.removeDevice(DID1);
assertEquals(DEVICE_REMOVED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(1, deviceStore.getDeviceCount());
assertEquals(0, deviceStore.getPorts(DID1).size());
verify(clusterCommunicator);
// TODO: verify broadcast message
assertTrue(bcast.hasCaptured());
// putBack Device, Port w/o annotation
putDevice(DID1, SW1);
......@@ -556,41 +798,35 @@ public class GossipDeviceStoreTest {
private static final class TestGossipDeviceStore extends GossipDeviceStore {
public TestGossipDeviceStore(
ClockService clockService,
DeviceClockService deviceClockService,
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator) {
this.clockService = clockService;
this.deviceClockService = deviceClockService;
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
}
}
private static final class TestClusterCommunicationService implements ClusterCommunicationService {
@Override
public boolean broadcast(ClusterMessage message) throws IOException { return true; }
@Override
public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
@Override
public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
@Override
public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
public <T> T deserialize(byte[] bytes) {
return SERIALIZER.decode(bytes);
}
}
private static final class TestClusterService implements ClusterService {
private static final ControllerNode ONOS1 =
new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1"));
private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
private final Map<NodeId, ControllerNode.State> nodeStates = new HashMap<>();
public TestClusterService() {
nodes.put(new NodeId("N1"), ONOS1);
nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE);
nodes.put(NID1, ONOS1);
nodeStates.put(NID1, ACTIVE);
nodes.put(NID2, ONOS2);
nodeStates.put(NID2, ACTIVE);
}
@Override
public ControllerNode getLocalNode() {
return ONOS1;
return GossipDeviceStoreTest.ONOS1;
}
@Override
......
package org.onlab.onos.store.device.impl.peermsg;
package org.onlab.onos.store.device.impl;
import static org.onlab.onos.net.DeviceId.deviceId;
......
package org.onlab.onos.store.cluster.impl;
package org.onlab.onos.store.mastership.impl;
import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
import static org.onlab.onos.mastership.MastershipEvent.Type.MASTER_CHANGED;
import java.util.Map;
import java.util.Set;
......@@ -12,11 +12,11 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipStore;
import org.onlab.onos.cluster.MastershipStoreDelegate;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipEvent;
import org.onlab.onos.mastership.MastershipStore;
import org.onlab.onos.mastership.MastershipStoreDelegate;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.store.common.AbstractHazelcastStore;
......
/**
* Structure and utilities used for inter-Node messaging.
* Implementation of a distributed mastership store using Hazelcast.
*/
package org.onlab.onos.store.device.impl.peermsg;
package org.onlab.onos.store.mastership.impl;
......
package org.onlab.onos.store.cluster.impl;
package org.onlab.onos.store.mastership.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
......@@ -21,11 +21,11 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.ControllerNode.State;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipEvent.Type;
import org.onlab.onos.cluster.MastershipStoreDelegate;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipEvent;
import org.onlab.onos.mastership.MastershipStoreDelegate;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.mastership.MastershipEvent.Type;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
......
......@@ -2,17 +2,17 @@ package org.onlab.onos.store.device.impl;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.ClockProviderService;
import org.onlab.onos.net.device.DeviceClockProviderService;
// FIXME: Code clone in onos-core-trivial, onos-core-hz-net
/**
* Dummy implementation of {@link ClockProviderService}.
* Dummy implementation of {@link DeviceClockProviderService}.
*/
@Component(immediate = true)
@Service
public class NoOpClockProviderService implements ClockProviderService {
public class NoOpClockProviderService implements DeviceClockProviderService {
@Override
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
......
/**
* Implementation of flow store using Hazelcast distributed structures.
*/
package org.onlab.onos.store.flow.impl;
/**
* Implementation of host store using Hazelcast distributed structures.
*/
package org.onlab.onos.store.host.impl;
/**
* Implementation of link store using Hazelcast distributed structures.
*/
package org.onlab.onos.store.link.impl;
/**
* Implementation of topology store using Hazelcast distributed structures.
*/
package org.onlab.onos.store.topology.impl;
package org.onlab.onos.store.serializers;
import org.onlab.util.KryoPool.FamilySerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
/**
* Creates {@link ImmutableList} serializer instance.
*/
public class ImmutableListSerializer extends FamilySerializer<ImmutableList<?>> {
/**
* Creates {@link ImmutableList} serializer instance.
*/
public ImmutableListSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, ImmutableList<?> object) {
output.writeInt(object.size());
for (Object e : object) {
kryo.writeClassAndObject(output, e);
}
}
@Override
public ImmutableList<?> read(Kryo kryo, Input input,
Class<ImmutableList<?>> type) {
final int size = input.readInt();
Builder<Object> builder = ImmutableList.builder();
for (int i = 0; i < size; ++i) {
builder.add(kryo.readClassAndObject(input));
}
return builder.build();
}
@Override
public void registerFamilies(Kryo kryo) {
kryo.register(ImmutableList.of(1).getClass(), this);
kryo.register(ImmutableList.of(1, 2).getClass(), this);
// TODO register required ImmutableList variants
}
}
......@@ -7,8 +7,8 @@ import java.util.HashMap;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
......@@ -31,6 +31,9 @@ import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
public final class KryoPoolUtil {
/**
......@@ -47,12 +50,15 @@ public final class KryoPoolUtil {
*/
public static final KryoPool API = KryoPool.newBuilder()
.register(MISC)
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableList.class, new ImmutableListSerializer())
.register(
//
ArrayList.class,
Arrays.asList().getClass(),
HashMap.class,
//
//
ControllerNode.State.class,
Device.Type.class,
DefaultAnnotations.class,
......
package org.onlab.onos.store.serializers;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipTerm;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Kryo Serializer for {@link org.onlab.onos.cluster.MastershipTerm}.
* Kryo Serializer for {@link org.onlab.onos.mastership.MastershipTerm}.
*/
public class MastershipTermSerializer extends Serializer<MastershipTerm> {
......
......@@ -10,8 +10,8 @@ import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.Annotations;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultAnnotations;
......
......@@ -2,17 +2,17 @@ package org.onlab.onos.store.trivial.impl;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.ClockProviderService;
import org.onlab.onos.net.device.DeviceClockProviderService;
//FIXME: Code clone in onos-core-trivial, onos-core-hz-net
/**
* Dummy implementation of {@link ClockProviderService}.
* Dummy implementation of {@link DeviceClockProviderService}.
*/
@Component(immediate = true)
@Service
public class NoOpClockProviderService implements ClockProviderService {
public class NoOpClockProviderService implements DeviceClockProviderService {
@Override
public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
......
......@@ -15,18 +15,18 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipEvent;
import org.onlab.onos.cluster.MastershipStore;
import org.onlab.onos.cluster.MastershipStoreDelegate;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipEvent;
import org.onlab.onos.mastership.MastershipStore;
import org.onlab.onos.mastership.MastershipStoreDelegate;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.store.AbstractStore;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import static org.onlab.onos.cluster.MastershipEvent.Type.*;
import static org.onlab.onos.mastership.MastershipEvent.Type.*;
/**
* Manages inventory of controller mastership over devices using
......
......@@ -6,8 +6,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import com.google.common.collect.Sets;
......@@ -15,8 +15,8 @@ import com.google.common.collect.Sets;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.onlab.onos.mastership.MastershipEvent.Type.*;
import static org.onlab.onos.net.MastershipRole.*;
import static org.onlab.onos.cluster.MastershipEvent.Type.*;
/**
* Test for the simple MastershipStore implementation.
......
......@@ -500,7 +500,7 @@
<group>
<title>Core Subsystems</title>
<packages>
org.onlab.onos.impl:org.onlab.onos.cluster.impl:org.onlab.onos.net.device.impl:org.onlab.onos.net.link.impl:org.onlab.onos.net.host.impl:org.onlab.onos.net.topology.impl:org.onlab.onos.net.packet.impl:org.onlab.onos.net.flow.impl:org.onlab.onos.store.trivial.*:org.onlab.onos.net.*.impl:org.onlab.onos.event.impl:org.onlab.onos.store.*:org.onlab.onos.net.intent.impl:org.onlab.onos.net.proxyarp.impl
org.onlab.onos.impl:org.onlab.onos.cluster.impl:org.onlab.onos.net.device.impl:org.onlab.onos.net.link.impl:org.onlab.onos.net.host.impl:org.onlab.onos.net.topology.impl:org.onlab.onos.net.packet.impl:org.onlab.onos.net.flow.impl:org.onlab.onos.store.trivial.*:org.onlab.onos.net.*.impl:org.onlab.onos.event.impl:org.onlab.onos.store.*:org.onlab.onos.net.intent.impl:org.onlab.onos.net.proxyarp.impl:org.onlab.onos.mastership.impl
</packages>
</group>
<group>
......
#!/bin/bash
#-------------------------------------------------------------------------------
# Update bundle on locally running karaf.
#-------------------------------------------------------------------------------
[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
. $ONOS_ROOT/tools/build/envDefaults
cd ~/.m2/repository
jar=$(find org/onlab -type f -name '*.jar' | grep -e $1 | grep -v -e -tests | head -n 1)
[ -z "$jar" ] && echo "No bundle $1 found for" && exit 1
bundle=$(echo $(basename $jar .jar) | sed 's/-[0-9].*//g')
client "bundle:update -f $bundle" 2>/dev/null