alshabib
Committed by Gerrit Code Review

added support for multicast in olt pipeline

Change-Id: I25c6ab18d23035094851b0800f719f28e210d403
......@@ -15,6 +15,10 @@
*/
package org.onosproject.driver.pipeline;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
......@@ -22,20 +26,14 @@ import org.onlab.osgi.ServiceDirectory;
import org.onlab.packet.EthType;
import org.onlab.packet.IPv4;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.behaviour.NextGroup;
import org.onosproject.net.behaviour.Pipeliner;
import org.onosproject.net.behaviour.PipelinerContext;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
......@@ -56,20 +54,32 @@ import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveStore;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupEvent;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupListener;
import org.onosproject.net.group.GroupService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -79,35 +89,55 @@ import static org.slf4j.LoggerFactory.getLogger;
public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
private static final Integer QQ_TABLE = 1;
private static final short MCAST_VLAN = 4000;
private final Logger log = getLogger(getClass());
static final ProviderId PID = new ProviderId("olt", "org.onosproject.olt", true);
static final String DEVICE = "isAccess";
static final String OLT = "true";
private ServiceDirectory serviceDirectory;
private FlowRuleService flowRuleService;
private DeviceId deviceId;
private GroupService groupService;
private CoreService coreService;
private DeviceId deviceId;
private ApplicationId appId;
private DeviceProvider provider = new AnnotationProvider();
protected FlowObjectiveStore flowObjectiveStore;
private Cache<GroupKey, NextObjective> pendingGroups;
protected static KryoNamespace appKryo = new KryoNamespace.Builder()
.register(KryoNamespaces.API)
.register(GroupKey.class)
.register(DefaultGroupKey.class)
.register(OLTPipelineGroup.class)
.register(byte[].class)
.build();
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
log.debug("Initiate OLT pipeline");
this.serviceDirectory = context.directory();
this.deviceId = deviceId;
DeviceProviderRegistry registry =
serviceDirectory.get(DeviceProviderRegistry.class);
flowRuleService = serviceDirectory.get(FlowRuleService.class);
coreService = serviceDirectory.get(CoreService.class);
groupService = serviceDirectory.get(GroupService.class);
flowObjectiveStore = context.store();
appId = coreService.registerApplication(
"org.onosproject.driver.OLTPipeline");
pendingGroups = CacheBuilder.newBuilder()
.expireAfterWrite(20, TimeUnit.SECONDS)
.removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
if (notification.getCause() == RemovalCause.EXPIRED) {
fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
}
}).build();
groupService.addListener(new InnerGroupListener());
}
@Override
......@@ -164,6 +194,12 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
@Override
public void forward(ForwardingObjective fwd) {
if (checkForMulticast(fwd)) {
processMulticastRule(fwd);
return;
}
TrafficTreatment treatment = fwd.treatment();
List<Instruction> instructions = treatment.allInstructions();
......@@ -198,9 +234,113 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
}
@Override
public void next(NextObjective nextObjective) {
throw new UnsupportedOperationException("OLT does not next hop.");
if (nextObjective.type() != NextObjective.Type.BROADCAST) {
log.error("OLT only supports broadcast groups.");
fail(nextObjective, ObjectiveError.BADPARAMS);
}
if (nextObjective.next().size() != 1) {
log.error("OLT only supports singleton broadcast groups.");
fail(nextObjective, ObjectiveError.BADPARAMS);
}
TrafficTreatment treatment = nextObjective.next().stream().findFirst().get();
GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
GroupDescription groupDesc =
new DefaultGroupDescription(deviceId,
GroupDescription.Type.ALL,
new GroupBuckets(Collections.singletonList(bucket)),
key,
null,
nextObjective.appId());
pendingGroups.put(key, nextObjective);
switch (nextObjective.op()) {
case ADD:
groupService.addGroup(groupDesc);
break;
case REMOVE:
groupService.removeGroup(deviceId, key, nextObjective.appId());
break;
case ADD_TO_EXISTING:
case REMOVE_FROM_EXISTING:
//TODO: handle addition to group when caller signals it.
break;
default:
log.warn("Unknown next objective operation: {}", nextObjective.op());
}
}
private void processMulticastRule(ForwardingObjective fwd) {
if (fwd.nextId() == null) {
log.error("Multicast objective does not have a next id");
fail(fwd, ObjectiveError.BADPARAMS);
}
OLTPipelineGroup next = getGroupForNextObjective(fwd.nextId());
if (next == null) {
log.error("Group for forwarding objective missing: {}", fwd);
fail(fwd, ObjectiveError.GROUPMISSING);
}
Group group = groupService.getGroup(deviceId, next.key());
TrafficTreatment treatment =
buildTreatment(Instructions.createGroup(group.id()));
FlowRule rule = DefaultFlowRule.builder()
.forDevice(deviceId)
.forTable(0)
.fromApp(fwd.appId())
.makePermanent()
.withPriority(fwd.priority())
.withSelector(fwd.selector())
.withTreatment(treatment)
.build();
FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
switch (fwd.op()) {
case ADD:
builder.add(rule);
break;
case REMOVE:
builder.remove(rule);
break;
case ADD_TO_EXISTING:
case REMOVE_FROM_EXISTING:
break;
default:
log.warn("Unknown forwarding operation: {}", fwd.op());
}
applyFlowRules(builder, fwd);
}
private boolean checkForMulticast(ForwardingObjective fwd) {
VlanIdCriterion vlan = (VlanIdCriterion) filterForCriterion(fwd.selector().criteria(),
Criterion.Type.VLAN_VID);
return (vlan != null && vlan.vlanId().equals(VlanId.vlanId(MCAST_VLAN)));
}
private OLTPipelineGroup getGroupForNextObjective(Integer nextId) {
NextGroup next = flowObjectiveStore.getNextGroup(nextId);
return (OLTPipelineGroup) appKryo.deserialize(next.data());
}
private void installDownstreamRules(ForwardingObjective fwd) {
......@@ -494,53 +634,39 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
}
}
/**
* Build a device description.
*
* @param deviceId a deviceId
* @param key the key of the annotation
* @param value the value for the annotation
* @return a device description
*/
private DeviceDescription description(DeviceId deviceId, String key, String value) {
DeviceService deviceService = serviceDirectory.get(DeviceService.class);
Device device = deviceService.getDevice(deviceId);
checkNotNull(device, "Device not found in device service.");
private class InnerGroupListener implements GroupListener {
@Override
public void event(GroupEvent event) {
if (event.type() == GroupEvent.Type.GROUP_ADDED) {
GroupKey key = event.subject().appCookie();
DefaultAnnotations.Builder builder = DefaultAnnotations.builder();
if (value != null) {
builder.set(key, value);
} else {
builder.remove(key);
NextObjective obj = pendingGroups.getIfPresent(key);
if (obj != null) {
flowObjectiveStore.putNextGroup(obj.id(), new OLTPipelineGroup(key));
pass(obj);
pendingGroups.invalidate(key);
}
}
return new DefaultDeviceDescription(device.id().uri(), device.type(),
device.manufacturer(), device.hwVersion(),
device.swVersion(), device.serialNumber(),
device.chassisId(), builder.build());
}
/**
* Simple ancillary provider used to annotate device.
*/
private static final class AnnotationProvider
extends AbstractProvider implements DeviceProvider {
private AnnotationProvider() {
super(PID);
}
@Override
public void triggerProbe(DeviceId deviceId) {
private static class OLTPipelineGroup implements NextGroup {
private final GroupKey key;
public OLTPipelineGroup(GroupKey key) {
this.key = key;
}
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
public GroupKey key() {
return key;
}
@Override
public boolean isReachable(DeviceId deviceId) {
return false;
}
public byte[] data() {
return appKryo.serialize(key);
}
}
}
......