alshabib
Committed by Gerrit Code Review

Flow Objective implementation

Provides an abstraction which isolates the application from any pipeline
knowledge. By using the provided objectives applications can express
their forwarding desires in a pipeline agnostic way. The objectives
are then consumed by a driver for the specific device who converts them
into the appropriate pipeline coherent flows.

Change-Id: I74a68b4971c367c0cd5b7de9d877abdd117afa98
Showing 26 changed files with 1109 additions and 187 deletions
......@@ -20,7 +20,6 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -28,35 +27,29 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.Ethernet;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.Ip4Prefix;
import org.onlab.packet.Ip6Address;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.config.NetworkConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flowobjective.DefaultFilteringObjective;
import org.onosproject.net.flowobjective.DefaultForwardingObjective;
import org.onosproject.net.flowobjective.DefaultNextObjective;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupEvent;
import org.onosproject.net.group.GroupListener;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.group.GroupService;
import org.onosproject.net.packet.PacketService;
import org.onosproject.routing.FibEntry;
......@@ -74,7 +67,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.onlab.util.Tools.delay;
/**
* BgpRouter component.
......@@ -124,7 +118,7 @@ public class BgpRouter {
private final Map<IpPrefix, IpAddress> prefixToNextHop = Maps.newHashMap();
// Mapping from next hop IP to next hop object containing group info
private final Map<IpAddress, NextHop> nextHops = Maps.newHashMap();
private final Map<IpAddress, Integer> nextHops = Maps.newHashMap();
// Stores FIB updates that are waiting for groups to be set up
private final Multimap<NextHopGroupKey, FibEntry> pendingUpdates = HashMultimap.create();
......@@ -136,7 +130,7 @@ public class BgpRouter {
// learned from config
private DeviceId ctrlDeviceId;
private final GroupListener groupListener = new InternalGroupListener();
//private final GroupListener groupListener = new InternalGroupListener();
private TunnellingConnectivityManager connectivityManager;
......@@ -160,7 +154,7 @@ public class BgpRouter {
appId = coreService.registerApplication(BGP_ROUTER_APP);
getDeviceConfiguration(configService.getBgpSpeakers());
groupService.addListener(groupListener);
//groupService.addListener(groupListener);
processIntfFilters(true, configService.getInterfaces());
......@@ -179,6 +173,14 @@ public class BgpRouter {
icmpHandler.start();
log.info("BgpRouter started");
delay(1000);
FibEntry fibEntry = new FibEntry(Ip4Prefix.valueOf("10.1.0.0/16"),
Ip4Address.valueOf("192.168.10.1"),
MacAddress.valueOf("DE:AD:BE:EF:FE:ED"));
FibUpdate fibUpdate = new FibUpdate(FibUpdate.Type.UPDATE, fibEntry);
updateFibEntry(Collections.singletonList(fibUpdate));
}
@Deactivate
......@@ -188,7 +190,7 @@ public class BgpRouter {
icmpHandler.stop();
processIntfFilters(false, configService.getInterfaces());
groupService.removeListener(groupListener);
//groupService.removeListener(groupListener);
log.info("BgpRouter stopped");
}
......@@ -213,16 +215,18 @@ public class BgpRouter {
}
private void updateFibEntry(Collection<FibUpdate> updates) {
Map<FibEntry, Group> toInstall = new HashMap<>(updates.size());
Map<FibEntry, Integer> toInstall = new HashMap<>(updates.size());
for (FibUpdate update : updates) {
FibEntry entry = update.entry();
addNextHop(entry);
Group group;
Integer nextId;
synchronized (pendingUpdates) {
NextHop nextHop = nextHops.get(entry.nextHopIp());
nextId = nextHops.get(entry.nextHopIp());
/*
group = groupService.getGroup(deviceId,
new DefaultGroupKey(
appKryo.serialize(nextHop.group())));
......@@ -231,66 +235,70 @@ public class BgpRouter {
log.debug("Adding pending flow {}", update.entry());
pendingUpdates.put(nextHop.group(), update.entry());
continue;
}
}*/
}
toInstall.put(update.entry(), group);
toInstall.put(update.entry(), nextId);
}
installFlows(toInstall);
}
private void installFlows(Map<FibEntry, Group> entriesToInstall) {
FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
private void installFlows(Map<FibEntry, Integer> entriesToInstall) {
for (Map.Entry<FibEntry, Group> entry : entriesToInstall.entrySet()) {
for (Map.Entry<FibEntry, Integer> entry : entriesToInstall.entrySet()) {
FibEntry fibEntry = entry.getKey();
Group group = entry.getValue();
Integer nextId = entry.getValue();
flowObjectiveService.forward(deviceId,
generateRibFlowRule(fibEntry.prefix(), nextId).add());
FlowRule flowRule = generateRibFlowRule(fibEntry.prefix(), group);
builder.add(flowRule);
}
log.info("Sending flow forwarding objective");
flowService.apply(builder.build());
}
private synchronized void deleteFibEntry(Collection<FibUpdate> withdraws) {
FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
for (FibUpdate update : withdraws) {
FibEntry entry = update.entry();
Integer nextId = nextHops.get(entry.nextHopIp());
Group group = deleteNextHop(entry.prefix());
/*Group group = deleteNextHop(entry.prefix());
if (group == null) {
log.warn("Group not found when deleting {}", entry);
return;
}
}*/
FlowRule flowRule = generateRibFlowRule(entry.prefix(), group);
flowObjectiveService.forward(deviceId,
generateRibFlowRule(entry.prefix(), nextId).remove());
builder.remove(flowRule);
}
flowService.apply(builder.build());
}
private FlowRule generateRibFlowRule(IpPrefix prefix, Group group) {
private ForwardingObjective.Builder generateRibFlowRule(IpPrefix prefix, Integer nextId) {
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPDst(prefix)
.build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.group(group.id())
.build();
int priority = prefix.prefixLength() * PRIORITY_MULTIPLIER + PRIORITY_OFFSET;
return new DefaultFlowRule(deviceId, selector, treatment,
priority, appId, 0, true,
FlowRule.Type.IP);
ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder()
.fromApp(appId)
.makePermanent()
.nextStep(nextId)
.withSelector(selector)
.withPriority(priority)
.withFlag(ForwardingObjective.Flag.SPECIFIC);
return fwdBuilder;
}
private synchronized void addNextHop(FibEntry entry) {
......@@ -317,6 +325,16 @@ public class BgpRouter {
.setOutput(egressIntf.connectPoint().port())
.build();
NextObjective nextObjective = DefaultNextObjective.builder()
.withId(entry.hashCode())
.addTreatment(treatment)
.withType(NextObjective.Type.SIMPLE)
.fromApp(appId)
.add();
flowObjectiveService.next(deviceId, nextObjective);
/*
GroupBucket bucket = DefaultGroupBucket.createIndirectGroupBucket(treatment);
GroupDescription groupDescription
......@@ -328,15 +346,16 @@ public class BgpRouter {
appId);
groupService.addGroup(groupDescription);
*/
nextHops.put(nextHop.ip(), nextHop);
nextHops.put(nextHop.ip(), entry.hashCode());
}
nextHopsCount.add(entry.nextHopIp());
}
private synchronized Group deleteNextHop(IpPrefix prefix) {
/*private synchronized Group deleteNextHop(IpPrefix prefix) {
IpAddress nextHopIp = prefixToNextHop.remove(prefix);
NextHop nextHop = nextHops.get(nextHopIp);
if (nextHop == null) {
......@@ -349,7 +368,7 @@ public class BgpRouter {
serialize(nextHop.group())));
// FIXME disabling group deletes for now until we verify the logic is OK
/*if (nextHopsCount.remove(nextHopIp, 1) <= 1) {
*//*if (nextHopsCount.remove(nextHopIp, 1) <= 1) {
// There was one or less next hops, so there are now none
log.debug("removing group for next hop {}", nextHop);
......@@ -359,10 +378,10 @@ public class BgpRouter {
groupService.removeGroup(deviceId,
new DefaultGroupKey(appKryo.build().serialize(nextHop.group())),
appId);
}*/
}*//*
return group;
}
}*/
private class InternalFibListener implements FibListener {
......@@ -385,12 +404,11 @@ public class BgpRouter {
.forEach(ipaddr -> fob.addCondition(
Criteria.matchIPDst(ipaddr.subnetAddress())));
fob.permit().fromApp(appId);
flowObjectiveService.filter(deviceId,
Collections.singletonList(fob.add()));
flowObjectiveService.filter(deviceId, fob.add());
}
}
private class InternalGroupListener implements GroupListener {
/* private class InternalGroupListener implements GroupListener {
@Override
public void event(GroupEvent event) {
......@@ -412,5 +430,5 @@ public class BgpRouter {
}
}
}
}
}*/
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.behaviour;
/**
* Default implementation of a next group.
*/
public class DefaultNextGroup implements NextGroup {
private final byte[] data;
public DefaultNextGroup(byte[] data) {
this.data = data;
}
@Override
public byte[] data() {
return data;
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.behaviour;
/**
* Opaque data type for carrying group-like information.
* Only relevant to a pipeliner driver.
*/
public interface NextGroup {
/**
* Serialized form of the next group.
* @return a byte array.
*/
byte[] data();
}
......@@ -21,9 +21,6 @@ import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import java.util.Collection;
import java.util.concurrent.Future;
/**
* Behaviour for handling various pipelines.
*/
......@@ -40,24 +37,21 @@ public interface Pipeliner extends HandlerBehaviour {
/**
* Installs the filtering rules onto the device.
*
* @param filterObjectives the collection of filters
* @return a future indicating the success of the operation
* @param filterObjective a filtering objective
*/
Future<Boolean> filter(Collection<FilteringObjective> filterObjectives);
void filter(FilteringObjective filterObjective);
/**
* Installs the forwarding rules onto the device.
*
* @param forwardObjectives the collection of forwarding objectives
* @return a future indicating the success of the operation
* @param forwardObjective a forwarding objective
*/
Future<Boolean> forward(Collection<ForwardingObjective> forwardObjectives);
void forward(ForwardingObjective forwardObjective);
/**
* Installs the next hop elements into the device.
*
* @param nextObjectives the collection of next objectives
* @return a future indicating the success of the operation
* @param nextObjective a next objectives
*/
Future<Boolean> next(Collection<NextObjective> nextObjectives);
void next(NextObjective nextObjective);
}
......
......@@ -16,6 +16,7 @@
package org.onosproject.net.behaviour;
import org.onlab.osgi.ServiceDirectory;
import org.onosproject.net.flowobjective.FlowObjectiveStore;
/**
* Processing context and supporting services for the pipeline behaviour.
......@@ -30,5 +31,11 @@ public interface PipelinerContext {
*/
ServiceDirectory directory();
/**
* Returns the Objective Store where data can be stored and retrieved.
* @return the flow objective store
*/
FlowObjectiveStore store();
// TODO: add means to store and access shared state
}
......
......@@ -23,6 +23,7 @@ import org.onosproject.net.flow.criteria.Criterion;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -42,6 +43,7 @@ public final class DefaultFilteringObjective implements FilteringObjective {
private final List<Criterion> conditions;
private final int id;
private final Operation op;
private final Optional<ObjectiveContext> context;
private DefaultFilteringObjective(Type type, boolean permanent, int timeout,
ApplicationId appId, int priority, Criterion key,
......@@ -54,6 +56,25 @@ public final class DefaultFilteringObjective implements FilteringObjective {
this.priority = priority;
this.conditions = conditions;
this.op = op;
this.context = Optional.empty();
this.id = Objects.hash(type, key, conditions, permanent,
timeout, appId, priority);
}
public DefaultFilteringObjective(Type type, boolean permanent, int timeout,
ApplicationId appId, int priority, Criterion key,
List<Criterion> conditions,
ObjectiveContext context, Operation op) {
this.key = key;
this.type = type;
this.permanent = permanent;
this.timeout = timeout;
this.appId = appId;
this.priority = priority;
this.conditions = conditions;
this.op = op;
this.context = Optional.ofNullable(context);
this.id = Objects.hash(type, key, conditions, permanent,
timeout, appId, priority);
......@@ -104,6 +125,11 @@ public final class DefaultFilteringObjective implements FilteringObjective {
return op;
}
@Override
public Optional<ObjectiveContext> context() {
return context;
}
/**
* Returns a new builder.
*
......@@ -201,6 +227,31 @@ public final class DefaultFilteringObjective implements FilteringObjective {
}
@Override
public FilteringObjective add(ObjectiveContext context) {
List<Criterion> conditions = listBuilder.build();
checkNotNull(type, "Must have a type.");
checkArgument(!conditions.isEmpty(), "Must have at least one condition.");
checkNotNull(appId, "Must supply an application id");
return new DefaultFilteringObjective(type, permanent, timeout,
appId, priority, key, conditions,
context, Operation.ADD);
}
@Override
public FilteringObjective remove(ObjectiveContext context) {
List<Criterion> conditions = listBuilder.build();
checkNotNull(type, "Must have a type.");
checkArgument(!conditions.isEmpty(), "Must have at least one condition.");
checkNotNull(appId, "Must supply an application id");
return new DefaultFilteringObjective(type, permanent, timeout,
appId, priority, key, conditions,
context, Operation.REMOVE);
}
}
......
......@@ -20,6 +20,7 @@ import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import java.util.Objects;
import java.util.Optional;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -38,6 +39,7 @@ public final class DefaultForwardingObjective implements ForwardingObjective {
private final int nextId;
private final TrafficTreatment treatment;
private final Operation op;
private final Optional<ObjectiveContext> context;
private final int id;
......@@ -55,6 +57,29 @@ public final class DefaultForwardingObjective implements ForwardingObjective {
this.nextId = nextId;
this.treatment = treatment;
this.op = op;
this.context = Optional.empty();
this.id = Objects.hash(selector, flag, permanent,
timeout, appId, priority, nextId,
treatment, op);
}
private DefaultForwardingObjective(TrafficSelector selector,
Flag flag, boolean permanent,
int timeout, ApplicationId appId,
int priority, int nextId,
TrafficTreatment treatment,
ObjectiveContext context, Operation op) {
this.selector = selector;
this.flag = flag;
this.permanent = permanent;
this.timeout = timeout;
this.appId = appId;
this.priority = priority;
this.nextId = nextId;
this.treatment = treatment;
this.op = op;
this.context = Optional.ofNullable(context);
this.id = Objects.hash(selector, flag, permanent,
timeout, appId, priority, nextId,
......@@ -113,6 +138,11 @@ public final class DefaultForwardingObjective implements ForwardingObjective {
return op;
}
@Override
public Optional<ObjectiveContext> context() {
return context;
}
/**
* Returns a new builder.
*
......@@ -186,7 +216,7 @@ public final class DefaultForwardingObjective implements ForwardingObjective {
public ForwardingObjective add() {
checkNotNull(selector, "Must have a selector");
checkNotNull(flag, "A flag must be set");
checkArgument(nextId != null && treatment != null, "Must supply at " +
checkArgument(nextId != null || treatment != null, "Must supply at " +
"least a treatment and/or a nextId");
checkNotNull(appId, "Must supply an application id");
return new DefaultForwardingObjective(selector, flag, permanent,
......@@ -198,12 +228,38 @@ public final class DefaultForwardingObjective implements ForwardingObjective {
public ForwardingObjective remove() {
checkNotNull(selector, "Must have a selector");
checkNotNull(flag, "A flag must be set");
checkArgument(nextId != null && treatment != null, "Must supply at " +
checkArgument(nextId != null || treatment != null, "Must supply at " +
"least a treatment and/or a nextId");
checkNotNull(appId, "Must supply an application id");
return new DefaultForwardingObjective(selector, flag, permanent,
timeout, appId, priority,
nextId, treatment, Operation.REMOVE);
}
@Override
public ForwardingObjective add(ObjectiveContext context) {
checkNotNull(selector, "Must have a selector");
checkNotNull(flag, "A flag must be set");
checkArgument(nextId != null || treatment != null, "Must supply at " +
"least a treatment and/or a nextId");
checkNotNull(appId, "Must supply an application id");
return new DefaultForwardingObjective(selector, flag, permanent,
timeout, appId, priority,
nextId, treatment,
context, Operation.ADD);
}
@Override
public ForwardingObjective remove(ObjectiveContext context) {
checkNotNull(selector, "Must have a selector");
checkNotNull(flag, "A flag must be set");
checkArgument(nextId != null || treatment != null, "Must supply at " +
"least a treatment and/or a nextId");
checkNotNull(appId, "Must supply an application id");
return new DefaultForwardingObjective(selector, flag, permanent,
timeout, appId, priority,
nextId, treatment,
context, Operation.REMOVE);
}
}
}
......
......@@ -21,6 +21,7 @@ import org.onosproject.net.flow.TrafficTreatment;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -34,13 +35,28 @@ public final class DefaultNextObjective implements NextObjective {
private final ApplicationId appId;
private final Type type;
private final Integer id;
private final Operation op;
private final Optional<ObjectiveContext> context;
private DefaultNextObjective(Integer id, List<TrafficTreatment> treatments,
ApplicationId appId, Type type) {
ApplicationId appId, Type type, Operation op) {
this.treatments = treatments;
this.appId = appId;
this.type = type;
this.id = id;
this.op = op;
this.context = Optional.empty();
}
private DefaultNextObjective(Integer id, List<TrafficTreatment> treatments,
ApplicationId appId, ObjectiveContext context,
Type type, Operation op) {
this.treatments = treatments;
this.appId = appId;
this.type = type;
this.id = id;
this.op = op;
this.context = Optional.ofNullable(context);
}
@Override
......@@ -80,7 +96,12 @@ public final class DefaultNextObjective implements NextObjective {
@Override
public Operation op() {
throw new UnsupportedOperationException("Next Objective has no operation");
return op;
}
@Override
public Optional<ObjectiveContext> context() {
return context;
}
/**
......@@ -101,8 +122,6 @@ public final class DefaultNextObjective implements NextObjective {
private final ImmutableList.Builder<TrafficTreatment> listBuilder
= ImmutableList.builder();
@Override
public NextObjective.Builder withId(int nextId) {
this.id = nextId;
......@@ -143,7 +162,7 @@ public final class DefaultNextObjective implements NextObjective {
}
@Override
public Builder fromApp(ApplicationId appId) {
public NextObjective.Builder fromApp(ApplicationId appId) {
this.appId = appId;
return this;
}
......@@ -160,14 +179,49 @@ public final class DefaultNextObjective implements NextObjective {
}
@Override
public NextObjective build() {
public NextObjective add() {
List<TrafficTreatment> treatments = listBuilder.build();
checkNotNull(appId, "Must supply an application id");
checkNotNull(id, "id cannot be null");
checkNotNull(type, "The type cannot be null");
checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
return new DefaultNextObjective(id, treatments, appId, type, Operation.ADD);
}
@Override
public NextObjective remove() {
List<TrafficTreatment> treatments = listBuilder.build();
checkNotNull(appId, "Must supply an application id");
checkNotNull(id, "id cannot be null");
checkNotNull(type, "The type cannot be null");
checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
return new DefaultNextObjective(id, treatments, appId, type, Operation.REMOVE);
}
@Override
public NextObjective add(ObjectiveContext context) {
List<TrafficTreatment> treatments = listBuilder.build();
checkNotNull(appId, "Must supply an application id");
checkNotNull(id, "id cannot be null");
checkNotNull(type, "The type cannot be null");
checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
return new DefaultNextObjective(id, treatments, appId,
context, type, Operation.ADD);
}
@Override
public NextObjective remove(ObjectiveContext context) {
List<TrafficTreatment> treatments = listBuilder.build();
checkNotNull(appId, "Must supply an application id");
checkNotNull(id, "id cannot be null");
checkNotNull(type, "The type cannot be null");
checkArgument(!treatments.isEmpty(), "Must have at least one treatment");
return new DefaultNextObjective(id, treatments, appId, type);
return new DefaultNextObjective(id, treatments, appId,
context, type, Operation.REMOVE);
}
}
}
......
......@@ -114,6 +114,24 @@ public interface FilteringObjective extends Objective {
*/
public FilteringObjective remove();
/**
* Builds the filtering objective that will be added.
* The context will be used to notify the calling application.
*
* @param context an objective context
* @return a filtering objective
*/
public FilteringObjective add(ObjectiveContext context);
/**
* Builds the filtering objective that will be removed.
* The context will be used to notify the calling application.
*
* @param context an objective context
* @return a filtering objective
*/
public FilteringObjective remove(ObjectiveContext context);
}
......
......@@ -17,9 +17,6 @@ package org.onosproject.net.flowobjective;
import org.onosproject.net.DeviceId;
import java.util.Collection;
import java.util.concurrent.Future;
/**
* Service for programming data plane flow rules in manner independent of
* specific device table pipeline configuration.
......@@ -30,27 +27,24 @@ public interface FlowObjectiveService {
* Installs the filtering rules onto the specified device.
*
* @param deviceId device identifier
* @param filteringObjectives the collection of filters
* @return a future indicating the success of the operation
* @param filteringObjective the filtering objective
*/
Future<Boolean> filter(DeviceId deviceId, Collection<FilteringObjective> filteringObjectives);
void filter(DeviceId deviceId, FilteringObjective filteringObjective);
/**
* Installs the forwarding rules onto the specified device.
*
* @param deviceId device identifier
* @param forwardingObjectives the collection of forwarding objectives
* @return a future indicating the success of the operation
* @param forwardingObjective the forwarding objective
*/
Future<Boolean> forward(DeviceId deviceId, Collection<ForwardingObjective> forwardingObjectives);
void forward(DeviceId deviceId, ForwardingObjective forwardingObjective);
/**
* Installs the next hop elements into the specified device.
*
* @param deviceId device identifier
* @param nextObjectives the collection of next objectives
* @return a future indicating the success of the operation
* @param nextObjective a next objective
*/
Future<Boolean> next(DeviceId deviceId, Collection<NextObjective> nextObjectives);
void next(DeviceId deviceId, NextObjective nextObjective);
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.flowobjective;
import org.onosproject.net.behaviour.NextGroup;
import org.onosproject.store.Store;
/**
* The flow objective store.
*/
public interface FlowObjectiveStore
extends Store<ObjectiveEvent, FlowObjectiveStoreDelegate> {
/**
* Adds a NextGroup to the store.
*
* @param nextId an integer
* @param group a next group opaque object
*/
void putNextGroup(Integer nextId, NextGroup group);
/**
* Fetch a next group from the store.
* @param nextId an integer
* @return a next group
*/
NextGroup getNextGroup(Integer nextId);
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.flowobjective;
import org.onosproject.store.StoreDelegate;
/**
* Flow Objective store delegate abstraction.
*/
public interface FlowObjectiveStoreDelegate extends StoreDelegate<ObjectiveEvent> {
}
......@@ -121,5 +121,23 @@ public interface ForwardingObjective extends Objective {
* @return a forwarding objective.
*/
public ForwardingObjective remove();
/**
* Builds the forwarding objective that will be added.
* The context will be used to notify the calling application.
*
* @param context an objective context
* @return a forwarding objective
*/
public ForwardingObjective add(ObjectiveContext context);
/**
* Builds the forwarding objective that will be removed.
* The context will be used to notify the calling application.
*
* @param context an objective context
* @return a forwarding objective
*/
public ForwardingObjective remove(ObjectiveContext context);
}
}
......
......@@ -15,6 +15,7 @@
*/
package org.onosproject.net.flowobjective;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.flow.TrafficTreatment;
import java.util.Collection;
......@@ -95,12 +96,40 @@ public interface NextObjective extends Objective {
*/
public Builder addTreatment(TrafficTreatment treatment);
@Override
public Builder fromApp(ApplicationId appId);
/**
* Builds the next objective that will be added.
*
* @return a next objective
*/
public NextObjective add();
/**
* Builds the next objective that will be removed.
*
* @return a next objective.
*/
public NextObjective remove();
/**
* Builds the next objective that will be added.
* The context will be used to notify the calling application.
*
* @param context an objective context
* @return a next objective
*/
public NextObjective add(ObjectiveContext context);
/**
* Builds a next step.
* Builds the next objective that will be removed.
* The context will be used to notify the calling application.
*
* @return a next step
* @param context an objective context
* @return a next objective
*/
public NextObjective build();
public NextObjective remove(ObjectiveContext context);
}
......
......@@ -17,6 +17,8 @@ package org.onosproject.net.flowobjective;
import org.onosproject.core.ApplicationId;
import java.util.Optional;
/**
* Base representation of an flow description.
*/
......@@ -84,6 +86,14 @@ public interface Objective {
Operation op();
/**
* Obtains an optional context.
*
* @return optional; which will be empty if there is no context.
* Otherwise it will return the context.
*/
Optional<ObjectiveContext> context();
/**
* An objective builder.
*/
public interface Builder {
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.flowobjective;
/**
* The context of a objective that will become the subject of
* the notification.
*
* Implementations of this class must be serializable.
*/
public interface ObjectiveContext {
default void onSuccess(Objective objective) {}
default void onError(Objective objective, ObjectiveError error) {}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.flowobjective;
/**
* Represents the set of errors possible when processing an objective.
*/
public enum ObjectiveError {
/**
* The driver processing this objective does not know how to process it.
*/
UNSUPPORTED,
/**
* The flow installation for this objective failed.
*/
FLOWINSTALLATIONFAILED,
/**
* THe group installation for this objective failed.
*/
GROUPINSTALLATIONFAILED,
/**
* The group was reported as installed but is not missing.
*/
GROUPMISSING,
/**
* An unknown error occurred.
*/
UNKNOWN
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.flowobjective;
import org.onosproject.event.AbstractEvent;
/**
* Describes a objective event.
*/
public class ObjectiveEvent extends AbstractEvent<ObjectiveEvent.Type, Integer> {
/**
* Type of objective events.
*/
public enum Type {
/**
* Signifies that the objective has been added to the store.
*/
ADD,
/**
* Signifies that the objective has been removed.
*/
REMOVE
}
/**
* Creates an event of the given type for the specified objective id.
*
* @param type the type of the event
* @param objective the objective id the event is about
*/
public ObjectiveEvent(Type type, Integer objective) {
super(type, objective);
}
/**
* Creates an event of the given type for the specified objective id at the given
* time.
*
* @param type the type of the event
* @param objective the objective id the event is about
* @param time the time of the event
*/
public ObjectiveEvent(Type type, Integer objective, long time) {
super(type, objective, time);
}
}
......@@ -17,7 +17,7 @@ package org.onosproject.net.flowobjective.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -43,9 +43,12 @@ import org.onosproject.net.driver.DriverService;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.flowobjective.FlowObjectiveStore;
import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.net.group.GroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -53,14 +56,14 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.Set;
import static com.google.common.base.Preconditions.checkState;
/**
* Provides implementation of the flow objective programming service.
*/
@Component(immediate = false)
@Component(immediate = true)
@Service
public class FlowObjectiveManager implements FlowObjectiveService {
......@@ -89,6 +92,10 @@ public class FlowObjectiveManager implements FlowObjectiveService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected GroupService groupService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowObjectiveStore flowObjectiveStore;
private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
......@@ -101,10 +108,16 @@ public class FlowObjectiveManager implements FlowObjectiveService {
private final Map<DeviceId, Collection<Objective>> pendingObjectives =
Maps.newConcurrentMap();
private NodeId localNode;
private Map<Integer, Set<PendingNext>> pendingForwards =
Maps.newConcurrentMap();
@Activate
protected void activate() {
flowObjectiveStore.setDelegate(delegate);
localNode = clusterService.getLocalNode().id();
mastershipService.addListener(mastershipListener);
deviceService.addListener(deviceListener);
......@@ -114,46 +127,64 @@ public class FlowObjectiveManager implements FlowObjectiveService {
@Deactivate
protected void deactivate() {
flowObjectiveStore.unsetDelegate(delegate);
mastershipService.removeListener(mastershipListener);
deviceService.removeListener(deviceListener);
log.info("Stopped");
}
@Override
public Future<Boolean> filter(DeviceId deviceId,
Collection<FilteringObjective> filteringObjectives) {
public void filter(DeviceId deviceId,
FilteringObjective filteringObjective) {
if (deviceService.isAvailable(deviceId)) {
return getDevicePipeliner(deviceId).filter(filteringObjectives);
getDevicePipeliner(deviceId).filter(filteringObjective);
} else {
filteringObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
updatePendingMap(deviceId, filteringObjective);
}
return Futures.immediateFuture(true);
}
@Override
public void forward(DeviceId deviceId,
ForwardingObjective forwardingObjective) {
if (queueObjective(deviceId, forwardingObjective)) {
return;
}
@Override
public Future<Boolean> forward(DeviceId deviceId,
Collection<ForwardingObjective> forwardingObjectives) {
if (deviceService.isAvailable(deviceId)) {
return getDevicePipeliner(deviceId).forward(forwardingObjectives);
getDevicePipeliner(deviceId).forward(forwardingObjective);
} else {
forwardingObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
updatePendingMap(deviceId, forwardingObjective);
}
return Futures.immediateFuture(true);
}
@Override
public Future<Boolean> next(DeviceId deviceId,
Collection<NextObjective> nextObjectives) {
public void next(DeviceId deviceId,
NextObjective nextObjective) {
if (deviceService.isAvailable(deviceId)) {
return getDevicePipeliner(deviceId).next(nextObjectives);
getDevicePipeliner(deviceId).next(nextObjective);
} else {
nextObjectives.forEach(obj -> updatePendingMap(deviceId, obj));
updatePendingMap(deviceId, nextObjective);
}
return Futures.immediateFuture(true);
}
private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
if (fwd.nextId() != null &&
flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
log.warn("Queuing forwarding objective.");
if (pendingForwards.putIfAbsent(fwd.nextId(),
Sets.newHashSet(new PendingNext(deviceId, fwd))) != null) {
Set<PendingNext> pending = pendingForwards.get(fwd.nextId());
pending.add(new PendingNext(deviceId, fwd));
}
return true;
}
return false;
}
private void updatePendingMap(DeviceId deviceId, Objective pending) {
if (pendingObjectives.putIfAbsent(deviceId, Lists.newArrayList(pending)) != null) {
Collection<Objective> objectives = pendingObjectives.get(deviceId);
......@@ -169,6 +200,33 @@ public class FlowObjectiveManager implements FlowObjectiveService {
return pipeliner;
}
private void setupPipelineHandler(DeviceId deviceId) {
if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
// Attempt to lookup the handler in the cache
DriverHandler handler = driverHandlers.get(deviceId);
if (handler == null) {
try {
// Otherwise create it and if it has pipeline behaviour, cache it
handler = driverService.createHandler(deviceId);
if (!handler.driver().hasBehaviour(Pipeliner.class)) {
log.warn("Pipeline behaviour not supported for device {}",
deviceId);
return;
}
} catch (ItemNotFoundException e) {
log.warn("No applicable driver for device {}", deviceId);
return;
}
driverHandlers.put(deviceId, handler);
}
// Always (re)initialize the pipeline behaviour
Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
pipeliner.init(deviceId, context);
pipeliners.putIfAbsent(deviceId, pipeliner);
log.info("Driver {} bound to device {}", handler.driver().name(), deviceId);
}
}
// Triggers driver setup when the local node becomes a device master.
private class InnerMastershipListener implements MastershipListener {
......@@ -221,52 +279,70 @@ public class FlowObjectiveManager implements FlowObjectiveService {
pendingObjectives.getOrDefault(deviceId,
Collections.emptySet()).forEach(obj -> {
if (obj instanceof NextObjective) {
getDevicePipeliner(deviceId)
.next(Collections.singletonList((NextObjective) obj));
next(deviceId, (NextObjective) obj);
} else if (obj instanceof ForwardingObjective) {
getDevicePipeliner(deviceId)
.forward(Collections.singletonList((ForwardingObjective) obj));
forward(deviceId, (ForwardingObjective) obj);
} else {
getDevicePipeliner(deviceId)
.filter(Collections.singletonList((FilteringObjective) obj));
.filter((FilteringObjective) obj);
}
});
}
}
private void setupPipelineHandler(DeviceId deviceId) {
if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
// Attempt to lookup the handler in the cache
DriverHandler handler = driverHandlers.get(deviceId);
if (handler == null) {
try {
// Otherwise create it and if it has pipeline behaviour, cache it
handler = driverService.createHandler(deviceId);
if (!handler.driver().hasBehaviour(Pipeliner.class)) {
log.warn("Pipeline behaviour not supported for device {}",
deviceId);
return;
// Processing context for initializing pipeline driver behaviours.
private class InnerPipelineContext implements PipelinerContext {
@Override
public ServiceDirectory directory() {
return serviceDirectory;
}
} catch (ItemNotFoundException e) {
log.warn("No applicable driver for device {}", deviceId);
@Override
public FlowObjectiveStore store() {
return flowObjectiveStore;
}
}
private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
@Override
public void notify(ObjectiveEvent event) {
Set<PendingNext> pending = pendingForwards.remove(event.subject());
if (pending == null) {
return;
}
driverHandlers.put(deviceId, handler);
log.info("Processing pending objectives {}", pending.size());
pending.forEach(p -> getDevicePipeliner(p.deviceId())
.forward(p.forwardingObjective()));
}
}
// Always (re)initialize the pipeline behaviour
Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
pipeliner.init(deviceId, context);
pipeliners.putIfAbsent(deviceId, pipeliner);
log.info("Driver {} bound to device {}", handler.driver().name(), deviceId);
/**
* Data class used to hold a pending forwarding objective that could not
* be processed because the associated next object was not present.
*/
private class PendingNext {
private final DeviceId deviceId;
private final ForwardingObjective fwd;
public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
this.deviceId = deviceId;
this.fwd = fwd;
}
public DeviceId deviceId() {
return deviceId;
}
// Processing context for initializing pipeline driver behaviours.
private class InnerPipelineContext implements PipelinerContext {
@Override
public ServiceDirectory directory() {
return serviceDirectory;
public ForwardingObjective forwardingObjective() {
return fwd;
}
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.flowobjective.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.net.behaviour.DefaultNextGroup;
import org.onosproject.net.behaviour.NextGroup;
import org.onosproject.net.flowobjective.FlowObjectiveStore;
import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages the inventory of created next groups.
*/
@Component(immediate = true, enabled = true)
@Service
public class DistributedFlowObjectiveStore
extends AbstractStore<ObjectiveEvent, FlowObjectiveStoreDelegate>
implements FlowObjectiveStore {
private final Logger log = getLogger(getClass());
private ConsistentMap<Integer, byte[]> nextGroups;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Activate
public void activate() {
nextGroups = storageService.<Integer, byte[]>consistentMapBuilder()
.withName("flowobjective-groups")
.withSerializer(Serializer.using(
new KryoNamespace.Builder()
.register(byte[].class)
.build()))
.build();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public void putNextGroup(Integer nextId, NextGroup group) {
nextGroups.putIfAbsent(nextId, group.data());
notifyDelegate(new ObjectiveEvent(ObjectiveEvent.Type.ADD, nextId));
}
@Override
public NextGroup getNextGroup(Integer nextId) {
Versioned<byte[]> versionGroup = nextGroups.get(nextId);
if (versionGroup != null) {
return new DefaultNextGroup(versionGroup.value());
}
return null;
}
}
......@@ -167,10 +167,13 @@ public class DistributedPacketStore
public PacketRequestTracker() {
requests = storageService.<PacketRequest, Boolean>consistentMapBuilder()
.withName("packet-requests")
.withSerializer(Serializer.using(
new KryoNamespace.Builder().register(KryoNamespaces.API).build()))
.withSerializer(new Serializer() {
KryoNamespace kryo = new KryoNamespace.Builder()
.register(KryoNamespaces.API)
.build();
@Override
public <T> byte[] encode(T object) {
return kryo.serialize(object);
......
......@@ -44,6 +44,12 @@
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
......
......@@ -31,11 +31,9 @@ import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.concurrent.Future;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -58,14 +56,14 @@ public class DefaultSingleTablePipeline extends AbstractHandlerBehaviour impleme
}
@Override
public Future<Boolean> filter(Collection<FilteringObjective> filters) {
public void filter(FilteringObjective filter) {
throw new UnsupportedOperationException("Single table does not filter.");
}
@Override
public Future<Boolean> forward(Collection<ForwardingObjective> forwardings) {
public void forward(ForwardingObjective fwd) {
FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
forwardings.forEach(fwd -> {
if (fwd.flag() != ForwardingObjective.Flag.VERSATILE) {
throw new UnsupportedOperationException(
"Only VERSATILE is supported.");
......@@ -91,26 +89,29 @@ public class DefaultSingleTablePipeline extends AbstractHandlerBehaviour impleme
log.warn("Unknown operation {}", fwd.op());
}
});
SettableFuture<Boolean> future = SettableFuture.create();
flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
@Override
public void onSuccess(FlowRuleOperations ops) {
future.set(true);
if (fwd.context().isPresent()) {
fwd.context().get().onSuccess(fwd);
}
}
@Override
public void onError(FlowRuleOperations ops) {
future.set(false);
if (fwd.context().isPresent()) {
fwd.context().get().onError(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
}
}
}));
return future;
}
@Override
public Future<Boolean> next(Collection<NextObjective> nextObjectives) {
public void next(NextObjective nextObjective) {
throw new UnsupportedOperationException("Single table does not next hop.");
}
......
......@@ -15,15 +15,19 @@
*/
package org.onosproject.driver.pipeline;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
import org.onlab.osgi.ServiceDirectory;
import org.onlab.packet.Ethernet;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.NextGroup;
import org.onosproject.net.behaviour.Pipeliner;
import org.onosproject.net.behaviour.PipelinerContext;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
......@@ -39,18 +43,37 @@ import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flow.criteria.Criterion;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveStore;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupEvent;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupListener;
import org.onosproject.net.group.GroupService;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.concurrent.Future;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Corsa pipeline handler.
* OpenvSwitch emulation of the Corsa pipeline handler.
*/
public class OVSCorsaPipeline extends AbstractHandlerBehaviour implements Pipeliner {
......@@ -63,17 +86,45 @@ public class OVSCorsaPipeline extends AbstractHandlerBehaviour implements Pipeli
private ServiceDirectory serviceDirectory;
private FlowRuleService flowRuleService;
private CoreService coreService;
private GroupService groupService;
private FlowObjectiveStore flowObjectiveStore;
private DeviceId deviceId;
private ApplicationId appId;
private KryoNamespace appKryo = new KryoNamespace.Builder()
.register(GroupKey.class)
.register(DefaultGroupKey.class)
.register(CorsaGroup.class)
.register(byte[].class)
.build();
private Cache<GroupKey, NextObjective> pendingGroups;
private ScheduledExecutorService groupChecker =
Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
"ovs-corsa-%d"));
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
this.serviceDirectory = context.directory();
this.deviceId = deviceId;
pendingGroups = CacheBuilder.newBuilder()
.expireAfterWrite(20, TimeUnit.SECONDS)
.removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
if (notification.getCause() == RemovalCause.EXPIRED) {
fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
}
}).build();
groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS);
coreService = serviceDirectory.get(CoreService.class);
flowRuleService = serviceDirectory.get(FlowRuleService.class);
groupService = serviceDirectory.get(GroupService.class);
flowObjectiveStore = context.store();
groupService.addListener(new InnerGroupListener());
appId = coreService.registerApplication(
"org.onosproject.driver.OVSCorsaPipeline");
......@@ -82,33 +133,159 @@ public class OVSCorsaPipeline extends AbstractHandlerBehaviour implements Pipeli
}
@Override
public Future<Boolean> filter(Collection<FilteringObjective> filteringObjectives) {
Collection<Future<Boolean>> results = Sets.newHashSet();
filteringObjectives.stream()
.filter(obj -> obj.type() == FilteringObjective.Type.PERMIT)
.forEach(filtobj -> results.add(processFilter(filtobj,
filtobj.op() == Objective.Operation.ADD,
filtobj.appId()
)));
public void filter(FilteringObjective filteringObjective) {
if (filteringObjective.type() == FilteringObjective.Type.PERMIT) {
processFilter(filteringObjective,
filteringObjective.op() == Objective.Operation.ADD,
filteringObjective.appId());
} else {
fail(filteringObjective, ObjectiveError.UNSUPPORTED);
}
}
//TODO: return something more helpful/sensible in the future (no pun intended)
return results.iterator().next();
@Override
public void forward(ForwardingObjective fwd) {
Collection<FlowRule> rules;
FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
rules = processForward(fwd);
switch (fwd.op()) {
case ADD:
rules.stream()
.filter(rule -> rule != null)
.forEach(flowBuilder::add);
break;
case REMOVE:
rules.stream()
.filter(rule -> rule != null)
.forEach(flowBuilder::remove);
break;
default:
fail(fwd, ObjectiveError.UNKNOWN);
log.warn("Unknown forwarding type {}", fwd.op());
}
flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
@Override
public void onSuccess(FlowRuleOperations ops) {
pass(fwd);
}
@Override
public void onError(FlowRuleOperations ops) {
fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
}
}));
}
private Future<Boolean> processFilter(FilteringObjective filt, boolean install,
@Override
public void next(NextObjective nextObjective) {
switch (nextObjective.type()) {
case SIMPLE:
Collection<TrafficTreatment> treatments = nextObjective.next();
if (treatments.size() == 1) {
TrafficTreatment treatment = treatments.iterator().next();
GroupBucket bucket =
DefaultGroupBucket.createIndirectGroupBucket(treatment);
final GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
GroupDescription groupDescription
= new DefaultGroupDescription(deviceId,
GroupDescription.Type.INDIRECT,
new GroupBuckets(Collections
.singletonList(bucket)),
key,
nextObjective.appId());
groupService.addGroup(groupDescription);
pendingGroups.put(key, nextObjective);
}
break;
case HASHED:
case BROADCAST:
case FAILOVER:
fail(nextObjective, ObjectiveError.UNSUPPORTED);
log.warn("Unsupported next objective type {}", nextObjective.type());
break;
default:
fail(nextObjective, ObjectiveError.UNKNOWN);
log.warn("Unknown next objective type {}", nextObjective.type());
}
}
private Collection<FlowRule> processForward(ForwardingObjective fwd) {
switch (fwd.flag()) {
case SPECIFIC:
return processSpecific(fwd);
case VERSATILE:
return processVersatile(fwd);
default:
fail(fwd, ObjectiveError.UNKNOWN);
log.warn("Unknown forwarding flag {}", fwd.flag());
}
return Collections.emptySet();
}
private Collection<FlowRule> processVersatile(ForwardingObjective fwd) {
fail(fwd, ObjectiveError.UNSUPPORTED);
return Collections.emptySet();
}
private Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
log.warn("Processing specific");
TrafficSelector selector = fwd.selector();
Criteria.EthTypeCriterion ethType =
(Criteria.EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
if (ethType == null || ethType.ethType() != Ethernet.TYPE_IPV4) {
fail(fwd, ObjectiveError.UNSUPPORTED);
return Collections.emptySet();
}
TrafficSelector filteredSelector =
DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPDst(
((Criteria.IPCriterion)
selector.getCriterion(Criterion.Type.IPV4_DST)).ip())
.build();
NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
GroupKey key = appKryo.deserialize(next.data());
Group group = groupService.getGroup(deviceId, key);
if (group == null) {
log.warn("The group left!");
fail(fwd, ObjectiveError.GROUPMISSING);
return Collections.emptySet();
}
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.group(group.id())
.build();
return Collections.singletonList(
new DefaultFlowRule(deviceId, filteredSelector, treatment,
fwd.priority(), fwd.appId(), 0, fwd.permanent(),
FlowRule.Type.IP));
}
private void processFilter(FilteringObjective filt, boolean install,
ApplicationId applicationId) {
SettableFuture<Boolean> result = SettableFuture.create();
// This driver only processes filtering criteria defined with switch
// ports as the key
Criteria.PortCriterion p = null;
Criteria.PortCriterion p;
if (!filt.key().equals(Criteria.dummy()) &&
filt.key().type() == Criterion.Type.IN_PORT) {
p = (Criteria.PortCriterion) filt.key();
} else {
log.warn("No key defined in filtering objective from app: {}. Not"
+ "processing filtering objective", applicationId);
return null;
fail(filt, ObjectiveError.UNKNOWN);
return;
}
// convert filtering conditions for switch-intfs into flowrules
FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
......@@ -154,45 +331,45 @@ public class OVSCorsaPipeline extends AbstractHandlerBehaviour implements Pipeli
} else {
log.warn("Driver does not currently process filtering condition"
+ " of type: {}", c.type());
fail(filt, ObjectiveError.UNSUPPORTED);
}
}
// apply filtering flow rules
flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
@Override
public void onSuccess(FlowRuleOperations ops) {
result.set(true);
pass(filt);
log.info("Provisioned default table for bgp router");
}
@Override
public void onError(FlowRuleOperations ops) {
result.set(false);
fail(filt, ObjectiveError.FLOWINSTALLATIONFAILED);
log.info("Failed to provision default table for bgp router");
}
}));
return result;
}
@Override
public Future<Boolean> forward(Collection<ForwardingObjective> forwardObjectives) {
return null;
private void pass(Objective obj) {
if (obj.context().isPresent()) {
obj.context().get().onSuccess(obj);
}
}
@Override
public Future<Boolean> next(Collection<NextObjective> nextObjectives) {
return null;
private void fail(Objective obj, ObjectiveError error) {
if (obj.context().isPresent()) {
obj.context().get().onError(obj, error);
}
}
private void pushDefaultRules() {
boolean install = true;
processTableZero(install);
processTableOne(install);
processTableTwo(install);
processTableFour(install);
processTableFive(install);
processTableSix(install);
processTableNine(install);
processTableZero(true);
processTableOne(true);
processTableTwo(true);
processTableFour(true);
processTableFive(true);
processTableSix(true);
processTableNine(true);
}
private void processTableZero(boolean install) {
......@@ -447,4 +624,59 @@ public class OVSCorsaPipeline extends AbstractHandlerBehaviour implements Pipeli
}));
}
private class InnerGroupListener implements GroupListener {
@Override
public void event(GroupEvent event) {
if (event.type() == GroupEvent.Type.GROUP_ADDED) {
GroupKey key = event.subject().appCookie();
NextObjective obj = pendingGroups.getIfPresent(key);
if (obj != null) {
flowObjectiveStore.putNextGroup(obj.id(), new CorsaGroup(key));
pass(obj);
pendingGroups.invalidate(key);
}
}
}
}
private class GroupChecker implements Runnable {
@Override
public void run() {
Set<GroupKey> keys = pendingGroups.asMap().keySet().stream()
.filter(key -> groupService.getGroup(deviceId, key) != null)
.collect(Collectors.toSet());
keys.stream().forEach(key -> {
NextObjective obj = pendingGroups.getIfPresent(key);
if (obj == null) {
return;
}
pass(obj);
pendingGroups.invalidate(key);
flowObjectiveStore.putNextGroup(obj.id(), new CorsaGroup(key));
});
}
}
private class CorsaGroup implements NextGroup {
private final GroupKey key;
public CorsaGroup(GroupKey key) {
this.key = key;
}
public GroupKey key() {
return key;
}
@Override
public byte[] data() {
return appKryo.serialize(key);
}
}
}
......
......@@ -19,7 +19,7 @@
<behaviour api="org.onosproject.net.behaviour.Pipeliner"
impl="org.onosproject.driver.pipeline.DefaultSingleTablePipeline"/>
</driver>
<driver name="ovs-corsa" manufacturer="Nicira, Inc." hwVersion="Open vSwitch" swVersion="2.3.0">
<driver name="ovs-corsa" manufacturer="Corsa" hwVersion="emulation" swVersion="0.0.0">
<behaviour api="org.onosproject.net.behaviour.Pipeliner"
impl="org.onosproject.driver.pipeline.OVSCorsaPipeline"/>
</driver>
......
......@@ -195,7 +195,6 @@ public class FlowModBuilderVer13 extends FlowModBuilder {
for (Instruction i : treatments) {
switch (i.type()) {
case DROP:
log.warn("Saw drop action; assigning drop action");
return new LinkedList<>();
case L0MODIFICATION:
actions.add(buildL0Modification(i));
......