tom

Starting to experiment with flow tracking.

......@@ -6,6 +6,7 @@ import com.google.common.base.MoreObjects;
// TODO Consider renaming.
// it's an identifier for a Link, but it's not ElementId, so not using LinkId.
/**
* Immutable representation of a link identity.
*/
......@@ -43,6 +44,15 @@ public class LinkKey {
this.dst = dst;
}
/**
* Creates a link identifier for the specified link.
*
* @param link link descriptor
*/
public LinkKey(Link link) {
this(link.src(), link.dst());
}
@Override
public int hashCode() {
return Objects.hash(src(), dst);
......
package org.onlab.onos.net.intent;
import org.onlab.onos.net.Link;
import java.util.Collection;
/**
* Abstraction of an intent that can be installed into
* the underlying system without additional compilation.
*/
public interface InstallableIntent extends Intent {
/**
* Returns the collection of links that are required for this installable
* intent to exist.
*
* @return collection of links
*/
// FIXME: replace this with 'NetworkResource'
Collection<Link> requiredLinks();
}
......
package org.onlab.onos.net.intent;
import java.util.Objects;
import com.google.common.base.MoreObjects;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.Path;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import com.google.common.base.MoreObjects;
import java.util.Collection;
import java.util.Objects;
/**
* Abstraction of explicitly path specified connectivity intent.
......@@ -86,4 +87,10 @@ public class PathIntent extends PointToPointIntent implements InstallableIntent
.add("path", path)
.toString();
}
@Override
public Collection<Link> requiredLinks() {
return path.links();
}
}
......
package org.onlab.onos.net.topology;
import org.onlab.onos.event.AbstractEvent;
import org.onlab.onos.event.Event;
import java.util.List;
/**
* Describes network topology event.
*/
public class TopologyEvent extends AbstractEvent<TopologyEvent.Type, Topology> {
private final List<Event> reasons;
/**
* Type of topology events.
*/
......@@ -23,9 +28,11 @@ public class TopologyEvent extends AbstractEvent<TopologyEvent.Type, Topology> {
*
* @param type topology event type
* @param topology event topology subject
* @param reasons list of events that triggered topology change
*/
public TopologyEvent(Type type, Topology topology) {
public TopologyEvent(Type type, Topology topology, List<Event> reasons) {
super(type, topology);
this.reasons = reasons;
}
/**
......@@ -33,10 +40,24 @@ public class TopologyEvent extends AbstractEvent<TopologyEvent.Type, Topology> {
*
* @param type link event type
* @param topology event topology subject
* @param reasons list of events that triggered topology change
* @param time occurrence time
*/
public TopologyEvent(Type type, Topology topology, long time) {
public TopologyEvent(Type type, Topology topology, List<Event> reasons,
long time) {
super(type, topology, time);
this.reasons = reasons;
}
/**
* Returns the list of events that triggered the topology change.
*
* @return list of events responsible for change in topology; null if
* initial topology computation
*/
public List<Event> reasons() {
return reasons;
}
}
......
package org.onlab.onos.net.intent;
//TODO is this the right package?
import org.onlab.onos.net.Link;
import java.util.Collection;
/**
* An installable intent used in the unit test.
*
......@@ -25,4 +29,8 @@ public class TestInstallableIntent extends AbstractIntent implements Installable
super();
}
@Override
public Collection<Link> requiredLinks() {
return null;
}
}
......
package org.onlab.onos.net.intent.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.event.Event;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.topology.TopologyEvent;
import org.onlab.onos.net.topology.TopologyListener;
import org.onlab.onos.net.topology.TopologyService;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Entity responsible for tracking installed flows and for monitoring topology
* events to determine what flows are affected by topology changes.
*/
@Component
@Service
public class FlowTracker implements FlowTrackerService {
private final Logger log = getLogger(getClass());
private final SetMultimap<LinkKey, IntentId> intentsByLink =
synchronizedSetMultimap(HashMultimap.<LinkKey, IntentId>create());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
private ExecutorService executorService =
newSingleThreadExecutor(namedThreads("onos-flowtracker"));
private TopologyListener listener = new InternalTopologyListener();
private TopologyChangeDelegate delegate;
@Activate
public void activate() {
topologyService.addListener(listener);
log.info("Started");
}
@Deactivate
public void deactivate() {
topologyService.removeListener(listener);
log.info("Stopped");
}
@Override
public void setDelegate(TopologyChangeDelegate delegate) {
checkNotNull(delegate, "Delegate cannot be null");
checkArgument(this.delegate == null || this.delegate == delegate,
"Another delegate already set");
this.delegate = delegate;
}
@Override
public void unsetDelegate(TopologyChangeDelegate delegate) {
checkArgument(this.delegate == delegate, "Not the current delegate");
this.delegate = null;
}
@Override
public void addTrackedResources(IntentId intentId, Collection<Link> resources) {
for (Link link : resources) {
intentsByLink.put(new LinkKey(link), intentId);
}
}
@Override
public void removeTrackedResources(IntentId intentId, Collection<Link> resources) {
for (Link link : resources) {
intentsByLink.remove(new LinkKey(link), intentId);
}
}
// Internal re-actor to topology change events.
private class InternalTopologyListener implements TopologyListener {
@Override
public void event(TopologyEvent event) {
executorService.execute(new TopologyChangeHandler(event));
}
}
// Re-dispatcher of topology change events.
private class TopologyChangeHandler implements Runnable {
private final TopologyEvent event;
TopologyChangeHandler(TopologyEvent event) {
this.event = event;
}
@Override
public void run() {
if (event.reasons() == null) {
delegate.bumpIntents(intentsByLink.values());
} else {
for (Event reason : event.reasons()) {
if (reason instanceof LinkEvent) {
LinkEvent linkEvent = (LinkEvent) reason;
if (linkEvent.type() == LinkEvent.Type.LINK_ADDED ||
linkEvent.type() == LinkEvent.Type.LINK_UPDATED) {
delegate.bumpIntents(intentsByLink.get(new LinkKey(linkEvent.subject())));
} else if (linkEvent.type() == LinkEvent.Type.LINK_REMOVED) {
delegate.failIntents(intentsByLink.get(new LinkKey(linkEvent.subject())));
}
}
}
}
}
}
}
package org.onlab.onos.net.intent.impl;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.intent.IntentId;
import java.util.Collection;
/**
* Auxiliary service for tracking intent path flows and for notifying the
* intent service of environment changes via topology change delegate.
*/
public interface FlowTrackerService {
/**
* Sets a topology change delegate.
*
* @param delegate topology change delegate
*/
void setDelegate(TopologyChangeDelegate delegate);
/**
* Unsets topology change delegate.
*
* @param delegate topology change delegate
*/
void unsetDelegate(TopologyChangeDelegate delegate);
/**
* Adds a path flow to be tracked.
*
* @param intentId intent identity on whose behalf the path is being tracked
* @param resources resources to track
*/
public void addTrackedResources(IntentId intentId, Collection<Link> resources);
/**
* Removes a path flow to be tracked.
*
* @param intentId intent identity on whose behalf the path is being tracked
* @param resources resources to stop tracking
*/
public void removeTrackedResources(IntentId intentId, Collection<Link> resources);
}
package org.onlab.onos.net.intent.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.onos.net.intent.IntentState.FAILED;
import static org.onlab.onos.net.intent.IntentState.INSTALLED;
import static org.onlab.onos.net.intent.IntentState.WITHDRAWING;
import static org.onlab.onos.net.intent.IntentState.WITHDRAWN;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import com.google.common.collect.ImmutableMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -38,7 +25,15 @@ import org.onlab.onos.net.intent.IntentStore;
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.onos.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
* An implementation of Intent Manager.
......@@ -57,37 +52,34 @@ public class IntentManager
IntentCompiler<? extends Intent>> compilers = new ConcurrentHashMap<>();
private final ConcurrentMap<Class<? extends InstallableIntent>,
IntentInstaller<? extends InstallableIntent>> installers = new ConcurrentHashMap<>();
private final CopyOnWriteArrayList<IntentListener> listeners = new CopyOnWriteArrayList<>();
private final AbstractListenerRegistry<IntentEvent, IntentListener>
listenerRegistry = new AbstractListenerRegistry<>();
private final IntentStoreDelegate delegate = new InternalStoreDelegate();
private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentStore store;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowTrackerService trackerService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
@Activate
public void activate() {
store.setDelegate(delegate);
trackerService.setDelegate(topoDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
// this.intentEvents = new IntentMap<>("intentState", IntentEvent.class, collectionsService);
// this.installableIntents =
// new IntentMap<>("installableIntents", IntentCompilationResult.class, collectionsService);
//
//
// this.intentEvents.addListener(new InternalEntryListener(new InternalIntentEventListener()));
log.info("Started");
}
@Deactivate
public void deactivate() {
store.unsetDelegate(delegate);
trackerService.unsetDelegate(topoDelegate);
eventDispatcher.removeSink(IntentEvent.class);
log.info("Stopped");
}
......@@ -97,7 +89,6 @@ public class IntentManager
checkNotNull(intent, INTENT_NULL);
registerSubclassCompilerIfNeeded(intent);
IntentEvent event = store.createIntent(intent);
eventDispatcher.post(event);
processStoreEvent(event);
}
......@@ -105,7 +96,13 @@ public class IntentManager
public void withdraw(Intent intent) {
checkNotNull(intent, INTENT_NULL);
IntentEvent event = store.setState(intent, WITHDRAWING);
eventDispatcher.post(event);
List<InstallableIntent> installables = store.getInstallableIntents(intent.getId());
if (installables != null) {
for (InstallableIntent installable : installables) {
trackerService.removeTrackedResources(intent.getId(),
installable.requiredLinks());
}
}
processStoreEvent(event);
}
......@@ -178,17 +175,6 @@ public class IntentManager
}
/**
* Invokes all of registered intent event listener.
*
* @param event event supplied to a listener as an argument
*/
private void invokeListeners(IntentEvent event) {
for (IntentListener listener : listeners) {
listener.event(event);
}
}
/**
* Returns the corresponding intent compiler to the specified intent.
*
* @param intent intent
......@@ -206,6 +192,7 @@ public class IntentManager
/**
* Returns the corresponding intent installer to the specified installable intent.
*
* @param intent intent
* @param <T> the type of installable intent
* @return intent installer corresponding to the specified installable intent
......@@ -229,10 +216,12 @@ public class IntentManager
// TODO: implement compilation traversing tree structure
List<InstallableIntent> installable = new ArrayList<>();
for (Intent compiled : getCompiler(intent).compile(intent)) {
installable.add((InstallableIntent) compiled);
InstallableIntent installableIntent = (InstallableIntent) compiled;
installable.add(installableIntent);
trackerService.addTrackedResources(intent.getId(),
installableIntent.requiredLinks());
}
IntentEvent event = store.addInstallableIntents(intent.getId(), installable);
eventDispatcher.post(event);
processStoreEvent(event);
}
......@@ -242,13 +231,14 @@ public class IntentManager
* @param intent intent
*/
private void installIntent(Intent intent) {
for (InstallableIntent installable : store.getInstallableIntents(intent.getId())) {
List<InstallableIntent> installables = store.getInstallableIntents(intent.getId());
if (installables != null) {
for (InstallableIntent installable : installables) {
registerSubclassInstallerIfNeeded(installable);
getInstaller(installable).install(installable);
}
}
IntentEvent event = store.setState(intent, INSTALLED);
eventDispatcher.post(event);
processStoreEvent(event);
}
......@@ -259,10 +249,12 @@ public class IntentManager
* @param intent intent
*/
private void uninstallIntent(Intent intent) {
for (InstallableIntent installable : store.getInstallableIntents(intent.getId())) {
List<InstallableIntent> installables = store.getInstallableIntents(intent.getId());
if (installables != null) {
for (InstallableIntent installable : installables) {
getInstaller(installable).uninstall(installable);
}
}
store.removeInstalledIntents(intent.getId());
store.setState(intent, WITHDRAWN);
}
......@@ -321,9 +313,8 @@ public class IntentManager
* Handles state transition of submitted intents.
*/
private void processStoreEvent(IntentEvent event) {
invokeListeners(event);
eventDispatcher.post(event);
Intent intent = event.getIntent();
try {
switch (event.getState()) {
case SUBMITTED:
......@@ -342,8 +333,8 @@ public class IntentManager
case FAILED:
break;
default:
throw new IllegalStateException(
"the state of IntentEvent is illegal: " + event.getState());
throw new IllegalStateException("the state of IntentEvent is illegal: " +
event.getState());
}
} catch (IntentException e) {
store.setState(intent, FAILED);
......@@ -355,9 +346,26 @@ public class IntentManager
private class InternalStoreDelegate implements IntentStoreDelegate {
@Override
public void notify(IntentEvent event) {
eventDispatcher.post(event);
processStoreEvent(event);
}
}
// Topology change delegate
private class InternalTopoChangeDelegate implements TopologyChangeDelegate {
@Override
public void bumpIntents(Iterable<IntentId> intentIds) {
for (IntentId intentId : intentIds) {
compileIntent(getIntent(intentId));
}
}
@Override
public void failIntents(Iterable<IntentId> intentIds) {
for (IntentId intentId : intentIds) {
Intent intent = getIntent(intentId);
uninstallIntent(intent);
compileIntent(intent);
}
}
}
}
......
package org.onlab.onos.net.intent.impl;
import org.onlab.onos.net.intent.IntentId;
/**
* Auxiliary delegate for integration of intent manager and flow trackerService.
*/
public interface TopologyChangeDelegate {
/**
* Notifies that topology has changed in such a way that the specified
* intents should be recompiled.
*
* @param intentIds intents that should be recompiled
*/
void bumpIntents(Iterable<IntentId> intentIds);
/**
* Notifies that topology has changed in such a way that the specified
* intents should be marked failed and then recompiled.
*
* @param intentIds intents that should be failed and recompiled
*/
void failIntents(Iterable<IntentId> intentIds);
}
......@@ -125,7 +125,8 @@ implements TopologyStore {
// Promote the new topology to current and return a ready-to-send event.
synchronized (this) {
current = newTopology;
return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, current);
return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED,
current, reasons);
}
}
......
......@@ -125,7 +125,8 @@ implements TopologyStore {
// Promote the new topology to current and return a ready-to-send event.
synchronized (this) {
current = newTopology;
return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, current);
return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED,
current, reasons);
}
}
......
......@@ -124,7 +124,8 @@ public class SimpleTopologyStore
// Promote the new topology to current and return a ready-to-send event.
synchronized (this) {
current = newTopology;
return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, current);
return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED,
current, reasons);
}
}
......
......@@ -15,7 +15,7 @@ name=${2:-onos-1}
ssh $remote "
sudo perl -pi.bak -e \"s/127.0.1.1.*/127.0.1.1 $name/g\" /etc/hosts
sudo perl -pi.bak -e \"s/.*/$name/g\" /etc/hostname
sudo perl -pi.bak -e \"local \$/ = ''; s/.*/$name/g\" /etc/hostname
sudo hostname $name
" 2>/dev/null
......