Srikanth Vavilapalli
Committed by Gerrit Code Review

Distributed group store using eventual consistent map abstraction

Change-Id: I618a0f6fa80e0e25285d7a2026032f09ba90aa70
Showing 25 changed files with 1140 additions and 311 deletions
......@@ -15,11 +15,14 @@
*/
package org.onosproject.bgprouter;
import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -30,6 +33,7 @@ import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onosproject.config.NetworkConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
......@@ -47,12 +51,12 @@ import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupEvent;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupListener;
import org.onosproject.net.group.GroupService;
import org.onosproject.net.host.InterfaceIpAddress;
......@@ -67,13 +71,11 @@ import org.onosproject.routing.config.RoutingConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.ConcurrentHashMultiset;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
/**
* BgpRouter component.
......@@ -126,7 +128,7 @@ public class BgpRouter {
private final Map<IpAddress, NextHop> nextHops = Maps.newHashMap();
// Stores FIB updates that are waiting for groups to be set up
private final Multimap<GroupKey, FibEntry> pendingUpdates = HashMultimap.create();
private final Multimap<NextHopGroupKey, FibEntry> pendingUpdates = HashMultimap.create();
// Device id of data-plane switch - should be learned from config
private DeviceId deviceId;
......@@ -143,6 +145,11 @@ public class BgpRouter {
private InternalTableHandler provisionStaticTables = new InternalTableHandler();
private KryoNamespace.Builder appKryo = new KryoNamespace.Builder()
.register(IpAddress.Version.class)
.register(IpAddress.class)
.register(NextHopGroupKey.class);
@Activate
protected void activate() {
appId = coreService.registerApplication(BGP_ROUTER_APP);
......@@ -210,7 +217,9 @@ public class BgpRouter {
Group group;
synchronized (pendingUpdates) {
NextHop nextHop = nextHops.get(entry.nextHopIp());
group = groupService.getGroup(deviceId, nextHop.group());
group = groupService.getGroup(deviceId,
new DefaultGroupKey(
appKryo.build().serialize(nextHop.group())));
if (group == null) {
log.debug("Adding pending flow {}", update.entry());
......@@ -309,7 +318,7 @@ public class BgpRouter {
GroupDescription.Type.INDIRECT,
new GroupBuckets(Collections
.singletonList(bucket)),
groupKey,
new DefaultGroupKey(appKryo.build().serialize(groupKey)),
appId);
groupService.addGroup(groupDescription);
......@@ -329,7 +338,10 @@ public class BgpRouter {
return null;
}
Group group = groupService.getGroup(deviceId, nextHop.group());
Group group = groupService.getGroup(deviceId,
new DefaultGroupKey(appKryo.
build().
serialize(nextHop.group())));
// FIXME disabling group deletes for now until we verify the logic is OK
/*if (nextHopsCount.remove(nextHopIp, 1) <= 1) {
......@@ -339,7 +351,9 @@ public class BgpRouter {
nextHops.remove(nextHopIp);
groupService.removeGroup(deviceId, nextHop.group(), appId);
groupService.removeGroup(deviceId,
new DefaultGroupKey(appKryo.build().serialize(nextHop.group())),
appId);
}*/
return group;
......@@ -699,8 +713,10 @@ public class BgpRouter {
event.type() == GroupEvent.Type.GROUP_UPDATED) {
synchronized (pendingUpdates) {
NextHopGroupKey nhGroupKey =
appKryo.build().deserialize(group.appCookie().key());
Map<FibEntry, Group> entriesToInstall =
pendingUpdates.removeAll(group.appCookie())
pendingUpdates.removeAll(nhGroupKey)
.stream()
.collect(Collectors
.toMap(e -> e, e -> group));
......
......@@ -15,12 +15,12 @@
*/
package org.onosproject.bgprouter;
import com.google.common.base.MoreObjects;
import java.util.Objects;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onosproject.net.group.GroupKey;
import java.util.Objects;
import com.google.common.base.MoreObjects;
/**
* Represents a next hop for routing, whose MAC address has already been resolved.
......@@ -29,7 +29,7 @@ public class NextHop {
private final IpAddress ip;
private final MacAddress mac;
private final GroupKey group;
private final NextHopGroupKey group;
/**
* Creates a new next hop.
......@@ -38,7 +38,7 @@ public class NextHop {
* @param mac next hop's MAC address
* @param group next hop's group
*/
public NextHop(IpAddress ip, MacAddress mac, GroupKey group) {
public NextHop(IpAddress ip, MacAddress mac, NextHopGroupKey group) {
this.ip = ip;
this.mac = mac;
this.group = group;
......@@ -67,7 +67,7 @@ public class NextHop {
*
* @return group
*/
public GroupKey group() {
public NextHopGroupKey group() {
return group;
}
......
......@@ -15,18 +15,18 @@
*/
package org.onosproject.bgprouter;
import com.google.common.base.MoreObjects;
import org.onlab.packet.IpAddress;
import org.onosproject.net.group.GroupKey;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Objects;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.packet.IpAddress;
import com.google.common.base.MoreObjects;
/**
* Identifier for a next hop group.
*/
public class NextHopGroupKey implements GroupKey {
public class NextHopGroupKey {
private final IpAddress address;
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-apps</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
......
......@@ -124,16 +124,23 @@ public class DefaultEdgeGroupHandler extends DefaultGroupHandler {
tBuilder.setOutput(newNeighborLink.src().port())
.setEthDst(deviceConfig.getDeviceMac(
newNeighborLink.dst().deviceId()))
.setEthSrc(nodeMacAddr)
.pushMpls()
.setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
.setEthSrc(nodeMacAddr);
if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
tBuilder.pushMpls()
.setMpls(MplsLabel.
mplsLabel(ns.getEdgeLabel()));
}
GroupBucket updatedBucket = DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build());
GroupBuckets updatedBuckets = new GroupBuckets(
Arrays.asList(updatedBucket));
log.debug("newPortToExistingNeighborAtEdgeRouter: "
+ "groupService.addBucketsToGroup for neighborset{}", ns);
groupService.addBucketsToGroup(deviceId, ns, updatedBuckets, ns, appId);
groupService.addBucketsToGroup(deviceId,
getGroupKey(ns),
updatedBuckets,
getGroupKey(ns),
appId);
}
}
......
......@@ -18,6 +18,7 @@ package org.onosproject.grouphandler;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
......@@ -27,6 +28,7 @@ import java.util.Set;
import org.onlab.packet.MacAddress;
import org.onlab.packet.MplsLabel;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
......@@ -35,6 +37,7 @@ import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
......@@ -71,6 +74,16 @@ public class DefaultGroupHandler {
new HashMap<PortNumber, DeviceId>();
private GroupListener listener = new InternalGroupListener();
protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
.register(URI.class)
.register(HashSet.class)
.register(DeviceId.class)
.register(PortNumber.class)
.register(NeighborSet.class)
.register(PolicyGroupIdentifier.class)
.register(PolicyGroupParams.class)
.register(GroupBucketIdentifier.class)
.register(GroupBucketIdentifier.BucketOutputType.class);
protected DefaultGroupHandler(DeviceId deviceId,
ApplicationId appId,
......@@ -185,9 +198,11 @@ public class DefaultGroupHandler {
tBuilder.setOutput(port)
.setEthDst(deviceConfig.getDeviceMac(
portDeviceMap.get(port)))
.setEthSrc(nodeMacAddr)
.pushMpls()
.setEthSrc(nodeMacAddr);
if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
tBuilder.pushMpls()
.setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
}
GroupBucket removeBucket = DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build());
GroupBuckets removeBuckets = new GroupBuckets(
......@@ -196,9 +211,9 @@ public class DefaultGroupHandler {
+ "groupService.removeBucketsFromGroup "
+ "for neighborset{}", deviceId, ns);
groupService.removeBucketsFromGroup(deviceId,
ns,
getGroupKey(ns),
removeBuckets,
ns,
getGroupKey(ns),
appId);
}
......@@ -331,9 +346,12 @@ public class DefaultGroupHandler {
DefaultTrafficTreatment.builder();
tBuilder.setOutput(sp)
.setEthDst(deviceConfig.getDeviceMac(d))
.setEthSrc(nodeMacAddr)
.pushMpls()
.setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
.setEthSrc(nodeMacAddr);
if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
tBuilder.pushMpls()
.setMpls(MplsLabel.
mplsLabel(ns.getEdgeLabel()));
}
buckets.add(DefaultGroupBucket.createSelectGroupBucket(
tBuilder.build()));
}
......@@ -343,7 +361,7 @@ public class DefaultGroupHandler {
deviceId,
Group.Type.SELECT,
groupBuckets,
ns,
getGroupKey(ns),
appId);
log.debug("createGroupsFromNeighborsets: "
+ "groupService.addGroup for neighborset{}", ns);
......@@ -386,4 +404,8 @@ public class DefaultGroupHandler {
handleGroupEvent(event);
}
}
protected GroupKey getGroupKey(Object obj) {
return new DefaultGroupKey(kryo.build().serialize(obj));
}
}
......
......@@ -17,8 +17,10 @@ package org.onosproject.grouphandler;
import static org.slf4j.LoggerFactory.getLogger;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.apache.felix.scr.annotations.Activate;
......@@ -27,10 +29,13 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.MacAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
......@@ -69,10 +74,18 @@ public class DefaultGroupHandlerApp {
protected GroupService groupService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
private DeviceListener deviceListener = new InternalDeviceListener();
private LinkListener linkListener = new InternalLinkListener();
protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
.register(URI.class)
.register(HashSet.class)
.register(DeviceId.class)
.register(NeighborSet.class);
@Activate
public void activate() {
appId = coreService.registerApplication("org.onosproject.defaultgrouphandler");
......@@ -80,14 +93,23 @@ public class DefaultGroupHandlerApp {
deviceService.addListener(deviceListener);
linkService.addListener(linkListener);
for (Device device: deviceService.getDevices()) {
log.debug("Initiating default group handling for {}", device.id());
DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(device.id(),
appId,
config,
linkService,
groupService);
dgh.createGroups();
dghMap.put(device.id(), dgh);
if (mastershipService.
getLocalRole(device.id()) == MastershipRole.MASTER) {
log.debug("Initiating default group handling for {}", device.id());
DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(device.id(),
appId,
config,
linkService,
groupService);
dgh.createGroups();
dghMap.put(device.id(), dgh);
} else {
log.debug("Activate: Local role {} "
+ "is not MASTER for device {}",
mastershipService.
getLocalRole(device.id()),
device.id());
}
}
log.info("Activated");
}
......@@ -165,7 +187,15 @@ public class DefaultGroupHandlerApp {
@Override
public void event(DeviceEvent event) {
switch (event.type()) {
if (mastershipService.
getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
log.debug("Local role {} is not MASTER for device {}",
mastershipService.
getLocalRole(event.subject().id()),
event.subject().id());
return;
}
switch (event.type()) {
case DEVICE_ADDED:
log.debug("Initiating default group handling for {}", event.subject().id());
DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(
......@@ -193,6 +223,16 @@ public class DefaultGroupHandlerApp {
@Override
public void event(LinkEvent event) {
if (mastershipService.
getLocalRole(event.subject().src().deviceId()) !=
MastershipRole.MASTER) {
log.debug("InternalLinkListener: Local role {} "
+ "is not MASTER for device {}",
mastershipService.
getLocalRole(event.subject().src().deviceId()),
event.subject().src().deviceId());
return;
}
switch (event.type()) {
case LINK_ADDED:
if (dghMap.get(event.subject().src().deviceId()) != null) {
......
......@@ -112,16 +112,23 @@ public class DefaultTransitGroupHandler extends DefaultGroupHandler {
tBuilder.setOutput(newNeighborLink.src().port())
.setEthDst(deviceConfig.getDeviceMac(
newNeighborLink.dst().deviceId()))
.setEthSrc(nodeMacAddr)
.pushMpls()
.setMpls(MplsLabel.mplsLabel(ns.getEdgeLabel()));
.setEthSrc(nodeMacAddr);
if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
tBuilder.pushMpls()
.setMpls(MplsLabel.
mplsLabel(ns.getEdgeLabel()));
}
GroupBucket updatedBucket = DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build());
GroupBuckets updatedBuckets = new GroupBuckets(
Arrays.asList(updatedBucket));
log.debug("newPortToExistingNeighborAtEdgeRouter: "
+ "groupService.addBucketsToGroup for neighborset{}", ns);
groupService.addBucketsToGroup(deviceId, ns, updatedBuckets, ns, appId);
groupService.addBucketsToGroup(deviceId,
getGroupKey(ns),
updatedBuckets,
getGroupKey(ns),
appId);
}
}
......
......@@ -18,7 +18,6 @@ package org.onosproject.grouphandler;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onosproject.net.PortNumber;
import org.onosproject.net.group.GroupKey;
/**
* Representation of policy group bucket identifier. Not exposed to
......@@ -28,7 +27,7 @@ public class GroupBucketIdentifier {
private int label;
private BucketOutputType type;
private PortNumber outPort;
private GroupKey outGroup;
private PolicyGroupIdentifier outGroup;
protected enum BucketOutputType {
PORT,
......@@ -44,7 +43,7 @@ public class GroupBucketIdentifier {
}
protected GroupBucketIdentifier(int label,
GroupKey outGroup) {
PolicyGroupIdentifier outGroup) {
this.label = label;
this.type = BucketOutputType.GROUP;
this.outPort = null;
......@@ -63,7 +62,7 @@ public class GroupBucketIdentifier {
return this.outPort;
}
protected GroupKey outGroup() {
protected PolicyGroupIdentifier outGroup() {
return this.outGroup;
}
}
......
......@@ -23,7 +23,6 @@ import java.util.Objects;
import java.util.Set;
import org.onosproject.net.DeviceId;
import org.onosproject.net.group.GroupKey;
/**
* Representation of a set of neighbor switch dpids along with edge node
......@@ -31,9 +30,10 @@ import org.onosproject.net.group.GroupKey;
* ECMP-group that hashes packets to a set of ports connecting to the
* neighbors in this set.
*/
public class NeighborSet implements GroupKey {
public class NeighborSet {
private final Set<DeviceId> neighbors;
private final int edgeLabel;
public static final int NO_EDGE_LABEL = -1;
/**
* Constructor with set of neighbors. Edge label is
......@@ -43,7 +43,7 @@ public class NeighborSet implements GroupKey {
*/
public NeighborSet(Set<DeviceId> neighbors) {
checkNotNull(neighbors);
this.edgeLabel = -1;
this.edgeLabel = NO_EDGE_LABEL;
this.neighbors = new HashSet<DeviceId>();
this.neighbors.addAll(neighbors);
}
......@@ -65,7 +65,7 @@ public class NeighborSet implements GroupKey {
* Default constructor for kryo serialization.
*/
public NeighborSet() {
this.edgeLabel = -1;
this.edgeLabel = NO_EDGE_LABEL;
this.neighbors = new HashSet<DeviceId>();
}
......
......@@ -37,7 +37,6 @@ import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupEvent;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupService;
import org.onosproject.net.link.LinkService;
import org.slf4j.Logger;
......@@ -49,18 +48,17 @@ import org.slf4j.Logger;
public class PolicyGroupHandler extends DefaultGroupHandler {
private final Logger log = getLogger(getClass());
private HashMap<GroupKey, GroupKey> dependentGroups =
new HashMap<GroupKey, GroupKey>();
private HashMap<PolicyGroupIdentifier, PolicyGroupIdentifier> dependentGroups =
new HashMap<PolicyGroupIdentifier, PolicyGroupIdentifier>();
/**
* Creates policy group handler object.
* Policy group handler constructor.
*
* @param deviceId device identifier
* @param appId application identifier
* @param config interface to retrieve the device properties
* @param linkService link service object
* @param groupService group service object
* @return policy group handler type
*/
public PolicyGroupHandler(DeviceId deviceId,
ApplicationId appId,
......@@ -175,9 +173,11 @@ public class PolicyGroupHandler extends DefaultGroupHandler {
tBuilder.setOutput(bucketId.outPort())
.setEthDst(deviceConfig.
getDeviceMac(neighbor))
.setEthSrc(nodeMacAddr)
.pushMpls()
.setEthSrc(nodeMacAddr);
if (bucketId.label() != NeighborSet.NO_EDGE_LABEL) {
tBuilder.pushMpls()
.setMpls(MplsLabel.mplsLabel(bucketId.label()));
}
//TODO: BoS
outBuckets.add(DefaultGroupBucket.
createSelectGroupBucket(tBuilder.build()));
......@@ -196,8 +196,7 @@ public class PolicyGroupHandler extends DefaultGroupHandler {
protected void handleGroupEvent(GroupEvent event) {
if (event.type() == GroupEvent.Type.GROUP_ADDED) {
if (dependentGroups.get(event.subject().appCookie()) != null) {
PolicyGroupIdentifier dependentGroupKey = (PolicyGroupIdentifier)
dependentGroups.get(event.subject().appCookie());
PolicyGroupIdentifier dependentGroupKey = dependentGroups.get(event.subject().appCookie());
dependentGroups.remove(event.subject().appCookie());
boolean fullyResolved = true;
for (GroupBucketIdentifier bucketId:
......@@ -217,8 +216,11 @@ public class PolicyGroupHandler extends DefaultGroupHandler {
dependentGroupKey.bucketIds()) {
TrafficTreatment.Builder tBuilder =
DefaultTrafficTreatment.builder();
tBuilder.pushMpls()
.setMpls(MplsLabel.mplsLabel(bucketId.label()));
if (bucketId.label() != NeighborSet.NO_EDGE_LABEL) {
tBuilder.pushMpls()
.setMpls(MplsLabel.
mplsLabel(bucketId.label()));
}
//TODO: BoS
if (bucketId.type() == BucketOutputType.PORT) {
DeviceId neighbor = portDeviceMap.
......@@ -230,12 +232,14 @@ public class PolicyGroupHandler extends DefaultGroupHandler {
} else {
if (groupService.
getGroup(deviceId,
bucketId.outGroup()) == null) {
getGroupKey(bucketId.
outGroup())) == null) {
throw new IllegalStateException();
}
GroupId indirectGroupId = groupService.
getGroup(deviceId,
bucketId.outGroup()).id();
getGroupKey(bucketId.
outGroup())).id();
tBuilder.group(indirectGroupId);
}
outBuckets.add(DefaultGroupBucket.
......@@ -251,7 +255,7 @@ public class PolicyGroupHandler extends DefaultGroupHandler {
}
}
public GroupKey generatePolicyGroupKey(String id,
public PolicyGroupIdentifier generatePolicyGroupKey(String id,
List<PolicyGroupParams> params) {
List<GroupBucketIdentifier> bucketIds = new ArrayList<GroupBucketIdentifier>();
for (PolicyGroupParams param: params) {
......@@ -320,25 +324,28 @@ public class PolicyGroupHandler extends DefaultGroupHandler {
return innermostGroupkey;
}
public void removeGroupChain(GroupKey key) {
public void removeGroupChain(PolicyGroupIdentifier key) {
if (!(key instanceof PolicyGroupIdentifier)) {
throw new IllegalArgumentException();
}
List<GroupKey> groupsToBeDeleted = new ArrayList<GroupKey>();
List<PolicyGroupIdentifier> groupsToBeDeleted =
new ArrayList<PolicyGroupIdentifier>();
groupsToBeDeleted.add(key);
Iterator<GroupKey> it = groupsToBeDeleted.iterator();
Iterator<PolicyGroupIdentifier> it =
groupsToBeDeleted.iterator();
while (it.hasNext()) {
PolicyGroupIdentifier innerMostGroupKey =
(PolicyGroupIdentifier) it.next();
PolicyGroupIdentifier innerMostGroupKey = it.next();
for (GroupBucketIdentifier bucketId:
innerMostGroupKey.bucketIds()) {
if (bucketId.type() != BucketOutputType.GROUP) {
groupsToBeDeleted.add(bucketId.outGroup());
}
}
groupService.removeGroup(deviceId, innerMostGroupKey, appId);
groupService.removeGroup(deviceId,
getGroupKey(innerMostGroupKey),
appId);
it.remove();
}
}
......
......@@ -17,14 +17,12 @@ package org.onosproject.grouphandler;
import java.util.List;
import org.onosproject.net.group.GroupKey;
/**
* Representation of policy based group identifiers.
* Opaque to group handler applications and only the outermost
* policy group identifier in a chain is visible to the applications.
*/
public class PolicyGroupIdentifier implements GroupKey {
public class PolicyGroupIdentifier {
private String id;
private List<PolicyGroupParams> inputParams;
private List<GroupBucketIdentifier> bucketIds;
......
......@@ -16,13 +16,11 @@
package org.onosproject.net.group;
import static com.google.common.base.MoreObjects.toStringHelper;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Objects;
import org.onosproject.core.GroupId;
import org.onosproject.net.DeviceId;
import org.slf4j.Logger;
/**
* ONOS implementation of default group that is stored in the system.
......@@ -30,9 +28,8 @@ import org.slf4j.Logger;
public class DefaultGroup extends DefaultGroupDescription
implements Group, StoredGroupEntry {
private final Logger log = getLogger(getClass());
private GroupState state;
private boolean isGroupStateAddedFirstTime;
private long life;
private long packets;
private long bytes;
......@@ -215,4 +212,14 @@ public class DefaultGroup extends DefaultGroupDescription
.add("state", state)
.toString();
}
@Override
public void setIsGroupStateAddedFirstTime(boolean isGroupStateAddedFirstTime) {
this.isGroupStateAddedFirstTime = isGroupStateAddedFirstTime;
}
@Override
public boolean isGroupStateAddedFirstTime() {
return isGroupStateAddedFirstTime;
}
}
......
......@@ -41,7 +41,8 @@ public class DefaultGroupDescription implements GroupDescription {
* @param deviceId device identifier
* @param type type of the group
* @param buckets immutable list of group bucket
* @param appCookie immutable application cookie to be associated with the group
* @param appCookie immutable application cookie of type DefaultGroupKey
* to be associated with the group
* @param appId application id
*/
public DefaultGroupDescription(DeviceId deviceId,
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.group;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Arrays;
/**
* Default implementation of group key interface.
*/
public class DefaultGroupKey implements GroupKey {
private final byte[] key;
public DefaultGroupKey(byte[] key) {
this.key = checkNotNull(key);
}
@Override
public byte[] key() {
return key;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DefaultGroupKey)) {
return false;
}
DefaultGroupKey that = (DefaultGroupKey) o;
return (Arrays.equals(this.key, that.key));
}
@Override
public int hashCode() {
return Arrays.hashCode(this.key);
}
}
\ No newline at end of file
......@@ -17,8 +17,15 @@ package org.onosproject.net.group;
/**
* Representation of generalized Key that would be used to store
* groups in &lt; Key, Value &gt; store. Implementation of this interface
* MUST override "equals()" and "hashcode()" methods.
* groups in &lt; Key, Value &gt; store. This key uses a generic
* byte array so that applications can associate their groups with
* any of their data by translating it into a byte array.
*/
public interface GroupKey {
/**
* Returns the byte representation of key.
*
* @return byte array
*/
public byte[] key();
}
......
......@@ -15,6 +15,7 @@
*/
package org.onosproject.net.group;
import org.onosproject.core.GroupId;
import org.onosproject.net.DeviceId;
import org.onosproject.store.Store;
......@@ -60,6 +61,15 @@ public interface GroupStore extends Store<GroupEvent, GroupStoreDelegate> {
Group getGroup(DeviceId deviceId, GroupKey appCookie);
/**
* Returns the stored group entry for an id.
*
* @param deviceId the device ID
* @param groupId the group identifier
* @return a group associated with the key
*/
Group getGroup(DeviceId deviceId, GroupId groupId);
/**
* Stores a new group entry using the information from group description.
*
* @param groupDesc group description to be used to store group entry
......
......@@ -29,6 +29,23 @@ public interface StoredGroupEntry extends Group {
void setState(Group.GroupState newState);
/**
* Sets if group has transitioned to ADDED state for the first time.
* This is to differentiate state transitions "from PENDING_ADD to ADDED"
* and "from PENDING_UPDATE to ADDED". For internal use only.
*
* @param isGroupAddedFirstTime true if group moves to ADDED state
* for the first time.
*/
void setIsGroupStateAddedFirstTime(boolean isGroupAddedFirstTime);
/**
* Returns the isGroupStateAddedFirstTime value. For internal use only.
*
* @return isGroupStateAddedFirstTime value
*/
boolean isGroupStateAddedFirstTime();
/**
* Sets how long this entry has been entered in the system.
*
* @param life epoch time
......
......@@ -15,7 +15,13 @@
*/
package org.onosproject.net.group.impl;
import com.google.common.collect.Sets;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -48,12 +54,7 @@ import org.onosproject.net.provider.AbstractProviderRegistry;
import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.collect.Sets;
/**
* Provides implementation of the group service APIs.
......@@ -103,6 +104,7 @@ public class GroupManager
*/
@Override
public void addGroup(GroupDescription groupDesc) {
log.trace("In addGroup API");
store.storeGroupDescription(groupDesc);
}
......@@ -121,6 +123,7 @@ public class GroupManager
*/
@Override
public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
log.trace("In getGroup API");
return store.getGroup(deviceId, appCookie);
}
......@@ -142,6 +145,7 @@ public class GroupManager
GroupBuckets buckets,
GroupKey newCookie,
ApplicationId appId) {
log.trace("In addBucketsToGroup API");
store.updateGroupDescription(deviceId,
oldCookie,
UpdateType.ADD,
......@@ -167,6 +171,7 @@ public class GroupManager
GroupBuckets buckets,
GroupKey newCookie,
ApplicationId appId) {
log.trace("In removeBucketsFromGroup API");
store.updateGroupDescription(deviceId,
oldCookie,
UpdateType.REMOVE,
......@@ -188,6 +193,7 @@ public class GroupManager
public void removeGroup(DeviceId deviceId,
GroupKey appCookie,
ApplicationId appId) {
log.trace("In removeGroup API");
store.deleteGroupDescription(deviceId, appCookie);
}
......@@ -202,11 +208,13 @@ public class GroupManager
@Override
public Iterable<Group> getGroups(DeviceId deviceId,
ApplicationId appId) {
log.trace("In getGroups API");
return store.getGroups(deviceId);
}
@Override
public Iterable<Group> getGroups(DeviceId deviceId) {
log.trace("In getGroups API");
return store.getGroups(deviceId);
}
......@@ -217,6 +225,7 @@ public class GroupManager
*/
@Override
public void addListener(GroupListener listener) {
log.trace("In addListener API");
listenerRegistry.addListener(listener);
}
......@@ -227,6 +236,7 @@ public class GroupManager
*/
@Override
public void removeListener(GroupListener listener) {
log.trace("In removeListener API");
listenerRegistry.removeListener(listener);
}
......@@ -364,36 +374,52 @@ public class GroupManager
Set<Group> extraneousStoredEntries =
Sets.newHashSet(store.getExtraneousGroups(deviceId));
log.trace("Displaying all southboundGroupEntries for device {}", deviceId);
log.trace("Displaying all ({}) southboundGroupEntries for device {}",
southboundGroupEntries.size(),
deviceId);
for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
Group group = it.next();
log.trace("Group {} in device {}", group, deviceId);
}
log.trace("Displaying all stored group entries for device {}", deviceId);
for (Iterator<Group> it = storedGroupEntries.iterator(); it.hasNext();) {
Group group = it.next();
log.trace("Displaying all ({}) stored group entries for device {}",
storedGroupEntries.size(),
deviceId);
for (Iterator<Group> it1 = storedGroupEntries.iterator(); it1.hasNext();) {
Group group = it1.next();
log.trace("Stored Group {} for device {}", group, deviceId);
}
for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
Group group = it.next();
for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
Group group = it2.next();
if (storedGroupEntries.remove(group)) {
// we both have the group, let's update some info then.
log.trace("Group AUDIT: group {} exists "
+ "in both planes for device {}",
group.id(), deviceId);
groupAdded(group);
it.remove();
it2.remove();
}
}
for (Group group : southboundGroupEntries) {
// there are groups in the switch that aren't in the store
log.trace("Group AUDIT: extraneous group {} exists "
+ "in data plane for device {}",
group.id(), deviceId);
extraneousStoredEntries.remove(group);
extraneousGroup(group);
if (store.getGroup(group.deviceId(), group.id()) != null) {
// There is a group existing with the same id
// It is possible that group update is
// in progress while we got a stale info from switch
if (!storedGroupEntries.remove(store.getGroup(
group.deviceId(), group.id()))) {
log.warn("Group AUDIT: Inconsistent state:"
+ "Group exists in ID based table while "
+ "not present in key based table");
}
} else {
// there are groups in the switch that aren't in the store
log.trace("Group AUDIT: extraneous group {} exists "
+ "in data plane for device {}",
group.id(), deviceId);
extraneousStoredEntries.remove(group);
extraneousGroup(group);
}
}
for (Group group : storedGroupEntries) {
// there are groups in the store that aren't in the switch
......
......@@ -15,6 +15,12 @@
*/
package org.onosproject.net.group.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
......@@ -38,6 +44,7 @@ import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.group.DefaultGroup;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
......@@ -58,8 +65,6 @@ import org.onosproject.store.trivial.impl.SimpleGroupStore;
import com.google.common.collect.Iterables;
import static org.junit.Assert.*;
/**
* Test codifying the group service & group provider service contracts.
*/
......@@ -108,31 +113,6 @@ public class GroupManagerTest {
mgr.eventDispatcher = null;
}
private class TestGroupKey implements GroupKey {
private String groupId;
public TestGroupKey(String id) {
this.groupId = id;
}
public String id() {
return this.groupId;
}
@Override
public int hashCode() {
return groupId.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TestGroupKey) {
return this.groupId.equals(((TestGroupKey) obj).id());
}
return false;
}
}
/**
* Tests group service north bound and south bound interfaces.
* The following operations are tested:
......@@ -177,7 +157,7 @@ public class GroupManagerTest {
PortNumber.portNumber(32)};
PortNumber[] ports2 = {PortNumber.portNumber(41),
PortNumber.portNumber(42)};
TestGroupKey key = new TestGroupKey("group1BeforeAudit");
GroupKey key = new DefaultGroupKey("group1BeforeAudit".getBytes());
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
List<PortNumber> outPorts = new ArrayList<PortNumber>();
outPorts.addAll(Arrays.asList(ports1));
......@@ -224,7 +204,7 @@ public class GroupManagerTest {
providerService.pushGroupMetrics(DID, groupEntries);
// First group metrics would trigger the device audit completion
// post which all pending group requests are also executed.
TestGroupKey key = new TestGroupKey("group1BeforeAudit");
GroupKey key = new DefaultGroupKey("group1BeforeAudit".getBytes());
Group createdGroup = groupService.getGroup(DID, key);
int createdGroupId = createdGroup.id().id();
assertNotEquals(gId1.id(), createdGroupId);
......@@ -256,7 +236,7 @@ public class GroupManagerTest {
0);
List<Group> groupEntries = Arrays.asList(group1, group2);
providerService.pushGroupMetrics(DID, groupEntries);
TestGroupKey key = new TestGroupKey("group1BeforeAudit");
GroupKey key = new DefaultGroupKey("group1BeforeAudit".getBytes());
Group createdGroup = groupService.getGroup(DID, key);
List<GroupOperation> expectedGroupOps = Arrays.asList(
GroupOperation.createDeleteGroupOperation(gId1,
......@@ -271,7 +251,7 @@ public class GroupManagerTest {
// Test AUDIT with confirmed groups
private void testAuditWithConfirmedGroups() {
TestGroupKey key = new TestGroupKey("group1BeforeAudit");
GroupKey key = new DefaultGroupKey("group1BeforeAudit".getBytes());
Group createdGroup = groupService.getGroup(DID, key);
createdGroup = new DefaultGroup(createdGroup.id(),
DID,
......@@ -284,9 +264,9 @@ public class GroupManagerTest {
// Test group add bucket operations
private void testAddBuckets() {
TestGroupKey addKey = new TestGroupKey("group1AddBuckets");
GroupKey addKey = new DefaultGroupKey("group1AddBuckets".getBytes());
TestGroupKey prevKey = new TestGroupKey("group1BeforeAudit");
GroupKey prevKey = new DefaultGroupKey("group1BeforeAudit".getBytes());
Group createdGroup = groupService.getGroup(DID, prevKey);
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
buckets.addAll(createdGroup.buckets().buckets());
......@@ -328,9 +308,9 @@ public class GroupManagerTest {
// Test group remove bucket operations
private void testRemoveBuckets() {
TestGroupKey removeKey = new TestGroupKey("group1RemoveBuckets");
GroupKey removeKey = new DefaultGroupKey("group1RemoveBuckets".getBytes());
TestGroupKey prevKey = new TestGroupKey("group1AddBuckets");
GroupKey prevKey = new DefaultGroupKey("group1AddBuckets".getBytes());
Group createdGroup = groupService.getGroup(DID, prevKey);
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
buckets.addAll(createdGroup.buckets().buckets());
......@@ -372,7 +352,7 @@ public class GroupManagerTest {
// Test group remove operations
private void testRemoveGroup() {
TestGroupKey currKey = new TestGroupKey("group1RemoveBuckets");
GroupKey currKey = new DefaultGroupKey("group1RemoveBuckets".getBytes());
Group existingGroup = groupService.getGroup(DID, currKey);
groupService.removeGroup(DID, currKey, appId);
List<GroupOperation> expectedGroupOps = Arrays.asList(
......@@ -397,7 +377,7 @@ public class GroupManagerTest {
PortNumber[] ports2 = {PortNumber.portNumber(41),
PortNumber.portNumber(42)};
// Test Group creation before AUDIT process
TestGroupKey key = new TestGroupKey("group1BeforeAudit");
GroupKey key = new DefaultGroupKey("group1BeforeAudit".getBytes());
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
List<PortNumber> outPorts = new ArrayList<PortNumber>();
outPorts.addAll(Arrays.asList(ports1));
......
......@@ -16,25 +16,48 @@
package org.onosproject.store.group.impl;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.DefaultGroupId;
import org.onosproject.core.GroupId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.flow.instructions.L0ModificationInstruction;
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flow.instructions.L3ModificationInstruction;
import org.onosproject.net.group.DefaultGroup;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.Group.GroupState;
import org.onosproject.net.group.GroupBucket;
......@@ -48,10 +71,21 @@ import org.onosproject.net.group.GroupStore;
import org.onosproject.net.group.GroupStoreDelegate;
import org.onosproject.net.group.StoredGroupEntry;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.ecmap.EventuallyConsistentMap;
import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
/**
* Manages inventory of group entries using trivial in-memory implementation.
......@@ -67,85 +101,165 @@ public class DistributedGroupStore
private final int dummyId = 0xffffffff;
private final GroupId dummyGroupId = new DefaultGroupId(dummyId);
// inner Map is per device group table
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>>
groupEntriesByKey = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, StoredGroupEntry>>
groupEntriesById = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupKey, StoredGroupEntry>>
pendingGroupEntriesByKey = new ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
// Per device group table with (device id + app cookie) as key
private EventuallyConsistentMap<GroupStoreKeyMapKey,
StoredGroupEntry> groupStoreEntriesByKey = null;
// Per device group table with (device id + group id) as key
private EventuallyConsistentMap<GroupStoreIdMapKey,
StoredGroupEntry> groupStoreEntriesById = null;
private EventuallyConsistentMap<GroupStoreKeyMapKey,
StoredGroupEntry> auditPendingReqQueue = null;
private final ConcurrentMap<DeviceId, ConcurrentMap<GroupId, Group>>
extraneousGroupEntriesById = new ConcurrentHashMap<>();
private ExecutorService messageHandlingExecutor;
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 1;
private final HashMap<DeviceId, Boolean> deviceAuditStatus =
new HashMap<DeviceId, Boolean>();
private final AtomicInteger groupIdGen = new AtomicInteger();
private KryoNamespace.Builder kryoBuilder = null;
@Activate
public void activate() {
kryoBuilder = new KryoNamespace.Builder()
.register(DefaultGroup.class,
DefaultGroupBucket.class,
DefaultGroupDescription.class,
DefaultGroupKey.class,
GroupDescription.Type.class,
Group.GroupState.class,
GroupBuckets.class,
DefaultGroupId.class,
GroupStoreMessage.class,
GroupStoreMessage.Type.class,
UpdateType.class,
GroupStoreMessageSubjects.class,
MultiValuedTimestamp.class,
GroupStoreKeyMapKey.class,
GroupStoreIdMapKey.class,
GroupStoreMapKey.class
)
.register(URI.class)
.register(DeviceId.class)
.register(PortNumber.class)
.register(DefaultApplicationId.class)
.register(DefaultTrafficTreatment.class,
Instructions.DropInstruction.class,
Instructions.OutputInstruction.class,
Instructions.GroupInstruction.class,
Instructions.TableTypeTransition.class,
FlowRule.Type.class,
L0ModificationInstruction.class,
L0ModificationInstruction.L0SubType.class,
L0ModificationInstruction.ModLambdaInstruction.class,
L2ModificationInstruction.class,
L2ModificationInstruction.L2SubType.class,
L2ModificationInstruction.ModEtherInstruction.class,
L2ModificationInstruction.PushHeaderInstructions.class,
L2ModificationInstruction.ModVlanIdInstruction.class,
L2ModificationInstruction.ModVlanPcpInstruction.class,
L2ModificationInstruction.ModMplsLabelInstruction.class,
L2ModificationInstruction.ModMplsTtlInstruction.class,
L3ModificationInstruction.class,
L3ModificationInstruction.L3SubType.class,
L3ModificationInstruction.ModIPInstruction.class,
L3ModificationInstruction.ModIPv6FlowLabelInstruction.class,
L3ModificationInstruction.ModTtlInstruction.class,
org.onlab.packet.MplsLabel.class
)
.register(org.onosproject.cluster.NodeId.class)
.register(KryoNamespaces.BASIC)
.register(KryoNamespaces.MISC);
messageHandlingExecutor = Executors.
newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/group",
"message-handlers"));
clusterCommunicator.
addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
new ClusterGroupMsgHandler(),
messageHandlingExecutor);
log.debug("Creating EC map groupstorekeymap");
groupStoreEntriesByKey =
new EventuallyConsistentMapImpl<>("groupstorekeymap",
clusterService,
clusterCommunicator,
kryoBuilder,
new GroupStoreLogicalClockManager<>());
log.trace("Current size {}", groupStoreEntriesByKey.size());
log.debug("Creating EC map groupstoreidmap");
groupStoreEntriesById =
new EventuallyConsistentMapImpl<>("groupstoreidmap",
clusterService,
clusterCommunicator,
kryoBuilder,
new GroupStoreLogicalClockManager<>());
groupStoreEntriesById.addListener(new GroupStoreIdMapListener());
log.trace("Current size {}", groupStoreEntriesById.size());
log.debug("Creating EC map pendinggroupkeymap");
auditPendingReqQueue =
new EventuallyConsistentMapImpl<>("pendinggroupkeymap",
clusterService,
clusterCommunicator,
kryoBuilder,
new GroupStoreLogicalClockManager<>());
log.trace("Current size {}", auditPendingReqQueue.size());
log.info("Started");
}
@Deactivate
public void deactivate() {
groupEntriesByKey.clear();
groupEntriesById.clear();
log.info("Stopped");
}
private static NewConcurrentHashMap<GroupKey, StoredGroupEntry>
lazyEmptyGroupKeyTable() {
return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded();
}
private static NewConcurrentHashMap<GroupId, StoredGroupEntry>
lazyEmptyGroupIdTable() {
return NewConcurrentHashMap.<GroupId, StoredGroupEntry>ifNeeded();
}
private static NewConcurrentHashMap<GroupKey, StoredGroupEntry>
lazyEmptyPendingGroupKeyTable() {
return NewConcurrentHashMap.<GroupKey, StoredGroupEntry>ifNeeded();
}
private static NewConcurrentHashMap<GroupId, Group>
lazyEmptyExtraneousGroupIdTable() {
lazyEmptyExtraneousGroupIdTable() {
return NewConcurrentHashMap.<GroupId, Group>ifNeeded();
}
/**
* Returns the group key table for specified device.
* Returns the group store eventual consistent key map.
*
* @param deviceId identifier of the device
* @return Map representing group key table of given device.
* @return Map representing group key table.
*/
private ConcurrentMap<GroupKey, StoredGroupEntry> getGroupKeyTable(DeviceId deviceId) {
return createIfAbsentUnchecked(groupEntriesByKey,
deviceId, lazyEmptyGroupKeyTable());
private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
getGroupStoreKeyMap() {
return groupStoreEntriesByKey;
}
/**
* Returns the group id table for specified device.
* Returns the group store eventual consistent id map.
*
* @param deviceId identifier of the device
* @return Map representing group key table of given device.
* @return Map representing group id table.
*/
private ConcurrentMap<GroupId, StoredGroupEntry> getGroupIdTable(DeviceId deviceId) {
return createIfAbsentUnchecked(groupEntriesById,
deviceId, lazyEmptyGroupIdTable());
private EventuallyConsistentMap<GroupStoreIdMapKey, StoredGroupEntry>
getGroupStoreIdMap() {
return groupStoreEntriesById;
}
/**
* Returns the pending group key table for specified device.
* Returns the pending group request table.
*
* @param deviceId identifier of the device
* @return Map representing group key table of given device.
* @return Map representing group key table.
*/
private ConcurrentMap<GroupKey, StoredGroupEntry>
getPendingGroupKeyTable(DeviceId deviceId) {
return createIfAbsentUnchecked(pendingGroupEntriesByKey,
deviceId, lazyEmptyPendingGroupKeyTable());
private EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry>
getPendingGroupKeyTable() {
return auditPendingReqQueue;
}
/**
......@@ -168,8 +282,8 @@ public class DistributedGroupStore
*/
@Override
public int getGroupCount(DeviceId deviceId) {
return (groupEntriesByKey.get(deviceId) != null) ?
groupEntriesByKey.get(deviceId).size() : 0;
return (getGroups(deviceId) != null) ?
Iterables.size(getGroups(deviceId)) : 0;
}
/**
......@@ -182,16 +296,11 @@ public class DistributedGroupStore
@Override
public Iterable<Group> getGroups(DeviceId deviceId) {
// flatten and make iterator unmodifiable
return FluentIterable.from(getGroupKeyTable(deviceId).values())
.transform(
new Function<StoredGroupEntry, Group>() {
@Override
public Group apply(
StoredGroupEntry input) {
return input;
}
});
log.trace("getGroups: for device {} total number of groups {}",
deviceId, getGroupStoreKeyMap().values().size());
return FluentIterable.from(getGroupStoreKeyMap().values())
.filter(input -> input.deviceId().equals(deviceId))
.transform(input -> input);
}
/**
......@@ -204,19 +313,31 @@ public class DistributedGroupStore
*/
@Override
public Group getGroup(DeviceId deviceId, GroupKey appCookie) {
return (groupEntriesByKey.get(deviceId) != null) ?
groupEntriesByKey.get(deviceId).get(appCookie) :
null;
return getStoredGroupEntry(deviceId, appCookie);
}
private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
GroupKey appCookie) {
return getGroupStoreKeyMap().get(new GroupStoreKeyMapKey(deviceId,
appCookie));
}
@Override
public Group getGroup(DeviceId deviceId, GroupId groupId) {
return getStoredGroupEntry(deviceId, groupId);
}
private StoredGroupEntry getStoredGroupEntry(DeviceId deviceId,
GroupId groupId) {
return getGroupStoreIdMap().get(new GroupStoreIdMapKey(deviceId,
groupId));
}
private int getFreeGroupIdValue(DeviceId deviceId) {
int freeId = groupIdGen.incrementAndGet();
while (true) {
Group existing = (
groupEntriesById.get(deviceId) != null) ?
groupEntriesById.get(deviceId).get(new DefaultGroupId(freeId)) :
null;
Group existing = getGroup(deviceId, new DefaultGroupId(freeId));
if (existing == null) {
existing = (
extraneousGroupEntriesById.get(deviceId) != null) ?
......@@ -240,23 +361,45 @@ public class DistributedGroupStore
*/
@Override
public void storeGroupDescription(GroupDescription groupDesc) {
log.trace("In storeGroupDescription");
// Check if a group is existing with the same key
if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
log.warn("Group already exists with the same key {}",
groupDesc.appCookie());
return;
}
if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
// Device group audit has not completed yet
// Add this group description to pending group key table
// Create a group entry object with Dummy Group ID
StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
group.setState(GroupState.WAITING_AUDIT_COMPLETE);
ConcurrentMap<GroupKey, StoredGroupEntry> pendingKeyTable =
getPendingGroupKeyTable(groupDesc.deviceId());
pendingKeyTable.put(groupDesc.appCookie(), group);
// Check if group to be created by a remote instance
if (mastershipService.getLocalRole(
groupDesc.deviceId()) != MastershipRole.MASTER) {
log.debug("Device {} local role is not MASTER",
groupDesc.deviceId());
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupAddRequestMsg(groupDesc.deviceId(),
groupDesc);
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
GroupStoreMessageSubjects.
REMOTE_GROUP_OP_REQUEST,
kryoBuilder.build().serialize(groupOp));
if (!clusterCommunicator.unicast(message,
mastershipService.
getMasterFor(
groupDesc.deviceId()))) {
log.warn("Failed to send request to master: {} to {}",
message,
mastershipService.getMasterFor(groupDesc.deviceId()));
//TODO: Send Group operation failure event
}
log.debug("Sent Group operation request for device {} "
+ "to remote MASTER {}",
groupDesc.deviceId(),
mastershipService.getMasterFor(groupDesc.deviceId()));
return;
}
log.debug("Store group for device {} is getting handled locally",
groupDesc.deviceId());
storeGroupDescriptionInternal(groupDesc);
}
......@@ -266,17 +409,34 @@ public class DistributedGroupStore
return;
}
if (deviceAuditStatus.get(groupDesc.deviceId()) == null) {
// Device group audit has not completed yet
// Add this group description to pending group key table
// Create a group entry object with Dummy Group ID
log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
+ "pending...Queuing Group ADD request",
groupDesc.deviceId());
StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
group.setState(GroupState.WAITING_AUDIT_COMPLETE);
EventuallyConsistentMap<GroupStoreKeyMapKey, StoredGroupEntry> pendingKeyTable =
getPendingGroupKeyTable();
pendingKeyTable.put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
groupDesc.appCookie()),
group);
return;
}
// Get a new group identifier
GroupId id = new DefaultGroupId(getFreeGroupIdValue(groupDesc.deviceId()));
// Create a group entry object
StoredGroupEntry group = new DefaultGroup(id, groupDesc);
// Insert the newly created group entry into concurrent key and id maps
ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
getGroupKeyTable(groupDesc.deviceId());
keyTable.put(groupDesc.appCookie(), group);
ConcurrentMap<GroupId, StoredGroupEntry> idTable =
getGroupIdTable(groupDesc.deviceId());
idTable.put(id, group);
// Insert the newly created group entry into key and id maps
getGroupStoreKeyMap().
put(new GroupStoreKeyMapKey(groupDesc.deviceId(),
groupDesc.appCookie()), group);
getGroupStoreIdMap().
put(new GroupStoreIdMapKey(groupDesc.deviceId(),
id), group);
notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
group));
}
......@@ -297,6 +457,42 @@ public class DistributedGroupStore
UpdateType type,
GroupBuckets newBuckets,
GroupKey newAppCookie) {
// Check if group update to be done by a remote instance
if (mastershipService.
getLocalRole(deviceId) != MastershipRole.MASTER) {
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupUpdateRequestMsg(deviceId,
oldAppCookie,
type,
newBuckets,
newAppCookie);
ClusterMessage message =
new ClusterMessage(clusterService.getLocalNode().id(),
GroupStoreMessageSubjects.
REMOTE_GROUP_OP_REQUEST,
kryoBuilder.build().serialize(groupOp));
if (!clusterCommunicator.unicast(message,
mastershipService.
getMasterFor(deviceId))) {
log.warn("Failed to send request to master: {} to {}",
message,
mastershipService.getMasterFor(deviceId));
//TODO: Send Group operation failure event
}
return;
}
updateGroupDescriptionInternal(deviceId,
oldAppCookie,
type,
newBuckets,
newAppCookie);
}
private void updateGroupDescriptionInternal(DeviceId deviceId,
GroupKey oldAppCookie,
UpdateType type,
GroupBuckets newBuckets,
GroupKey newAppCookie) {
// Check if a group is existing with the provided key
Group oldGroup = getGroup(deviceId, oldAppCookie);
if (oldGroup == null) {
......@@ -323,14 +519,17 @@ public class DistributedGroupStore
newGroup.setPackets(oldGroup.packets());
newGroup.setBytes(oldGroup.bytes());
// Remove the old entry from maps and add new entry using new key
ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
getGroupKeyTable(oldGroup.deviceId());
ConcurrentMap<GroupId, StoredGroupEntry> idTable =
getGroupIdTable(oldGroup.deviceId());
keyTable.remove(oldGroup.appCookie());
idTable.remove(oldGroup.id());
keyTable.put(newGroup.appCookie(), newGroup);
idTable.put(newGroup.id(), newGroup);
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(oldGroup.deviceId(),
oldGroup.appCookie()));
getGroupStoreIdMap().remove(new GroupStoreIdMapKey(oldGroup.deviceId(),
oldGroup.id()));
getGroupStoreKeyMap().
put(new GroupStoreKeyMapKey(newGroup.deviceId(),
newGroup.appCookie()), newGroup);
getGroupStoreIdMap().
put(new GroupStoreIdMapKey(newGroup.deviceId(),
newGroup.id()), newGroup);
notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
}
}
......@@ -379,10 +578,34 @@ public class DistributedGroupStore
@Override
public void deleteGroupDescription(DeviceId deviceId,
GroupKey appCookie) {
// Check if group to be deleted by a remote instance
if (mastershipService.
getLocalRole(deviceId) != MastershipRole.MASTER) {
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupDeleteRequestMsg(deviceId,
appCookie);
ClusterMessage message =
new ClusterMessage(clusterService.getLocalNode().id(),
GroupStoreMessageSubjects.
REMOTE_GROUP_OP_REQUEST,
kryoBuilder.build().serialize(groupOp));
if (!clusterCommunicator.unicast(message,
mastershipService.
getMasterFor(deviceId))) {
log.warn("Failed to send request to master: {} to {}",
message,
mastershipService.getMasterFor(deviceId));
//TODO: Send Group operation failure event
}
return;
}
deleteGroupDescriptionInternal(deviceId, appCookie);
}
private void deleteGroupDescriptionInternal(DeviceId deviceId,
GroupKey appCookie) {
// Check if a group is existing with the provided key
StoredGroupEntry existing = (groupEntriesByKey.get(deviceId) != null) ?
groupEntriesByKey.get(deviceId).get(appCookie) :
null;
StoredGroupEntry existing = getStoredGroupEntry(deviceId, appCookie);
if (existing == null) {
return;
}
......@@ -401,26 +624,35 @@ public class DistributedGroupStore
@Override
public void addOrUpdateGroupEntry(Group group) {
// check if this new entry is an update to an existing entry
StoredGroupEntry existing = (groupEntriesById.get(
group.deviceId()) != null) ?
groupEntriesById.get(group.deviceId()).get(group.id()) :
null;
StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
group.id());
GroupEvent event = null;
if (existing != null) {
log.trace("addOrUpdateGroupEntry: updating group "
+ "entry {} in device {}",
group.id(),
group.deviceId());
synchronized (existing) {
existing.setLife(group.life());
existing.setPackets(group.packets());
existing.setBytes(group.bytes());
if (existing.state() == GroupState.PENDING_ADD) {
existing.setState(GroupState.ADDED);
existing.setIsGroupStateAddedFirstTime(true);
event = new GroupEvent(Type.GROUP_ADDED, existing);
} else {
if (existing.state() == GroupState.PENDING_UPDATE) {
existing.setState(GroupState.PENDING_UPDATE);
}
existing.setState(GroupState.ADDED);
existing.setIsGroupStateAddedFirstTime(false);
event = new GroupEvent(Type.GROUP_UPDATED, existing);
}
//Re-PUT map entries to trigger map update events
getGroupStoreKeyMap().
put(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()), existing);
getGroupStoreIdMap().
put(new GroupStoreIdMapKey(existing.deviceId(),
existing.id()), existing);
}
}
......@@ -436,18 +668,18 @@ public class DistributedGroupStore
*/
@Override
public void removeGroupEntry(Group group) {
StoredGroupEntry existing = (groupEntriesById.get(
group.deviceId()) != null) ?
groupEntriesById.get(group.deviceId()).get(group.id()) :
null;
StoredGroupEntry existing = getStoredGroupEntry(group.deviceId(),
group.id());
if (existing != null) {
ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
getGroupKeyTable(existing.deviceId());
ConcurrentMap<GroupId, StoredGroupEntry> idTable =
getGroupIdTable(existing.deviceId());
idTable.remove(existing.id());
keyTable.remove(existing.appCookie());
log.trace("removeGroupEntry: removing group "
+ "entry {} in device {}",
group.id(),
group.deviceId());
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()));
getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
existing.id()));
notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
}
}
......@@ -461,9 +693,17 @@ public class DistributedGroupStore
+ "completed for device {}", deviceId);
deviceAuditStatus.put(deviceId, true);
// Execute all pending group requests
ConcurrentMap<GroupKey, StoredGroupEntry> pendingGroupRequests =
getPendingGroupKeyTable(deviceId);
for (Group group:pendingGroupRequests.values()) {
List<StoredGroupEntry> pendingGroupRequests =
getPendingGroupKeyTable().values()
.stream()
.filter(g-> g.deviceId().equals(deviceId))
.collect(Collectors.toList());
log.trace("deviceInitialAuditCompleted: processing "
+ "pending group add requests for device {} and "
+ "number of pending requests {}",
deviceId,
pendingGroupRequests.size());
for (Group group:pendingGroupRequests) {
GroupDescription tmp = new DefaultGroupDescription(
group.deviceId(),
group.type(),
......@@ -471,8 +711,9 @@ public class DistributedGroupStore
group.appCookie(),
group.appId());
storeGroupDescriptionInternal(tmp);
getPendingGroupKeyTable().
remove(new GroupStoreKeyMapKey(deviceId, group.appCookie()));
}
getPendingGroupKeyTable(deviceId).clear();
} else {
if (deviceAuditStatus.get(deviceId)) {
log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
......@@ -494,10 +735,8 @@ public class DistributedGroupStore
@Override
public void groupOperationFailed(DeviceId deviceId, GroupOperation operation) {
StoredGroupEntry existing = (groupEntriesById.get(
deviceId) != null) ?
groupEntriesById.get(deviceId).get(operation.groupId()) :
null;
StoredGroupEntry existing = getStoredGroupEntry(deviceId,
operation.groupId());
if (existing == null) {
log.warn("No group entry with ID {} found ", operation.groupId());
......@@ -518,27 +757,37 @@ public class DistributedGroupStore
log.warn("Unknown group operation type {}", operation.opType());
}
ConcurrentMap<GroupKey, StoredGroupEntry> keyTable =
getGroupKeyTable(existing.deviceId());
ConcurrentMap<GroupId, StoredGroupEntry> idTable =
getGroupIdTable(existing.deviceId());
idTable.remove(existing.id());
keyTable.remove(existing.appCookie());
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()));
getGroupStoreIdMap().remove(new GroupStoreIdMapKey(existing.deviceId(),
existing.id()));
}
@Override
public void addOrUpdateExtraneousGroupEntry(Group group) {
log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
+ "group entry {} in device {}",
group.id(),
group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
getExtraneousGroupIdTable(group.deviceId());
extraneousIdTable.put(group.id(), group);
// Check the reference counter
if (group.referenceCount() == 0) {
log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
+ "counter is zero and triggering remove",
group.id(),
group.deviceId());
notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
}
}
@Override
public void removeExtraneousGroupEntry(Group group) {
log.trace("removeExtraneousGroupEntry: remove extraneous "
+ "group entry {} of device {} from store",
group.id(),
group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
getExtraneousGroupIdTable(group.deviceId());
extraneousIdTable.remove(group.id());
......@@ -551,5 +800,192 @@ public class DistributedGroupStore
getExtraneousGroupIdTable(deviceId).values());
}
/**
* ClockService that generates wallclock based timestamps.
*/
private class GroupStoreLogicalClockManager<T, U>
implements ClockService<T, U> {
private final AtomicLong sequenceNumber = new AtomicLong(0);
@Override
public Timestamp getTimestamp(T t1, U u1) {
return new MultiValuedTimestamp<>(System.currentTimeMillis(),
sequenceNumber.getAndIncrement());
}
}
/**
* Map handler to receive any events when the group map is updated.
*/
private class GroupStoreIdMapListener implements
EventuallyConsistentMapListener<GroupStoreIdMapKey, StoredGroupEntry> {
@Override
public void event(EventuallyConsistentMapEvent<GroupStoreIdMapKey,
StoredGroupEntry> mapEvent) {
GroupEvent groupEvent = null;
log.trace("GroupStoreIdMapListener: received groupid map event {}",
mapEvent.type());
if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
log.trace("GroupIdMapListener: Received PUT event");
if (mapEvent.value().state() == Group.GroupState.ADDED) {
if (mapEvent.value().isGroupStateAddedFirstTime()) {
groupEvent = new GroupEvent(Type.GROUP_ADDED,
mapEvent.value());
log.trace("GroupIdMapListener: Received first time "
+ "GROUP_ADDED state update");
} else {
groupEvent = new GroupEvent(Type.GROUP_UPDATED,
mapEvent.value());
log.trace("GroupIdMapListener: Received following "
+ "GROUP_ADDED state update");
}
}
} else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
log.trace("GroupIdMapListener: Received REMOVE event");
groupEvent = new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
}
if (groupEvent != null) {
notifyDelegate(groupEvent);
}
}
}
/**
* Message handler to receive messages from group subsystems of
* other cluster members.
*/
private final class ClusterGroupMsgHandler
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.trace("ClusterGroupMsgHandler: received remote group message");
if (message.subject() ==
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST) {
GroupStoreMessage groupOp = kryoBuilder.
build().deserialize(message.payload());
log.trace("received remote group operation request");
if (!(mastershipService.
getLocalRole(groupOp.deviceId()) !=
MastershipRole.MASTER)) {
log.warn("ClusterGroupMsgHandler: This node is not "
+ "MASTER for device {}", groupOp.deviceId());
return;
}
if (groupOp.type() == GroupStoreMessage.Type.ADD) {
log.trace("processing remote group "
+ "add operation request");
storeGroupDescriptionInternal(groupOp.groupDesc());
} else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
log.trace("processing remote group "
+ "update operation request");
updateGroupDescriptionInternal(groupOp.deviceId(),
groupOp.appCookie(),
groupOp.updateType(),
groupOp.updateBuckets(),
groupOp.newAppCookie());
} else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
log.trace("processing remote group "
+ "delete operation request");
deleteGroupDescriptionInternal(groupOp.deviceId(),
groupOp.appCookie());
}
}
}
}
/**
* Flattened map key to be used to store group entries.
*/
private class GroupStoreMapKey {
private final DeviceId deviceId;
public GroupStoreMapKey(DeviceId deviceId) {
this.deviceId = deviceId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof GroupStoreMapKey)) {
return false;
}
GroupStoreMapKey that = (GroupStoreMapKey) o;
return this.deviceId.equals(that.deviceId);
}
@Override
public int hashCode() {
int result = 17;
result = 31 * result + Objects.hash(this.deviceId);
return result;
}
}
private class GroupStoreKeyMapKey extends GroupStoreMapKey {
private final GroupKey appCookie;
public GroupStoreKeyMapKey(DeviceId deviceId,
GroupKey appCookie) {
super(deviceId);
this.appCookie = appCookie;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof GroupStoreKeyMapKey)) {
return false;
}
GroupStoreKeyMapKey that = (GroupStoreKeyMapKey) o;
return (super.equals(that) &&
this.appCookie.equals(that.appCookie));
}
@Override
public int hashCode() {
int result = 17;
result = 31 * result + super.hashCode() + Objects.hash(this.appCookie);
return result;
}
}
private class GroupStoreIdMapKey extends GroupStoreMapKey {
private final GroupId groupId;
public GroupStoreIdMapKey(DeviceId deviceId,
GroupId groupId) {
super(deviceId);
this.groupId = groupId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof GroupStoreIdMapKey)) {
return false;
}
GroupStoreIdMapKey that = (GroupStoreIdMapKey) o;
return (super.equals(that) &&
this.groupId.equals(that.groupId));
}
@Override
public int hashCode() {
int result = 17;
result = 31 * result + super.hashCode() + Objects.hash(this.groupId);
return result;
}
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.group.impl;
import org.onosproject.net.DeviceId;
import org.onosproject.net.group.GroupBuckets;
import org.onosproject.net.group.GroupDescription;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.group.GroupStore.UpdateType;
/**
* Format of the Group store message that is used to
* communicate with the peer nodes in the cluster.
*/
public final class GroupStoreMessage {
private final DeviceId deviceId;
private final GroupKey appCookie;
private final GroupDescription groupDesc;
private final UpdateType updateType;
private final GroupBuckets updateBuckets;
private final GroupKey newAppCookie;
private final Type type;
/**
* Type of group store request.
*/
public enum Type {
ADD,
UPDATE,
DELETE
}
private GroupStoreMessage(Type type,
DeviceId deviceId,
GroupKey appCookie,
GroupDescription groupDesc,
UpdateType updateType,
GroupBuckets updateBuckets,
GroupKey newAppCookie) {
this.type = type;
this.deviceId = deviceId;
this.appCookie = appCookie;
this.groupDesc = groupDesc;
this.updateType = updateType;
this.updateBuckets = updateBuckets;
this.newAppCookie = newAppCookie;
}
/**
* Creates a group store message for group ADD request.
*
* @param deviceId device identifier in which group to be added
* @param desc group creation parameters
* @return constructed group store message
*/
public static GroupStoreMessage createGroupAddRequestMsg(DeviceId deviceId,
GroupDescription desc) {
return new GroupStoreMessage(Type.ADD,
deviceId,
null,
desc,
null,
null,
null);
}
/**
* Creates a group store message for group UPDATE request.
*
* @param deviceId the device ID
* @param appCookie the current group key
* @param updateType update (add or delete) type
* @param updateBuckets group buckets for updates
* @param newAppCookie optional new group key
* @return constructed group store message
*/
public static GroupStoreMessage createGroupUpdateRequestMsg(DeviceId deviceId,
GroupKey appCookie,
UpdateType updateType,
GroupBuckets updateBuckets,
GroupKey newAppCookie) {
return new GroupStoreMessage(Type.UPDATE,
deviceId,
appCookie,
null,
updateType,
updateBuckets,
newAppCookie);
}
/**
* Creates a group store message for group DELETE request.
*
* @param deviceId the device ID
* @param appCookie the group key
* @return constructed group store message
*/
public static GroupStoreMessage createGroupDeleteRequestMsg(DeviceId deviceId,
GroupKey appCookie) {
return new GroupStoreMessage(Type.DELETE,
deviceId,
appCookie,
null,
null,
null,
null);
}
/**
* Returns the device identifier of this group request.
*
* @return device identifier
*/
public DeviceId deviceId() {
return deviceId;
}
/**
* Returns the application cookie associated with this group request.
*
* @return application cookie
*/
public GroupKey appCookie() {
return appCookie;
}
/**
* Returns the group create parameters associated with this group request.
*
* @return group create parameters
*/
public GroupDescription groupDesc() {
return groupDesc;
}
/**
* Returns the group buckets to be updated as part of this group request.
*
* @return group buckets to be updated
*/
public GroupBuckets updateBuckets() {
return updateBuckets;
}
/**
* Returns the update group operation type.
*
* @return update operation type
*/
public UpdateType updateType() {
return updateType;
}
/**
* Returns the new application cookie associated with this group operation.
*
* @return new application cookie
*/
public GroupKey newAppCookie() {
return newAppCookie;
}
/**
* Returns the type of this group operation.
*
* @return group message type
*/
public Type type() {
return type;
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.group.impl;
import org.onosproject.store.cluster.messaging.MessageSubject;
/**
* MessageSubjects used by DistributedGroupRuleStore peer-peer communication.
*/
public final class GroupStoreMessageSubjects {
private GroupStoreMessageSubjects() {}
public static final MessageSubject REMOTE_GROUP_OP_REQUEST
= new MessageSubject("peer-forward-group-op-req");
}
......@@ -209,6 +209,13 @@ public class SimpleGroupStore
null;
}
@Override
public Group getGroup(DeviceId deviceId, GroupId groupId) {
return (groupEntriesById.get(deviceId) != null) ?
groupEntriesById.get(deviceId).get(groupId) :
null;
}
private int getFreeGroupIdValue(DeviceId deviceId) {
int freeId = groupIdGen.incrementAndGet();
......@@ -551,5 +558,4 @@ public class SimpleGroupStore
getExtraneousGroupIdTable(deviceId).values());
}
}
......
......@@ -36,6 +36,7 @@ import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupBucket;
import org.onosproject.net.group.GroupBuckets;
......@@ -70,31 +71,6 @@ public class SimpleGroupStoreTest {
simpleGroupStore.deactivate();
}
public class TestGroupKey implements GroupKey {
private String groupId;
public TestGroupKey(String id) {
this.groupId = id;
}
public String id() {
return this.groupId;
}
@Override
public int hashCode() {
return groupId.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TestGroupKey) {
return this.groupId.equals(((TestGroupKey) obj).id());
}
return false;
}
}
private class InternalGroupStoreDelegate
implements GroupStoreDelegate {
private GroupId createdGroupId = null;
......@@ -173,20 +149,20 @@ public class SimpleGroupStoreTest {
simpleGroupStore.deviceInitialAuditCompleted(D1, true);
// Testing storeGroup operation
TestGroupKey newKey = new TestGroupKey("group1");
GroupKey newKey = new DefaultGroupKey("group1".getBytes());
testStoreAndGetGroup(newKey);
// Testing addOrUpdateGroupEntry operation from southbound
TestGroupKey currKey = newKey;
GroupKey currKey = newKey;
testAddGroupEntryFromSB(currKey);
// Testing updateGroupDescription for ADD operation from northbound
newKey = new TestGroupKey("group1AddBuckets");
newKey = new DefaultGroupKey("group1AddBuckets".getBytes());
testAddBuckets(currKey, newKey);
// Testing updateGroupDescription for REMOVE operation from northbound
currKey = newKey;
newKey = new TestGroupKey("group1RemoveBuckets");
newKey = new DefaultGroupKey("group1RemoveBuckets".getBytes());
testRemoveBuckets(currKey, newKey);
// Testing addOrUpdateGroupEntry operation from southbound
......@@ -201,7 +177,7 @@ public class SimpleGroupStoreTest {
}
// Testing storeGroup operation
private void testStoreAndGetGroup(TestGroupKey key) {
private void testStoreAndGetGroup(GroupKey key) {
PortNumber[] ports = {PortNumber.portNumber(31),
PortNumber.portNumber(32)};
List<PortNumber> outPorts = new ArrayList<PortNumber>();
......@@ -252,7 +228,7 @@ public class SimpleGroupStoreTest {
}
// Testing addOrUpdateGroupEntry operation from southbound
private void testAddGroupEntryFromSB(TestGroupKey currKey) {
private void testAddGroupEntryFromSB(GroupKey currKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
InternalGroupStoreDelegate addGroupEntryDelegate =
......@@ -265,7 +241,7 @@ public class SimpleGroupStoreTest {
}
// Testing addOrUpdateGroupEntry operation from southbound
private void testUpdateGroupEntryFromSB(TestGroupKey currKey) {
private void testUpdateGroupEntryFromSB(GroupKey currKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
InternalGroupStoreDelegate updateGroupEntryDelegate =
......@@ -278,7 +254,7 @@ public class SimpleGroupStoreTest {
}
// Testing updateGroupDescription for ADD operation from northbound
private void testAddBuckets(TestGroupKey currKey, TestGroupKey addKey) {
private void testAddBuckets(GroupKey currKey, GroupKey addKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
buckets.addAll(existingGroup.buckets().buckets());
......@@ -316,7 +292,7 @@ public class SimpleGroupStoreTest {
}
// Testing updateGroupDescription for REMOVE operation from northbound
private void testRemoveBuckets(TestGroupKey currKey, TestGroupKey removeKey) {
private void testRemoveBuckets(GroupKey currKey, GroupKey removeKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
List<GroupBucket> buckets = new ArrayList<GroupBucket>();
buckets.addAll(existingGroup.buckets().buckets());
......@@ -343,7 +319,7 @@ public class SimpleGroupStoreTest {
}
// Testing deleteGroupDescription operation from northbound
private void testDeleteGroup(TestGroupKey currKey) {
private void testDeleteGroup(GroupKey currKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
InternalGroupStoreDelegate deleteGroupDescDelegate =
new InternalGroupStoreDelegate(currKey,
......@@ -355,7 +331,7 @@ public class SimpleGroupStoreTest {
}
// Testing removeGroupEntry operation from southbound
private void testRemoveGroupFromSB(TestGroupKey currKey) {
private void testRemoveGroupFromSB(GroupKey currKey) {
Group existingGroup = simpleGroupStore.getGroup(D1, currKey);
InternalGroupStoreDelegate removeGroupEntryDelegate =
new InternalGroupStoreDelegate(currKey,
......@@ -380,7 +356,7 @@ public class SimpleGroupStoreTest {
ApplicationId appId =
new DefaultApplicationId(2, "org.groupstore.test");
TestGroupKey key = new TestGroupKey("group1");
GroupKey key = new DefaultGroupKey("group1".getBytes());
PortNumber[] ports = {PortNumber.portNumber(31),
PortNumber.portNumber(32)};
List<PortNumber> outPorts = new ArrayList<PortNumber>();
......