Saurav Das
Committed by Gerrit Code Review

CORD-354 OF-DPA support for link-failures.

Bug fix in flowObjectives store. Adding a removeNextGroup API to the store.

Change-Id: I5890411e5b4eabdc057402687ada26e539500f8f
......@@ -597,11 +597,20 @@ public class SegmentRoutingManager implements SegmentRoutingService {
} else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
if (deviceService.isAvailable(((Device) event.subject()).id())) {
DeviceId deviceId = ((Device) event.subject()).id();
if (deviceService.isAvailable(deviceId)) {
log.info("Processing device event {} for available device {}",
event.type(), ((Device) event.subject()).id());
processDeviceAdded((Device) event.subject());
} /* else {
if (event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED) {
// availability changed and not available - dev gone
DefaultGroupHandler groupHandler = groupHandlerMap.get(deviceId);
if (groupHandler != null) {
groupHandler.removeAllGroups();
}
}
}*/
} else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
processPortRemoved((Device) event.subject(),
((DeviceEvent) event).port());
......@@ -655,7 +664,8 @@ public class SegmentRoutingManager implements SegmentRoutingService {
log.debug("A link {} was removed", link.toString());
DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src().deviceId());
if (groupHandler != null) {
groupHandler.portDown(link.src().port());
groupHandler.portDown(link.src().port(),
mastershipService.isLocalMaster(link.src().deviceId()));
}
log.trace("Starting optimized route population process");
defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
......@@ -711,7 +721,8 @@ public class SegmentRoutingManager implements SegmentRoutingService {
log.debug("Port {} was removed", port.toString());
DefaultGroupHandler groupHandler = groupHandlerMap.get(device.id());
if (groupHandler != null) {
groupHandler.portDown(port.number());
groupHandler.portDown(port.number(),
mastershipService.isLocalMaster(device.id()));
}
}
......
......@@ -32,11 +32,13 @@ import org.onlab.packet.Ip4Prefix;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.MplsLabel;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
......@@ -49,6 +51,7 @@ import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.link.LinkService;
import org.onosproject.segmentrouting.SegmentRoutingManager;
import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
import org.onosproject.segmentrouting.config.DeviceProperties;
import org.onosproject.store.service.EventuallyConsistentMap;
......@@ -71,9 +74,11 @@ public class DefaultGroupHandler {
protected MacAddress nodeMacAddr = null;
protected LinkService linkService;
protected FlowObjectiveService flowObjectiveService;
// local store for neighbor-device-ids and the set of ports on this device
// that connect to the same neighbor
protected ConcurrentHashMap<DeviceId, Set<PortNumber>> devicePortMap =
new ConcurrentHashMap<>();
//local store for ports on this device connected to neighbor-device-id
protected ConcurrentHashMap<PortNumber, DeviceId> portDeviceMap =
new ConcurrentHashMap<>();
protected EventuallyConsistentMap<
......@@ -225,6 +230,9 @@ public class DefaultGroupHandler {
deviceId,
nsSet);
for (NeighborSet ns : nsSet) {
Integer nextId = nsNextObjStore.
get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
if (nextId != null && isMaster) {
// Create the new bucket to be updated
TrafficTreatment.Builder tBuilder =
DefaultTrafficTreatment.builder();
......@@ -236,15 +244,19 @@ public class DefaultGroupHandler {
.copyTtlOut()
.setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
}
Integer nextId = nsNextObjStore.
get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
if (nextId != null && isMaster) {
NextObjective.Builder nextObjBuilder = DefaultNextObjective
.builder().withId(nextId)
.withType(NextObjective.Type.HASHED).fromApp(appId);
nextObjBuilder.addTreatment(tBuilder.build());
// setup metadata to pass to nextObjective - indicate the vlan on egress
// if needed by the switch pipeline. Since hashed next-hops are always to
// other neighboring routers, there is no subnet assigned on those ports.
TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
metabuilder.matchVlanId(
VlanId.vlanId(SegmentRoutingManager.ASSIGNED_VLAN_NO_SUBNET));
NextObjective.Builder nextObjBuilder = DefaultNextObjective.builder()
.withId(nextId)
.withType(NextObjective.Type.HASHED)
.addTreatment(tBuilder.build())
.withMeta(metabuilder.build())
.fromApp(appId);
log.info("**linkUp in device {}: Adding Bucket "
+ "with Port {} to next object id {}",
deviceId,
......@@ -253,6 +265,18 @@ public class DefaultGroupHandler {
NextObjective nextObjective = nextObjBuilder.
addToExisting(new SRNextObjectiveContext(deviceId));
flowObjectiveService.next(deviceId, nextObjective);
// the addition of a bucket may actually change the neighborset
// update the global store
/*
Set<DeviceId> neighbors = new HashSet<DeviceId>(ns.getDeviceIds());
boolean newadd = neighbors.add(newLink.dst().deviceId());
if (newadd) {
NeighborSet nsnew = new NeighborSet(neighbors, ns.getEdgeLabel());
nsNextObjStore.put(new NeighborSetNextObjectiveStoreKey(deviceId, nsnew),
nextId);
nsNextObjStore.remove(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
}*/
} else if (isMaster) {
log.warn("linkUp in device {}, but global store has no record "
+ "for neighbor-set {}", deviceId, ns);
......@@ -265,7 +289,7 @@ public class DefaultGroupHandler {
*
* @param port port number that has gone down
*/
public void portDown(PortNumber port) {
public void portDown(PortNumber port, boolean isMaster) {
if (portDeviceMap.get(port) == null) {
log.warn("portDown: unknown port");
return;
......@@ -292,10 +316,17 @@ public class DefaultGroupHandler {
.filter((ns) -> (ns.getDeviceIds()
.contains(portDeviceMap.get(port))))
.collect(Collectors.toSet());
log.trace("portDown: nsNextObjStore contents for device {}:",
deviceId,
nsSet);
log.debug("portDown: nsNextObjStore contents for device {}:{}",
deviceId, nsSet);
for (NeighborSet ns : nsSet) {
Integer nextId = nsNextObjStore.
get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
if (nextId != null && isMaster) {
log.info("**portDown in device {}: Removing Bucket "
+ "with Port {} to next object id {}",
deviceId,
port,
nextId);
// Create the bucket to be removed
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment
.builder();
......@@ -307,25 +338,28 @@ public class DefaultGroupHandler {
.copyTtlOut()
.setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
}
Integer nextId = nsNextObjStore.
get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
if (nextId != null) {
NextObjective.Builder nextObjBuilder = DefaultNextObjective
.builder().withType(NextObjective.Type.SIMPLE).withId(nextId).fromApp(appId);
.builder()
.withType(NextObjective.Type.HASHED) //same as original
.withId(nextId)
.fromApp(appId)
.addTreatment(tBuilder.build());
NextObjective nextObjective = nextObjBuilder.
removeFromExisting(new SRNextObjectiveContext(deviceId));
nextObjBuilder.addTreatment(tBuilder.build());
flowObjectiveService.next(deviceId, nextObjective);
log.info("**portDown in device {}: Removing Bucket "
+ "with Port {} to next object id {}",
deviceId,
port,
// the removal of a bucket may actually change the neighborset
// update the global store
/*
Set<DeviceId> neighbors = new HashSet<DeviceId>(ns.getDeviceIds());
boolean removed = neighbors.remove(portDeviceMap.get(port));
if (removed) {
NeighborSet nsnew = new NeighborSet(neighbors, ns.getEdgeLabel());
nsNextObjStore.put(new NeighborSetNextObjectiveStoreKey(deviceId, nsnew),
nextId);
// should do removefromexisting and only if master
/*NextObjective nextObjective = nextObjBuilder.
remove(new SRNextObjectiveContext(deviceId));
flowObjectiveService.next(deviceId, nextObjective);*/
nsNextObjStore.remove(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
}*/
}
}
......@@ -718,6 +752,22 @@ public class DefaultGroupHandler {
return false;
}
public void removeAllGroups() {
for (Map.Entry<NeighborSetNextObjectiveStoreKey, Integer> entry:
nsNextObjStore.entrySet()) {
removeGroup(entry.getValue());
}
for (Map.Entry<PortNextObjectiveStoreKey, Integer> entry:
portNextObjStore.entrySet()) {
removeGroup(entry.getValue());
}
for (Map.Entry<SubnetNextObjectiveStoreKey, Integer> entry:
subnetNextObjStore.entrySet()) {
removeGroup(entry.getValue());
}
// should probably clean local stores port-neighbor
}
protected static class SRNextObjectiveContext implements ObjectiveContext {
final DeviceId deviceId;
......
......@@ -27,7 +27,8 @@ public interface FlowObjectiveStore
extends Store<ObjectiveEvent, FlowObjectiveStoreDelegate> {
/**
* Adds a NextGroup to the store.
* Adds a NextGroup to the store, by mapping it to the nextId as key,
* and replacing any previous mapping.
*
* @param nextId an integer
* @param group a next group opaque object
......@@ -36,12 +37,22 @@ public interface FlowObjectiveStore
/**
* Fetch a next group from the store.
* @param nextId an integer
* @return a next group
*
* @param nextId an integer used as key
* @return a next group, or null if group was not found
*/
NextGroup getNextGroup(Integer nextId);
/**
* Remove a next group mapping from the store.
*
* @param nextId the key to remove from the store.
* @return the next group which mapped to the nextId and is now removed, or
* null if no group mapping existed in the store
*/
NextGroup removeNextGroup(Integer nextId);
/**
* Allocates a next objective id. This id is globally unique
*
* @return an integer
......
......@@ -48,6 +48,7 @@ import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.net.flowobjective.ObjectiveEvent.Type;
import org.onosproject.net.group.GroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -381,6 +382,7 @@ public class FlowObjectiveManager implements FlowObjectiveService {
private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
@Override
public void notify(ObjectiveEvent event) {
if (event.type() == Type.ADD) {
log.debug("Received notification of obj event {}", event);
Set<PendingNext> pending = pendingForwards.remove(event.subject());
......@@ -390,10 +392,9 @@ public class FlowObjectiveManager implements FlowObjectiveService {
}
log.debug("Processing pending forwarding objectives {}", pending.size());
pending.forEach(p -> getDevicePipeliner(p.deviceId())
.forward(p.forwardingObjective()));
}
}
}
......
......@@ -79,10 +79,9 @@ public class DistributedFlowObjectiveStore
log.info("Stopped");
}
@Override
public void putNextGroup(Integer nextId, NextGroup group) {
nextGroups.putIfAbsent(nextId, group.data());
nextGroups.put(nextId, group.data());
notifyDelegate(new ObjectiveEvent(ObjectiveEvent.Type.ADD, nextId));
}
......@@ -96,6 +95,16 @@ public class DistributedFlowObjectiveStore
}
@Override
public NextGroup removeNextGroup(Integer nextId) {
Versioned<byte[]> versionGroup = nextGroups.remove(nextId);
if (versionGroup != null) {
notifyDelegate(new ObjectiveEvent(ObjectiveEvent.Type.REMOVE, nextId));
return new DefaultNextGroup(versionGroup.value());
}
return null;
}
@Override
public int allocateNextId() {
return (int) nextIds.incrementAndGet();
}
......
......@@ -71,7 +71,6 @@ public class CpqdOFDPA2Pipeline extends OFDPA2Pipeline {
* (non-Javadoc)
* @see org.onosproject.driver.pipeline.OFDPA2Pipeline#processVlanIdFilter
*/
@Override
protected List<FlowRule> processVlanIdFilter(PortCriterion portCriterion,
VlanIdCriterion vidCriterion,
......@@ -267,7 +266,8 @@ public class CpqdOFDPA2Pipeline extends OFDPA2Pipeline {
}
if (fwd.nextId() != null) {
NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
NextGroup next = getGroupForNextObjective(fwd.nextId());
if (next != null) {
List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
// we only need the top level group's key to point the flow to it
Group group = groupService.getGroup(deviceId, gkeys.get(0).peekFirst());
......@@ -278,6 +278,7 @@ public class CpqdOFDPA2Pipeline extends OFDPA2Pipeline {
}
tb.deferred().group(group.id());
}
}
tb.transition(ACL_TABLE);
FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
.fromApp(fwd.appId())
......
......@@ -94,7 +94,6 @@ import org.onosproject.net.group.GroupEvent;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupListener;
import org.onosproject.net.group.GroupService;
import org.onosproject.net.packet.PacketService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
......@@ -149,7 +148,6 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
protected FlowObjectiveStore flowObjectiveStore;
protected DeviceId deviceId;
protected ApplicationId driverId;
protected PacketService packetService;
protected DeviceService deviceService;
protected KryoNamespace appKryo = new KryoNamespace.Builder()
.register(KryoNamespaces.API)
......@@ -174,6 +172,10 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
Map<VlanId, Set<PortNumber>> vlan2Port = new ConcurrentHashMap<VlanId,
Set<PortNumber>>();
// local store for pending bucketAdds - by design there can only be one
// pending bucket for a group
ConcurrentHashMap<Integer, NextObjective> pendingBuckets = new ConcurrentHashMap<>();
// index number for group creation
AtomicInteger l3vpnindex = new AtomicInteger(0);
......@@ -202,7 +204,6 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
flowRuleService = serviceDirectory.get(FlowRuleService.class);
groupService = serviceDirectory.get(GroupService.class);
flowObjectiveStore = context.store();
packetService = serviceDirectory.get(PacketService.class);
deviceService = serviceDirectory.get(DeviceService.class);
groupService.addListener(new InnerGroupListener());
......@@ -293,10 +294,13 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
if (nextGroup != null) {
log.debug("Processing NextObjective id{} in dev{} - add bucket",
nextObjective.id(), deviceId);
addBucketToGroup(nextObjective);
addBucketToGroup(nextObjective, nextGroup);
} else {
// it is possible that group-chain has not been fully created yet
waitToAddBucketToGroup(nextObjective);
log.debug("Waiting to add bucket to group for next-id:{} in dev:{}",
nextObjective.id(), deviceId);
// by design only one pending bucket is allowed for the group
pendingBuckets.put(nextObjective.id(), nextObjective);
}
break;
case REMOVE:
......@@ -307,7 +311,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
}
log.debug("Processing NextObjective id{} in dev{} - remove group",
nextObjective.id(), deviceId);
removeGroup(nextObjective);
removeGroup(nextObjective, nextGroup);
break;
case REMOVE_FROM_EXISTING:
if (nextGroup == null) {
......@@ -317,7 +321,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
}
log.debug("Processing NextObjective id{} in dev{} - remove bucket",
nextObjective.id(), deviceId);
removeBucketFromGroup(nextObjective);
removeBucketFromGroup(nextObjective, nextGroup);
break;
default:
log.warn("Unsupported operation {}", nextObjective.op());
......@@ -791,7 +795,8 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
return Collections.emptySet();
}
NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
NextGroup next = getGroupForNextObjective(fwd.nextId());
if (next != null) {
List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
// we only need the top level group's key to point the flow to it
Group group = groupService.getGroup(deviceId, gkeys.get(0).peekFirst());
......@@ -803,6 +808,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
}
tb.deferred().group(group.id());
}
}
tb.transition(ACL_TABLE);
FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
.fromApp(fwd.appId())
......@@ -868,7 +874,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
if (fwd.nextId() != null) {
NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
NextGroup next = getGroupForNextObjective(fwd.nextId());
if (next != null) {
List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
// we only need the top level group's key to point the flow to it
......@@ -903,6 +909,23 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
return rules;
}
protected NextGroup getGroupForNextObjective(Integer nextId) {
NextGroup next = flowObjectiveStore.getNextGroup(nextId);
if (next != null) {
List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
if (gkeys != null && !gkeys.isEmpty()) {
return next;
} else {
log.warn("Empty next group found in FlowObjective store for "
+ "next-id:{} in dev:{}", nextId, deviceId);
}
} else {
log.warn("next-id {} not found in Flow objective store for dev:{}",
nextId, deviceId);
}
return null;
}
private void pass(Objective obj) {
if (obj.context().isPresent()) {
obj.context().get().onSuccess(obj);
......@@ -1013,6 +1036,16 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
}
}
private void updatePendingGroups(GroupKey gkey, GroupChainElem gce) {
Set<GroupChainElem> gceSet = Collections.newSetFromMap(
new ConcurrentHashMap<GroupChainElem, Boolean>());
gceSet.add(gce);
Set<GroupChainElem> retval = pendingGroups.putIfAbsent(gkey, gceSet);
if (retval != null) {
retval.add(gce);
}
}
/**
* Creates a simple L2 Interface Group.
*
......@@ -1242,14 +1275,8 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
}
// store l2groupkey with the groupChainElem for the outer-group that depends on it
GroupChainElem gce = new GroupChainElem(outerGrpDesc, 1);
Set<GroupChainElem> gceSet = Collections.newSetFromMap(
new ConcurrentHashMap<GroupChainElem, Boolean>());
gceSet.add(gce);
Set<GroupChainElem> retval = pendingGroups.putIfAbsent(l2groupkey, gceSet);
if (retval != null) {
retval.add(gce);
}
GroupChainElem gce = new GroupChainElem(outerGrpDesc, 1, false);
updatePendingGroups(l2groupkey, gce);
// create group description for the inner l2interfacegroup
GroupBucket l2interfaceGroupBucket =
......@@ -1376,7 +1403,8 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
l2floodgroupId,
nextObj.appId());
GroupChainElem gce = new GroupChainElem(l2floodGroupDescription,
l2interfaceGroupDescs.size());
l2interfaceGroupDescs.size(),
false);
log.debug("Trying L2-Flood: device:{} gid:{} gkey:{} nextid:{}",
deviceId, Integer.toHexString(l2floodgroupId),
l2floodgroupkey, nextObj.id());
......@@ -1392,16 +1420,8 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
for (GroupDescription l2intGrpDesc : l2interfaceGroupDescs) {
// store all l2groupkeys with the groupChainElem for the l2floodgroup
// that depends on it
Set<GroupChainElem> gceSet = Collections.newSetFromMap(
new ConcurrentHashMap<GroupChainElem, Boolean>());
gceSet.add(gce);
Set<GroupChainElem> retval = pendingGroups.putIfAbsent(
l2intGrpDesc.appCookie(), gceSet);
if (retval != null) {
retval.add(gce);
}
// create and send groups for all l2 interface groups
updatePendingGroups(l2intGrpDesc.appCookie(), gce);
// send groups for all l2 interface groups
groupService.addGroup(l2intGrpDesc);
}
}
......@@ -1430,17 +1450,81 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
* <p>
* NOTE: We do not create MPLS ECMP groups as they are unimplemented in
* OF-DPA 2.0 (even though it is in the spec). Therefore we do not
* check the nextObjective meta.
* check the nextObjective meta to see what is matching before being
* sent to this nextObjective.
*
* @param nextObj the nextObjective of type HASHED
*/
private void processHashedNextObjective(NextObjective nextObj) {
// break up hashed next objective to multiple groups
Collection<TrafficTreatment> buckets = nextObj.next();
// storage for all group keys in the chain of groups created
List<Deque<GroupKey>> allGroupKeys = new ArrayList<>();
List<GroupInfo> unsentGroups = new ArrayList<>();
createHashBucketChains(nextObj, allGroupKeys, unsentGroups);
// now we can create the outermost L3 ECMP group
List<GroupBucket> l3ecmpGroupBuckets = new ArrayList<>();
for (GroupInfo gi : unsentGroups) {
// create ECMP bucket to point to the outer group
TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder();
ttb.group(new DefaultGroupId(gi.outerGrpDesc.givenGroupId()));
GroupBucket sbucket = DefaultGroupBucket
.createSelectGroupBucket(ttb.build());
l3ecmpGroupBuckets.add(sbucket);
}
int l3ecmpGroupId = L3ECMPMASK | nextObj.id() << 12;
GroupKey l3ecmpGroupKey = new DefaultGroupKey(appKryo.serialize(l3ecmpGroupId));
GroupDescription l3ecmpGroupDesc =
new DefaultGroupDescription(
deviceId,
GroupDescription.Type.SELECT,
new GroupBuckets(l3ecmpGroupBuckets),
l3ecmpGroupKey,
l3ecmpGroupId,
nextObj.appId());
GroupChainElem l3ecmpGce = new GroupChainElem(l3ecmpGroupDesc,
l3ecmpGroupBuckets.size(),
false);
// create objects for local and distributed storage
allGroupKeys.forEach(gkeyChain -> gkeyChain.addFirst(l3ecmpGroupKey));
OfdpaNextGroup ofdpaGrp = new OfdpaNextGroup(allGroupKeys, nextObj);
// store l3ecmpGroupKey with the ofdpaGroupChain for the nextObjective
// that depends on it
updatePendingNextObjective(l3ecmpGroupKey, ofdpaGrp);
log.debug("Trying L3ECMP: device:{} gid:{} gkey:{} nextId:{}",
deviceId, Integer.toHexString(l3ecmpGroupId),
l3ecmpGroupKey, nextObj.id());
// finally we are ready to send the innermost groups
for (GroupInfo gi : unsentGroups) {
log.debug("Sending innermost group {} in group chain on device {} ",
Integer.toHexString(gi.innerGrpDesc.givenGroupId()), deviceId);
updatePendingGroups(gi.outerGrpDesc.appCookie(), l3ecmpGce);
groupService.addGroup(gi.innerGrpDesc);
}
}
/**
* Creates group chains for all buckets in a hashed group, and stores the
* GroupInfos and GroupKeys for all the groups in the lists passed in, which
* should be empty.
* <p>
* Does not create the top level ECMP group. Does not actually send the
* groups to the groupService.
*
* @param nextObj the Next Objective with buckets that need to be converted
* to group chains
* @param allGroupKeys a list to store groupKey for each bucket-group-chain
* @param unsentGroups a list to store GroupInfo for each bucket-group-chain
*/
private void createHashBucketChains(NextObjective nextObj,
List<Deque<GroupKey>> allGroupKeys,
List<GroupInfo> unsentGroups) {
// break up hashed next objective to multiple groups
Collection<TrafficTreatment> buckets = nextObj.next();
for (TrafficTreatment bucket : buckets) {
//figure out how many labels are pushed in each bucket
int labelsPushed = 0;
......@@ -1508,15 +1592,8 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
l3vpngroupkey,
l3vpngroupId,
nextObj.appId());
GroupChainElem l3vpnGce = new GroupChainElem(l3vpnGroupDesc, 1);
Set<GroupChainElem> gceSet = Collections.newSetFromMap(
new ConcurrentHashMap<GroupChainElem, Boolean>());
gceSet.add(l3vpnGce);
Set<GroupChainElem> retval = pendingGroups
.putIfAbsent(onelabelGroupInfo.outerGrpDesc.appCookie(), gceSet);
if (retval != null) {
retval.add(l3vpnGce);
}
GroupChainElem l3vpnGce = new GroupChainElem(l3vpnGroupDesc, 1, false);
updatePendingGroups(onelabelGroupInfo.outerGrpDesc.appCookie(), l3vpnGce);
gkeyChain.addFirst(onelabelGroupInfo.innerGrpDesc.appCookie());
gkeyChain.addFirst(onelabelGroupInfo.outerGrpDesc.appCookie());
......@@ -1535,80 +1612,186 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
} else {
log.warn("Driver currently does not handle more than 1 MPLS "
+ "labels. Not processing nextObjective {}", nextObj);
+ "labels. Not processing nextObjective {}", nextObj.id());
return;
}
// all groups in this chain
allGroupKeys.add(gkeyChain);
}
}
// now we can create the outermost L3 ECMP group
List<GroupBucket> l3ecmpGroupBuckets = new ArrayList<>();
for (GroupInfo gi : unsentGroups) {
// create ECMP bucket to point to the outer group
/**
* Adds a bucket to the top level group of a group-chain, and creates the chain.
*
* @param nextObjective the next group to add a bucket to
* @param next the representation of the existing group-chain for this next objective
*/
private void addBucketToGroup(NextObjective nextObjective, NextGroup next) {
if (nextObjective.type() != NextObjective.Type.HASHED) {
log.warn("AddBuckets not applied to nextType:{} in dev:{} for next:{}",
nextObjective.type(), deviceId, nextObjective.id());
return;
}
if (nextObjective.next().size() > 1) {
log.warn("Only one bucket can be added at a time");
return;
}
// storage for all group keys in the chain of groups created
List<Deque<GroupKey>> allGroupKeys = new ArrayList<>();
List<GroupInfo> unsentGroups = new ArrayList<>();
createHashBucketChains(nextObjective, allGroupKeys, unsentGroups);
// now we can create the outermost L3 ECMP group bucket to add
GroupInfo gi = unsentGroups.get(0); // only one bucket, so only one group-chain
TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder();
ttb.group(new DefaultGroupId(gi.outerGrpDesc.givenGroupId()));
GroupBucket sbucket = DefaultGroupBucket
.createSelectGroupBucket(ttb.build());
l3ecmpGroupBuckets.add(sbucket);
}
int l3ecmpGroupId = L3ECMPMASK | nextObj.id() << 12;
GroupBucket sbucket = DefaultGroupBucket.createSelectGroupBucket(ttb.build());
// recreate the original L3 ECMP group id and description
int l3ecmpGroupId = L3ECMPMASK | nextObjective.id() << 12;
GroupKey l3ecmpGroupKey = new DefaultGroupKey(appKryo.serialize(l3ecmpGroupId));
// Although GroupDescriptions are not necessary for adding buckets to
// existing groups, we use one in the GroupChainElem. When the latter is
// processed, the info will be extracted for the bucketAdd call to groupService
GroupDescription l3ecmpGroupDesc =
new DefaultGroupDescription(
deviceId,
GroupDescription.Type.SELECT,
new GroupBuckets(l3ecmpGroupBuckets),
new GroupBuckets(Collections.singletonList(sbucket)),
l3ecmpGroupKey,
l3ecmpGroupId,
nextObj.appId());
GroupChainElem l3ecmpGce = new GroupChainElem(l3ecmpGroupDesc,
l3ecmpGroupBuckets.size());
// create objects for local and distributed storage
allGroupKeys.forEach(gkeyChain -> gkeyChain.addFirst(l3ecmpGroupKey));
OfdpaNextGroup ofdpaGrp = new OfdpaNextGroup(allGroupKeys, nextObj);
// store l3ecmpGroupKey with the ofdpaGroupChain for the nextObjective
// that depends on it
updatePendingNextObjective(l3ecmpGroupKey, ofdpaGrp);
log.debug("Trying L3ECMP: device:{} gid:{} gkey:{} nextId:{}",
nextObjective.appId());
GroupChainElem l3ecmpGce = new GroupChainElem(l3ecmpGroupDesc, 1, true);
// update original NextGroup with new bucket-chain
// don't need to update pendingNextObjectives -- group already exists
Deque<GroupKey> newBucketChain = allGroupKeys.get(0);
newBucketChain.addFirst(l3ecmpGroupKey);
List<Deque<GroupKey>> allOriginalKeys = appKryo.deserialize(next.data());
allOriginalKeys.add(newBucketChain);
flowObjectiveStore.putNextGroup(nextObjective.id(),
new OfdpaNextGroup(allOriginalKeys, nextObjective));
log.debug("Adding to L3ECMP: device:{} gid:{} gkey:{} nextId:{}",
deviceId, Integer.toHexString(l3ecmpGroupId),
l3ecmpGroupKey, nextObj.id());
// finally we are ready to send the innermost groups
for (GroupInfo gi : unsentGroups) {
l3ecmpGroupKey, nextObjective.id());
// send the innermost group
log.debug("Sending innermost group {} in group chain on device {} ",
Integer.toHexString(gi.innerGrpDesc.givenGroupId()), deviceId);
Set<GroupChainElem> gceSet = Collections.newSetFromMap(
new ConcurrentHashMap<GroupChainElem, Boolean>());
gceSet.add(l3ecmpGce);
Set<GroupChainElem> retval = pendingGroups
.putIfAbsent(gi.outerGrpDesc.appCookie(), gceSet);
if (retval != null) {
retval.add(l3ecmpGce);
}
updatePendingGroups(gi.outerGrpDesc.appCookie(), l3ecmpGce);
groupService.addGroup(gi.innerGrpDesc);
}
}
private void addBucketToGroup(NextObjective nextObjective) {
// TODO Auto-generated method stub
/**
* Removes the bucket in the top level group of a possible group-chain. Does
* not remove the groups in a group-chain pointed to by this bucket, as they
* may be in use (referenced by other groups) elsewhere.
*
* @param nextObjective the next group to remove a bucket from
* @param next the representation of the existing group-chain for this next objective
*/
private void removeBucketFromGroup(NextObjective nextObjective, NextGroup next) {
if (nextObjective.type() != NextObjective.Type.HASHED) {
log.warn("RemoveBuckets not applied to nextType:{} in dev:{} for next:{}",
nextObjective.type(), deviceId, nextObjective.id());
return;
}
private void waitToAddBucketToGroup(NextObjective nextObjective) {
// TODO Auto-generated method stub
Collection<TrafficTreatment> treatments = nextObjective.next();
TrafficTreatment treatment = treatments.iterator().next();
// find the bucket to remove by noting the outport, and figuring out the
// top-level group in the group-chain that indirectly references the port
PortNumber outport = null;
for (Instruction ins : treatment.allInstructions()) {
if (ins instanceof OutputInstruction) {
outport = ((OutputInstruction) ins).port();
break;
}
}
if (outport == null) {
log.error("next objective {} has no outport", nextObjective.id());
return;
}
private void removeBucketFromGroup(NextObjective nextObjective) {
// TODO Auto-generated method stub
List<Deque<GroupKey>> allgkeys = appKryo.deserialize(next.data());
Deque<GroupKey> foundChain = null;
int index = 0;
for (Deque<GroupKey> gkeys : allgkeys) {
GroupKey groupWithPort = gkeys.peekLast();
Group group = groupService.getGroup(deviceId, groupWithPort);
if (group == null) {
log.warn("Inconsistent group chain");
continue;
}
// last group in group chain should have a single bucket pointing to port
List<Instruction> lastIns = group.buckets().buckets().iterator()
.next().treatment().allInstructions();
for (Instruction i : lastIns) {
if (i instanceof OutputInstruction) {
PortNumber lastport = ((OutputInstruction) i).port();
if (lastport.equals(outport)) {
foundChain = gkeys;
break;
}
}
}
if (foundChain != null) {
break;
}
index++;
}
if (foundChain != null) {
//first groupkey is the one we want to modify
GroupKey modGroupKey = foundChain.peekFirst();
Group modGroup = groupService.getGroup(deviceId, modGroupKey);
//second groupkey is the one we wish to remove the reference to
GroupKey pointedGroupKey = null;
int i = 0;
for (GroupKey gk : foundChain) {
if (i++ == 1) {
pointedGroupKey = gk;
break;
}
}
Group pointedGroup = groupService.getGroup(deviceId, pointedGroupKey);
GroupBucket bucket = DefaultGroupBucket.createSelectGroupBucket(
DefaultTrafficTreatment.builder()
.group(pointedGroup.id())
.build());
GroupBuckets removeBuckets = new GroupBuckets(Collections
.singletonList(bucket));
log.debug("Removing buckets from group id {} for next id {} in device {}",
modGroup.id(), nextObjective.id(), deviceId);
groupService.removeBucketsFromGroup(deviceId, modGroupKey,
removeBuckets, modGroupKey,
nextObjective.appId());
//update store
allgkeys.remove(index);
flowObjectiveStore.putNextGroup(nextObjective.id(),
new OfdpaNextGroup(allgkeys, nextObjective));
} else {
log.warn("Could not find appropriate group-chain for removing bucket"
+ " for next id {} in dev:{}", nextObjective.id(), deviceId);
}
}
private void removeGroup(NextObjective nextObjective) {
// TODO Auto-generated method stub
/**
* Removes all groups in multiple possible group-chains that represent the next
* objective.
*
* @param nextObjective the next objective to remove
* @param next the NextGroup that represents the existing group-chain for
* this next objective
*/
private void removeGroup(NextObjective nextObjective, NextGroup next) {
List<Deque<GroupKey>> allgkeys = appKryo.deserialize(next.data());
allgkeys.forEach(groupChain -> {
groupChain.forEach(groupKey ->
groupService.removeGroup(deviceId, groupKey, nextObjective.appId()));
});
flowObjectiveStore.removeNextGroup(nextObjective.id());
}
/**
......@@ -1617,7 +1800,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
* and this driver has received notification for it. A second assumption is
* that if there is another group waiting for this group then the appropriate
* stores already have the information to act upon the notification for the
* creating of this group.
* creation of this group.
* <p>
* The processing of the GroupChainElement depends on the number of groups
* this element is waiting on. For all group types other than SIMPLE, a
......@@ -1632,8 +1815,16 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
return;
}
log.debug("GCE: {} ready to be processed", gce);
if (gce.addBucketToGroup) {
groupService.addBucketsToGroup(gce.groupDescription.deviceId(),
gce.groupDescription.appCookie(),
gce.groupDescription.buckets(),
gce.groupDescription.appCookie(),
gce.groupDescription.appId());
} else {
groupService.addGroup(gce.groupDescription);
}
}
private class GroupChecker implements Runnable {
@Override
......@@ -1646,35 +1837,47 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
.collect(Collectors.toSet());
keys.addAll(otherkeys);
keys.stream().forEach(key -> {
keys.stream().forEach(key ->
processPendingGroupsOrNextObjectives(key, false));
}
}
private void processPendingGroupsOrNextObjectives(GroupKey key, boolean added) {
//first check for group chain
Set<GroupChainElem> gceSet = pendingGroups.remove(key);
if (gceSet != null) {
for (GroupChainElem gce : gceSet) {
log.info("Group service processed group key {} in device {}. "
log.info("Group service {} group key {} in device {}. "
+ "Processing next group in group chain with group id {}",
(added) ? "ADDED" : "processed",
key, deviceId,
Integer.toHexString(gce.groupDescription.givenGroupId()));
processGroupChain(gce);
}
} else {
List<OfdpaNextGroup> objList = pendingNextObjectives.getIfPresent(key);
if (objList != null) {
// otherwise chain complete - check for waiting nextObjectives
List<OfdpaNextGroup> nextGrpList = pendingNextObjectives.getIfPresent(key);
if (nextGrpList != null) {
pendingNextObjectives.invalidate(key);
objList.forEach(obj -> {
log.info("Group service processed group key {} in device:{}. "
nextGrpList.forEach(nextGrp -> {
log.info("Group service {} group key {} in device:{}. "
+ "Done implementing next objective: {} <<-->> gid:{}",
key, deviceId, obj.nextObjective().id(),
(added) ? "ADDED" : "processed",
key, deviceId, nextGrp.nextObjective().id(),
Integer.toHexString(groupService.getGroup(deviceId, key)
.givenGroupId()));
pass(obj.nextObjective());
flowObjectiveStore.putNextGroup(obj.nextObjective().id(), obj);
});
}
pass(nextGrp.nextObjective());
flowObjectiveStore.putNextGroup(nextGrp.nextObjective().id(), nextGrp);
// check if addBuckets waiting for this completion
NextObjective pendBkt = pendingBuckets
.remove(nextGrp.nextObjective().id());
if (pendBkt != null) {
addBucketToGroup(pendBkt, nextGrp);
}
});
}
}
}
private class InnerGroupListener implements GroupListener {
@Override
......@@ -1682,31 +1885,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
log.trace("received group event of type {}", event.type());
if (event.type() == GroupEvent.Type.GROUP_ADDED) {
GroupKey key = event.subject().appCookie();
// first check for group chain
Set<GroupChainElem> gceSet = pendingGroups.remove(key);
if (gceSet != null) {
for (GroupChainElem gce : gceSet) {
log.info("group ADDED with group key {} .. "
+ "Processing next group in group chain with group key {}",
key,
gce.groupDescription.appCookie());
processGroupChain(gce);
}
} else {
List<OfdpaNextGroup> objList = pendingNextObjectives.getIfPresent(key);
if (objList != null) {
pendingNextObjectives.invalidate(key);
objList.forEach(obj -> {
log.info("group ADDED with key {} in dev {}.. Done implementing next "
+ "objective: {} <<-->> gid:{}",
key, deviceId, obj.nextObjective().id(),
Integer.toHexString(groupService.getGroup(deviceId, key)
.givenGroupId()));
pass(obj.nextObjective());
flowObjectiveStore.putNextGroup(obj.nextObjective().id(), obj);
});
}
}
processPendingGroupsOrNextObjectives(key, true);
}
}
}
......@@ -1714,7 +1893,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
/**
* Represents an entire group-chain that implements a Next-Objective from
* the application. The objective is represented as a list of deques, where
* each deque can is a separate chain of groups.
* each deque is a separate chain of groups.
* <p>
* For example, an ECMP group with 3 buckets, where each bucket points to
* a group chain of L3 Unicast and L2 interface groups will look like this:
......@@ -1765,10 +1944,13 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
private class GroupChainElem {
private GroupDescription groupDescription;
private AtomicInteger waitOnGroups;
private boolean addBucketToGroup;
GroupChainElem(GroupDescription groupDescription, int waitOnGroups) {
GroupChainElem(GroupDescription groupDescription, int waitOnGroups,
boolean addBucketToGroup) {
this.groupDescription = groupDescription;
this.waitOnGroups = new AtomicInteger(waitOnGroups);
this.addBucketToGroup = addBucketToGroup;
}
/**
......@@ -1788,6 +1970,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
return (Integer.toHexString(groupDescription.givenGroupId()) +
" groupKey: " + groupDescription.appCookie() +
" waiting-on-groups: " + waitOnGroups.get() +
" addBucketToGroup: " + addBucketToGroup +
" device: " + deviceId);
}
}
......