alshabib

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

package org.onlab.onos.net.intent;
import java.util.concurrent.Future;
import java.util.List;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
/**
* Abstraction of entity capable of installing intents to the environment.
......@@ -14,7 +14,7 @@ public interface IntentInstaller<T extends InstallableIntent> {
* @param intent intent to be installed
* @throws IntentException if issues are encountered while installing the intent
*/
Future<CompletedBatchOperation> install(T intent);
List<FlowRuleBatchOperation> install(T intent);
/**
* Uninstalls the specified intent from the environment.
......@@ -22,5 +22,5 @@ public interface IntentInstaller<T extends InstallableIntent> {
* @param intent intent to be uninstalled
* @throws IntentException if issues are encountered while uninstalling the intent
*/
Future<CompletedBatchOperation> uninstall(T intent);
List<FlowRuleBatchOperation> uninstall(T intent);
}
......
package org.onlab.onos.net.resource;
import java.util.Objects;
/**
* Representation of bandwidth resource.
*/
public final class Bandwidth extends LinkResource {
private final double bandwidth;
/**
* Creates a new instance with given bandwidth.
*
* @param bandwidth bandwidth value to be assigned
*/
private Bandwidth(double bandwidth) {
this.bandwidth = bandwidth;
}
/**
* Creates a new instance with given bandwidth.
*
* @param bandwidth bandwidth value to be assigned
* @return {@link Bandwidth} instance with given bandwidth
*/
public static Bandwidth valueOf(double bandwidth) {
return new Bandwidth(bandwidth);
}
/**
* Returns bandwidth as a double value.
*
* @return bandwidth as a double value
*/
public double toDouble() {
return bandwidth;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Bandwidth) {
Bandwidth that = (Bandwidth) obj;
return Objects.equals(this.bandwidth, that.bandwidth);
}
return false;
}
@Override
public int hashCode() {
return Objects.hashCode(this.bandwidth);
}
@Override
public String toString() {
return String.valueOf(this.bandwidth);
}
}
package org.onlab.onos.net.resource;
import java.util.Objects;
/**
* Representation of lambda resource.
*/
public final class Lambda extends LinkResource {
private final int lambda;
/**
* Creates a new instance with given lambda.
*
* @param lambda lambda value to be assigned
*/
private Lambda(int lambda) {
this.lambda = lambda;
}
/**
* Creates a new instance with given lambda.
*
* @param lambda lambda value to be assigned
* @return {@link Lambda} instance with given lambda
*/
public static Lambda valueOf(int lambda) {
return new Lambda(lambda);
}
/**
* Returns lambda as an int value.
* @return lambda as an int value
*/
public int toInt() {
return lambda;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Lambda) {
Lambda that = (Lambda) obj;
return Objects.equals(this.lambda, that.lambda);
}
return false;
}
@Override
public int hashCode() {
return Objects.hashCode(this.lambda);
}
@Override
public String toString() {
return String.valueOf(this.lambda);
}
}
package org.onlab.onos.net.resource;
/**
* Abstraction of link resource.
*/
public abstract class LinkResource {
}
package org.onlab.onos.net.resource;
import java.util.Map;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.PathIntent;
/**
* Service for providing link resource allocation.
*/
public interface LinkResourceService {
/**
* Allocates resources along the path.
* <p>
* Tries to allocate given resources on the links along the path specified
* by the given intent.
*
* @param res resources to be allocated
* @param intent an intent to be used for specifying the path
*/
void allocateResource(LinkResources res, PathIntent intent);
/**
* Releases resources along the path.
*
* @param intentId an ID for the intent for specifying the path
*/
void releaseResource(IntentId intentId);
/**
* Returns all allocated resources to each link.
*
* @return allocated resources to each link with {@link IntentId}
*/
Map<Link, Map<IntentId, LinkResources>> allocatedResources();
/**
* Returns all allocated resources to given link.
*
* @param link a target link
* @return allocated resources to the target link with {@link IntentId}
*/
Map<IntentId, LinkResources> allocatedResources(Link link);
/**
* Returns available resources for each link.
*
* @return available resources for each link
*/
Map<Link, LinkResources> availableResources();
/**
* Returns available resources for given link.
* @param link a target link
* @return available resources for the target link
*/
LinkResources availableResources(Link link);
}
package org.onlab.onos.net.resource;
import java.util.Set;
/**
* Abstraction of a resources of a link.
*/
public interface LinkResources {
/**
* Returns resources as a set of {@link LinkResource}s.
*
* @return a set of {@link LinkResource}s
*/
Set<LinkResource> resources();
/**
* Builder of {@link LinkResources}.
*/
public interface Builder {
/**
* Adds bandwidth resource.
* <p>
* This operation adds given bandwidth to previous bandwidth and
* generates single bandwidth resource.
*
* @param bandwidth bandwidth value to be added
* @return self
*/
public Builder addBandwidth(double bandwidth);
/**
* Adds lambda resource.
*
* @param lambda lambda value to be added
* @return self
*/
public Builder addLambda(int lambda);
/**
* Builds an immutable link resources.
*
* @return link resources
*/
public LinkResources build();
}
}
......@@ -14,12 +14,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
/**
* Suite of tests for the intent service contract.
......@@ -298,7 +297,7 @@ public class IntentServiceTest {
}
@Override
public Future<CompletedBatchOperation> install(TestInstallableIntent intent) {
public List<FlowRuleBatchOperation> install(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("install failed by design");
}
......@@ -306,7 +305,7 @@ public class IntentServiceTest {
}
@Override
public Future<CompletedBatchOperation> uninstall(TestInstallableIntent intent) {
public List<FlowRuleBatchOperation> uninstall(TestInstallableIntent intent) {
if (fail) {
throw new IntentException("remove failed by design");
}
......
......@@ -34,6 +34,8 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.intent.InstallableIntent;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
......@@ -90,6 +92,9 @@ public class IntentManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
@Activate
public void activate() {
store.setDelegate(delegate);
......@@ -283,7 +288,7 @@ public class IntentManager
// Indicate that the intent is entering the installing phase.
store.setState(intent, INSTALLING);
List<Future<CompletedBatchOperation>> installFutures = Lists.newArrayList();
List<FlowRuleBatchOperation> installWork = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
......@@ -291,13 +296,13 @@ public class IntentManager
registerSubclassInstallerIfNeeded(installable);
trackerService.addTrackedResources(intent.id(),
installable.requiredLinks());
Future<CompletedBatchOperation> future = getInstaller(installable).install(installable);
installFutures.add(future);
List<FlowRuleBatchOperation> batch = getInstaller(installable).install(installable);
installWork.addAll(batch);
}
}
// FIXME we have to wait for the installable intents
//eventDispatcher.post(store.setState(intent, INSTALLED));
monitorExecutor.execute(new IntentInstallMonitor(intent, installFutures, INSTALLED));
monitorExecutor.execute(new IntentInstallMonitor(intent, installWork, INSTALLED));
} catch (Exception e) {
log.warn("Unable to install intent {} due to: {}", intent.id(), e);
uninstallIntent(intent, RECOMPILING);
......@@ -369,16 +374,16 @@ public class IntentManager
* @param intent intent to be uninstalled
*/
private void uninstallIntent(Intent intent, IntentState nextState) {
List<Future<CompletedBatchOperation>> uninstallFutures = Lists.newArrayList();
List<FlowRuleBatchOperation> uninstallWork = Lists.newArrayList();
try {
List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
if (installables != null) {
for (InstallableIntent installable : installables) {
Future<CompletedBatchOperation> future = getInstaller(installable).uninstall(installable);
uninstallFutures.add(future);
List<FlowRuleBatchOperation> batches = getInstaller(installable).uninstall(installable);
uninstallWork.addAll(batches);
}
}
monitorExecutor.execute(new IntentInstallMonitor(intent, uninstallFutures, nextState));
monitorExecutor.execute(new IntentInstallMonitor(intent, uninstallWork, nextState));
} catch (IntentException e) {
log.warn("Unable to uninstall intent {} due to: {}", intent.id(), e);
}
......@@ -495,17 +500,27 @@ public class IntentManager
private class IntentInstallMonitor implements Runnable {
private final Intent intent;
private final List<FlowRuleBatchOperation> work;
private final List<Future<CompletedBatchOperation>> futures;
private final IntentState nextState;
public IntentInstallMonitor(Intent intent,
List<Future<CompletedBatchOperation>> futures, IntentState nextState) {
List<FlowRuleBatchOperation> work,
IntentState nextState) {
this.intent = intent;
this.futures = futures;
this.work = work;
// TODO how many Futures can be outstanding? one?
this.futures = Lists.newLinkedList();
this.nextState = nextState;
// TODO need to kick off the first batch sometime, why not now?
futures.add(applyNextBatch());
}
private void updateIntent(Intent intent) {
/**
* Update the intent store with the next status for this intent.
*/
private void updateIntent() {
if (nextState == RECOMPILING) {
executor.execute(new IntentTask(nextState, intent));
} else if (nextState == INSTALLED || nextState == WITHDRAWN) {
......@@ -515,22 +530,55 @@ public class IntentManager
}
}
@Override
public void run() {
/**
* Apply a list of FlowRules.
*
* @param rules rules to apply
*/
private Future<CompletedBatchOperation> applyNextBatch() {
if (work.isEmpty()) {
return null;
}
FlowRuleBatchOperation batch = work.remove(0);
return flowRuleService.applyBatch(batch);
}
/**
* Iterate through the pending futures, and remove them when they have completed.
*/
private void processFutures() {
List<Future<CompletedBatchOperation>> newFutures = Lists.newArrayList();
for (Iterator<Future<CompletedBatchOperation>> i = futures.iterator(); i.hasNext();) {
Future<CompletedBatchOperation> future = i.next();
try {
// TODO: we may want to get the future here and go back to the future.
CompletedBatchOperation completed = future.get(100, TimeUnit.NANOSECONDS);
if (completed.isSuccess()) {
Future<CompletedBatchOperation> newFuture = applyNextBatch();
if (newFuture != null) {
// we'll add this later so that we don't get a ConcurrentModException
newFutures.add(newFuture);
}
} else {
// TODO check if future succeeded and if not report fail items
log.warn("Failed items: {}", completed.failedItems());
// TODO revert....
//uninstallIntent(intent, RECOMPILING);
}
i.remove();
} catch (TimeoutException | InterruptedException | ExecutionException te) {
log.debug("Intallations of intent {} is still pending", intent);
}
}
futures.addAll(newFutures);
}
@Override
public void run() {
processFutures();
if (futures.isEmpty()) {
updateIntent(intent);
// woohoo! we are done!
updateIntent();
} else {
// resubmit ourselves if we are not done yet
monitorExecutor.submit(this);
......
......@@ -4,7 +4,6 @@ import static org.onlab.onos.net.flow.DefaultTrafficTreatment.builder;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -16,14 +15,12 @@ import org.onlab.onos.CoreService;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.IntentExtensionService;
......@@ -47,9 +44,6 @@ public class LinkCollectionIntentInstaller implements IntentInstaller<LinkCollec
protected IntentExtensionService intentManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private ApplicationId appId;
......@@ -65,18 +59,8 @@ public class LinkCollectionIntentInstaller implements IntentInstaller<LinkCollec
intentManager.unregisterInstaller(PathIntent.class);
}
/**
* Apply a list of FlowRules.
*
* @param rules rules to apply
*/
private Future<CompletedBatchOperation> applyBatch(List<FlowRuleBatchEntry> rules) {
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
return flowRuleService.applyBatch(batch);
}
@Override
public Future<CompletedBatchOperation> install(LinkCollectionIntent intent) {
public List<FlowRuleBatchOperation> install(LinkCollectionIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
......@@ -92,11 +76,11 @@ public class LinkCollectionIntentInstaller implements IntentInstaller<LinkCollec
intent.egressPoint().deviceId(),
intent.egressPoint().port()));
return applyBatch(rules);
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
}
@Override
public Future<CompletedBatchOperation> uninstall(LinkCollectionIntent intent) {
public List<FlowRuleBatchOperation> uninstall(LinkCollectionIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
......@@ -113,7 +97,7 @@ public class LinkCollectionIntentInstaller implements IntentInstaller<LinkCollec
intent.egressPoint().deviceId(),
intent.egressPoint().port()));
return applyBatch(rules);
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
}
/**
......
......@@ -5,7 +5,6 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -16,14 +15,12 @@ import org.onlab.onos.ApplicationId;
import org.onlab.onos.CoreService;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.intent.IntentExtensionService;
......@@ -45,9 +42,6 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
protected IntentExtensionService intentManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private ApplicationId appId;
......@@ -63,31 +57,14 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
intentManager.unregisterInstaller(PathIntent.class);
}
/**
* Apply a list of FlowRules.
*
* @param rules rules to apply
*/
private Future<CompletedBatchOperation> applyBatch(List<FlowRuleBatchEntry> rules) {
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
Future<CompletedBatchOperation> future = flowRuleService.applyBatch(batch);
return future;
// try {
// //FIXME don't do this here
// future.get();
// } catch (InterruptedException | ExecutionException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
}
@Override
public Future<CompletedBatchOperation> install(PathIntent intent) {
public List<FlowRuleBatchOperation> install(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
// TODO Generate multiple batches
while (links.hasNext()) {
builder.matchInport(prev.port());
Link link = links.next();
......@@ -100,18 +77,17 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
prev = link.dst();
}
return applyBatch(rules);
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
}
@Override
public Future<CompletedBatchOperation> uninstall(PathIntent intent) {
public List<FlowRuleBatchOperation> uninstall(PathIntent intent) {
TrafficSelector.Builder builder =
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
// TODO Generate multiple batches
while (links.hasNext()) {
builder.matchInport(prev.port());
Link link = links.next();
......@@ -123,7 +99,7 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
prev = link.dst();
}
return applyBatch(rules);
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
}
// TODO refactor below this line... ----------------------------
......
......@@ -22,10 +22,8 @@ import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.MessageSubjectSerializer;
import org.onlab.util.KryoNamespace;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
......
......@@ -17,6 +17,8 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
......@@ -41,7 +43,8 @@ public class NettyMessagingService implements MessagingService {
private final int port;
private final Endpoint localEp;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private EventLoopGroup workerGroup;
private Class<? extends Channel> channelClass;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
......@@ -52,6 +55,17 @@ public class NettyMessagingService implements MessagingService {
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
// TODO: make this configurable.
private void initEventLoopGroup() {
try {
workerGroup = new EpollEventLoopGroup();
channelClass = EpollSocketChannel.class;
} catch (Throwable t) {
workerGroup = new NioEventLoopGroup();
channelClass = NioSocketChannel.class;
}
}
public NettyMessagingService() {
// TODO: Default port should be configurable.
this(8080);
......@@ -71,6 +85,7 @@ public class NettyMessagingService implements MessagingService {
public void activate() throws Exception {
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
initEventLoopGroup();
startAcceptingConnections();
}
......@@ -173,7 +188,7 @@ public class NettyMessagingService implements MessagingService {
bootstrap.group(workerGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
bootstrap.channel(NioSocketChannel.class);
bootstrap.channel(channelClass);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.
......