Hyunsun Moon
Committed by Gerrit Code Review

CORD-537 Added flow rules for vSG connectivity

- Added Q_IN_Q table
- Added flow rules for vSG connectivity
- Changed to listen port update event from Neutron to update vSG IPs

Change-Id: I227ba7a91e90ec0752481ebf623b4e848d585265
......@@ -38,7 +38,6 @@ import org.onosproject.net.Host;
import org.onosproject.net.HostId;
import org.onosproject.net.HostLocation;
import org.onosproject.net.Port;
import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
......@@ -137,10 +136,14 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro
};
private static final String DEFAULT_TUNNEL = "vxlan";
private static final Ip4Address DEFAULT_DNS = Ip4Address.valueOf("8.8.8.8");
private static final String SERVICE_ID = "serviceId";
private static final String LOCATION_IP = "locationIp";
private static final String OPENSTACK_VM_ID = "openstackVmId";
private static final String OPENSTACK_PORT_ID = "openstackPortId";
private static final String DATA_PLANE_IP = "dataPlaneIp";
private static final String DATA_PLANE_INTF = "dataPlaneIntf";
private static final String S_TAG = "stag";
private static final Ip4Address DEFAULT_DNS = Ip4Address.valueOf("8.8.8.8");
private final ExecutorService eventExecutor =
newSingleThreadScheduledExecutor(groupedThreads("onos/cordvtn", "event-handler"));
......@@ -263,18 +266,24 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro
}
Set<IpAddress> ip = Sets.newHashSet(vPort.fixedIps().values());
SparseAnnotations annotations = DefaultAnnotations.builder()
.set(OPENSTACK_VM_ID, vPort.deviceId())
DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
.set(SERVICE_ID, vPort.networkId())
.set(LOCATION_IP, node.dpIp().ip().toString())
.build();
.set(OPENSTACK_VM_ID, vPort.deviceId())
.set(OPENSTACK_PORT_ID, vPort.id())
.set(DATA_PLANE_IP, node.dpIp().ip().toString())
.set(DATA_PLANE_INTF, node.dpIntf());
String serviceVlan = getServiceVlan(vPort);
if (serviceVlan != null) {
annotations.set(S_TAG, serviceVlan);
}
HostDescription hostDesc = new DefaultHostDescription(
mac,
VlanId.NONE,
new HostLocation(connectPoint, System.currentTimeMillis()),
ip,
annotations);
annotations.build());
hostProvider.hostDetected(hostId, hostDesc, false);
}
......@@ -294,6 +303,20 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro
hostProvider.hostVanished(host.id());
}
@Override
public void updateVirtualSubscriberGateways(HostId vSgHostId, String serviceVlan,
Set<IpAddress> vSgIps) {
Host vSgVm = hostService.getHost(vSgHostId);
if (vSgVm == null || !vSgVm.annotations().value(S_TAG).equals(serviceVlan)) {
log.debug("Invalid vSG updates for {}", serviceVlan);
return;
}
log.info("Updates vSGs in {} with {}", vSgVm.id(), vSgIps.toString());
ruleInstaller.populateSubscriberGatewayRules(vSgVm, vSgIps);
}
/**
* Returns CordService by service ID.
*
......@@ -357,10 +380,11 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro
* Returns IP address for tunneling for a given host.
*
* @param host host
* @return ip address
* @return ip address, or null
*/
private IpAddress getTunnelIp(Host host) {
return IpAddress.valueOf(host.annotations().value(LOCATION_IP));
String ip = host.annotations().value(DATA_PLANE_IP);
return ip == null ? null : IpAddress.valueOf(ip);
}
/**
......@@ -374,6 +398,22 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro
}
/**
* Returns s-tag from a given OpenStack port.
*
* @param vPort openstack port
* @return s-tag string
*/
private String getServiceVlan(OpenstackPort vPort) {
checkNotNull(vPort);
if (vPort.name() != null && vPort.name().startsWith(S_TAG)) {
return vPort.name().split("-")[1];
} else {
return null;
}
}
/**
* Returns hosts associated with a given OpenStack network.
*
* @param vNet openstack network
......@@ -395,6 +435,30 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro
}
/**
* Returns public ip addresses of vSGs running inside a give vSG host.
*
* @param vSgHost vSG host
* @return set of ip address, or empty set
*/
private Set<IpAddress> getSubscriberGatewayIps(Host vSgHost) {
String vPortId = vSgHost.annotations().value(OPENSTACK_PORT_ID);
String serviceVlan = vSgHost.annotations().value(S_TAG);
OpenstackPort vPort = openstackService.port(vPortId);
if (vPort == null) {
log.warn("Failed to get OpenStack port {} for VM {}", vPortId, vSgHost.id());
return Sets.newHashSet();
}
if (!serviceVlan.equals(getServiceVlan(vPort))) {
log.error("Host({}) s-tag does not match with vPort s-tag", vSgHost.id());
return Sets.newHashSet();
}
return vPort.allowedAddressPairs().keySet();
}
/**
* Registers static DHCP lease for a given host.
*
* @param host host
......@@ -452,8 +516,13 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro
arpProxy.sendGratuitousArp(service.serviceIp(), gatewayMac, Sets.newHashSet(host));
}
ruleInstaller.populateBasicConnectionRules(host, getTunnelIp(host), vNet);
registerDhcpLease(host, service);
ruleInstaller.populateBasicConnectionRules(host, getTunnelIp(host), vNet);
if (host.annotations().value(S_TAG) != null) {
log.debug("vSG VM detected {}", host.id());
ruleInstaller.populateSubscriberGatewayRules(host, getSubscriberGatewayIps(host));
}
}
/**
......@@ -468,7 +537,7 @@ public class CordVtn extends AbstractProvider implements CordVtnService, HostPro
}
String vNetId = host.annotations().value(SERVICE_ID);
OpenstackNetwork vNet = openstackService.network(host.annotations().value(SERVICE_ID));
OpenstackNetwork vNet = openstackService.network(vNetId);
if (vNet == null) {
log.warn("Failed to get OpenStack network {} for VM {}({}).",
vNetId,
......
......@@ -25,6 +25,7 @@ import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.TpPort;
import org.onlab.packet.VlanId;
import org.onlab.util.ItemNotFoundException;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.DefaultGroupId;
......@@ -59,6 +60,7 @@ import org.onosproject.net.flow.instructions.ExtensionPropertyException;
import org.onosproject.net.flow.instructions.ExtensionTreatment;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flow.instructions.L2ModificationInstruction.ModEtherInstruction;
import org.onosproject.net.group.DefaultGroupBucket;
import org.onosproject.net.group.DefaultGroupDescription;
......@@ -87,6 +89,7 @@ import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
import static org.onosproject.net.flow.instructions.ExtensionTreatmentType.ExtensionTreatmentTypes.NICIRA_SET_TUNNEL_DST;
import static org.onosproject.net.flow.instructions.L2ModificationInstruction.L2SubType.ETH_DST;
import static org.onosproject.net.flow.instructions.L2ModificationInstruction.L2SubType.VLAN_PUSH;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -96,21 +99,27 @@ public class CordVtnRuleInstaller {
protected final Logger log = getLogger(getClass());
private static final String PORT_NAME = "portName";
private static final int TABLE_FIRST = 0;
private static final int TABLE_IN_PORT = 1;
private static final int TABLE_ACCESS_TYPE = 2;
private static final int TABLE_IN_SERVICE = 3;
private static final int TABLE_DST_IP = 4;
private static final int TABLE_TUNNEL_IN = 5;
private static final int TABLE_Q_IN_Q = 6;
private static final int MANAGEMENT_PRIORITY = 55000;
private static final int VSG_PRIORITY = 55000;
private static final int HIGH_PRIORITY = 50000;
private static final int DEFAULT_PRIORITY = 5000;
private static final int LOW_PRIORITY = 4000;
private static final int LOWEST_PRIORITY = 0;
private static final int VXLAN_UDP_PORT = 4789;
private static final VlanId VLAN_WAN = VlanId.vlanId((short) 500);
private static final String PORT_NAME = "portName";
private static final String DATA_PLANE_INTF = "dataPlaneIntf";
private static final String S_TAG = "stag";
private final ApplicationId appId;
private final FlowRuleService flowRuleService;
......@@ -163,6 +172,7 @@ public class CordVtnRuleInstaller {
processFirstTable(deviceId, dpPort, dpIp);
processInPortTable(deviceId, tunnelPort, dpPort);
processAccessTypeTable(deviceId, dpPort);
processQInQTable(deviceId, dpPort);
}
/**
......@@ -406,6 +416,10 @@ public class CordVtnRuleInstaller {
DeviceId deviceId = host.location().deviceId();
IpAddress hostIp = host.ipAddresses().stream().findFirst().get();
if (!mastershipService.isLocalMaster(deviceId)) {
return;
}
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_ARP)
.matchArpTpa(mService.serviceIp().getIp4Address())
......@@ -502,6 +516,10 @@ public class CordVtnRuleInstaller {
public void removeManagementNetworkRules(Host host, CordService mService) {
checkNotNull(mService);
if (!mastershipService.isLocalMaster(host.location().deviceId())) {
return;
}
for (FlowRule flowRule : flowRuleService.getFlowRulesById(appId)) {
if (flowRule.deviceId().equals(host.location().deviceId())) {
PortNumber port = getOutputFromTreatment(flowRule);
......@@ -515,6 +533,113 @@ public class CordVtnRuleInstaller {
}
/**
* Populates rules for vSG VM.
*
* @param vSgHost vSG host
* @param vSgIps set of ip addresses of vSGs running inside the vSG VM
*/
public void populateSubscriberGatewayRules(Host vSgHost, Set<IpAddress> vSgIps) {
VlanId serviceVlan = getServiceVlan(vSgHost);
PortNumber dpPort = getDpPort(vSgHost);
if (serviceVlan == null || dpPort == null) {
log.warn("Failed to populate rules for vSG VM {}", vSgHost.id());
return;
}
// for traffics with s-tag, strip the tag and take through the vSG VM
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchVlanId(serviceVlan)
.build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.popVlan()
.setOutput(vSgHost.location().port())
.build();
FlowRule flowRule = DefaultFlowRule.builder()
.fromApp(appId)
.withSelector(selector)
.withTreatment(treatment)
.withPriority(DEFAULT_PRIORITY)
.forDevice(vSgHost.location().deviceId())
.forTable(TABLE_Q_IN_Q)
.makePermanent()
.build();
processFlowRule(true, flowRule);
// for traffics with customer vlan, tag with the service vlan based on input port with
// lower priority to avoid conflict with WAN tag
selector = DefaultTrafficSelector.builder()
.matchInPort(vSgHost.location().port())
.build();
treatment = DefaultTrafficTreatment.builder()
.pushVlan()
.setVlanId(serviceVlan)
.setOutput(dpPort)
.build();
flowRule = DefaultFlowRule.builder()
.fromApp(appId)
.withSelector(selector)
.withTreatment(treatment)
.withPriority(LOW_PRIORITY)
.forDevice(vSgHost.location().deviceId())
.forTable(TABLE_Q_IN_Q)
.makePermanent()
.build();
processFlowRule(true, flowRule);
// for traffic coming from WAN, tag 500 and take through the vSG VM
// based on destination ip
vSgIps.stream().forEach(ip -> {
TrafficSelector downstream = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPDst(ip.toIpPrefix())
.build();
TrafficTreatment downstreamTreatment = DefaultTrafficTreatment.builder()
.pushVlan()
.setVlanId(VLAN_WAN)
.setEthDst(vSgHost.mac())
.setOutput(vSgHost.location().port())
.build();
FlowRule downstreamFlowRule = DefaultFlowRule.builder()
.fromApp(appId)
.withSelector(downstream)
.withTreatment(downstreamTreatment)
.withPriority(DEFAULT_PRIORITY)
.forDevice(vSgHost.location().deviceId())
.forTable(TABLE_DST_IP)
.makePermanent()
.build();
processFlowRule(true, downstreamFlowRule);
});
// remove downstream flow rules for the vSG not shown in vSgIps
for (FlowRule rule : flowRuleService.getFlowRulesById(appId)) {
if (!rule.deviceId().equals(vSgHost.location().deviceId())) {
continue;
}
PortNumber output = getOutputFromTreatment(rule);
if (output == null || !output.equals(vSgHost.location().port()) ||
!isVlanPushFromTreatment(rule)) {
continue;
}
IpPrefix dstIp = getDstIpFromSelector(rule);
if (dstIp != null && !vSgIps.contains(dstIp.address())) {
processFlowRule(false, rule);
}
}
}
/**
* Populates default rules on the first table.
* It includes the rules for shuttling vxlan-encapped packets between ovs and
* linux stack,and external network connectivity.
......@@ -596,6 +721,7 @@ public class CordVtnRuleInstaller {
selector = DefaultTrafficSelector.builder()
.matchInPort(dpPort)
.matchEthType(Ethernet.TYPE_ARP)
.matchArpTpa(dpIp.getIp4Address())
.build();
treatment = DefaultTrafficTreatment.builder()
......@@ -633,6 +759,27 @@ public class CordVtnRuleInstaller {
.build();
processFlowRule(true, flowRule);
// take all vlan tagged packet to the Q_IN_Q table
selector = DefaultTrafficSelector.builder()
.matchVlanId(VlanId.ANY)
.build();
treatment = DefaultTrafficTreatment.builder()
.transition(TABLE_Q_IN_Q)
.build();
flowRule = DefaultFlowRule.builder()
.fromApp(appId)
.withSelector(selector)
.withTreatment(treatment)
.withPriority(VSG_PRIORITY)
.forDevice(deviceId)
.forTable(TABLE_FIRST)
.makePermanent()
.build();
processFlowRule(true, flowRule);
}
/**
......@@ -716,6 +863,57 @@ public class CordVtnRuleInstaller {
}
/**
* Populates default rules for Q_IN_Q table.
*
* @param deviceId device id
* @param dpPort data plane interface port number
*/
private void processQInQTable(DeviceId deviceId, PortNumber dpPort) {
// for traffic going out to WAN, strip vid 500 and take through data plane interface
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchVlanId(VLAN_WAN)
.build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.popVlan()
.setOutput(dpPort)
.build();
FlowRule flowRule = DefaultFlowRule.builder()
.fromApp(appId)
.withSelector(selector)
.withTreatment(treatment)
.withPriority(DEFAULT_PRIORITY)
.forDevice(deviceId)
.forTable(TABLE_Q_IN_Q)
.makePermanent()
.build();
processFlowRule(true, flowRule);
selector = DefaultTrafficSelector.builder()
.matchVlanId(VLAN_WAN)
.matchEthType(Ethernet.TYPE_ARP)
.build();
treatment = DefaultTrafficTreatment.builder()
.setOutput(PortNumber.CONTROLLER)
.build();
flowRule = DefaultFlowRule.builder()
.fromApp(appId)
.withSelector(selector)
.withTreatment(treatment)
.withPriority(HIGH_PRIORITY)
.forDevice(deviceId)
.forTable(TABLE_Q_IN_Q)
.makePermanent()
.build();
processFlowRule(true, flowRule);
}
/**
* Populates rules for local in port in IN_PORT table.
* Flows from a given in port, whose source IP is service IP transition
* to DST_TYPE table. Other flows transition to IN_SERVICE table.
......@@ -1033,8 +1231,8 @@ public class CordVtnRuleInstaller {
*/
private PortNumber getTunnelPort(DeviceId deviceId) {
Port port = deviceService.getPorts(deviceId).stream()
.filter(p -> p.annotations().value(PORT_NAME).contains(tunnelType))
.findFirst().orElse(null);
.filter(p -> p.annotations().value(PORT_NAME).contains(tunnelType))
.findFirst().orElse(null);
return port == null ? null : port.number();
}
......@@ -1048,13 +1246,34 @@ public class CordVtnRuleInstaller {
*/
private PortNumber getDpPort(DeviceId deviceId, String dpIntf) {
Port port = deviceService.getPorts(deviceId).stream()
.filter(p -> p.annotations().value(PORT_NAME).contains(dpIntf) &&
p.isEnabled())
.findFirst().orElse(null);
.filter(p -> p.annotations().value(PORT_NAME).contains(dpIntf) &&
p.isEnabled())
.findFirst().orElse(null);
return port == null ? null : port.number();
}
/** Returns data plane interface port number of a given host.
*
* @param host host
* @return port number, or null
*/
private PortNumber getDpPort(Host host) {
String portName = host.annotations().value(DATA_PLANE_INTF);
return portName == null ? null : getDpPort(host.location().deviceId(), portName);
}
/**
* Returns service vlan from a given host.
*
* @param host host
* @return vlan id, or null
*/
private VlanId getServiceVlan(Host host) {
String serviceVlan = host.annotations().value(S_TAG);
return serviceVlan == null ? null : VlanId.vlanId(Short.parseShort(serviceVlan));
}
/**
* Returns the inport from a given flow rule if the rule contains the match of it.
*
......@@ -1171,7 +1390,7 @@ public class CordVtnRuleInstaller {
*/
private PortNumber getOutputFromTreatment(FlowRule flowRule) {
Instruction instruction = flowRule.treatment().allInstructions().stream()
.filter(inst -> inst instanceof Instructions.OutputInstruction)
.filter(inst -> inst instanceof Instructions.OutputInstruction)
.findFirst()
.orElse(null);
......@@ -1183,6 +1402,22 @@ public class CordVtnRuleInstaller {
}
/**
* Returns if a given flow rule has vlan push instruction or not.
*
* @param flowRule flow rule
* @return true if it includes vlan push, or false
*/
private boolean isVlanPushFromTreatment(FlowRule flowRule) {
Instruction instruction = flowRule.treatment().allInstructions().stream()
.filter(inst -> inst instanceof L2ModificationInstruction)
.filter(inst -> ((L2ModificationInstruction) inst).subtype().equals(VLAN_PUSH))
.findAny()
.orElse(null);
return instruction != null;
}
/**
* Creates a new group for a given service.
*
* @param deviceId device id to create a group
......
......@@ -15,7 +15,11 @@
*/
package org.onosproject.cordvtn;
import org.onlab.packet.IpAddress;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.HostId;
import java.util.Set;
/**
* Service for provisioning overlay virtual networks on compute nodes.
......@@ -57,4 +61,14 @@ public interface CordVtnService {
* @param pServiceId id of the service which provide dependency
*/
void removeServiceDependency(CordServiceId tServiceId, CordServiceId pServiceId);
/**
* Updates virtual service gateways.
*
* @param vSgHost host id of vSG host
* @param serviceVlan service vlan id
* @param vSgIps set of ip address of vSGs running in this vSG host
*/
void updateVirtualSubscriberGateways(HostId vSgHost, String serviceVlan,
Set<IpAddress> vSgIps);
}
......
......@@ -220,7 +220,7 @@ public final class RemoteIpCommandUtil {
return null;
}
log.debug("Execute command {} to {}", command, session.getHost());
log.trace("Execute command {} to {}", command, session.getHost());
try {
Channel channel = session.openChannel("exec");
......
......@@ -15,6 +15,13 @@
*/
package org.onosproject.cordvtn.rest;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onosproject.cordvtn.CordVtnService;
import org.onosproject.net.HostId;
import org.onosproject.rest.AbstractWebResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -29,16 +36,29 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.InputStream;
import java.util.Set;
/**
* Dummy Neutron ML2 mechanism driver.
* It just returns OK for ports resource requests.
* It just returns OK for ports resource requests except for the port update.
*/
@Path("ports")
public class NeutronMl2PortsWebResource extends AbstractWebResource {
protected final Logger log = LoggerFactory.getLogger(getClass());
private static final String PORTS_MESSAGE = "Received ports %s";
private static final String PORT = "port";
private static final String DEVICE_ID = "device_id";
private static final String NAME = "name";
private static final String MAC_ADDRESS = "mac_address";
private static final String ADDRESS_PAIRS = "allowed_address_pairs";
private static final String IP_ADDERSS = "ip_address";
private static final String STAG_PREFIX = "stag-";
private static final int STAG_BEGIN_INDEX = 5;
private final CordVtnService service = get(CordVtnService.class);
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
......@@ -53,6 +73,35 @@ public class NeutronMl2PortsWebResource extends AbstractWebResource {
@Produces(MediaType.APPLICATION_JSON)
public Response updatePorts(@PathParam("id") String id, InputStream input) {
log.debug(String.format(PORTS_MESSAGE, "update"));
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(input).get(PORT);
log.trace("{}", jsonNode.toString());
String deviceId = jsonNode.path(DEVICE_ID).asText();
String name = jsonNode.path(NAME).asText();
if (deviceId.isEmpty() || name.isEmpty() || !name.startsWith(STAG_PREFIX)) {
// ignore all updates other than allowed address pairs
return Response.status(Response.Status.OK).build();
}
// this is allowed address pairs updates
MacAddress mac = MacAddress.valueOf(jsonNode.path(MAC_ADDRESS).asText());
Set<IpAddress> vSgIps = Sets.newHashSet();
jsonNode.path(ADDRESS_PAIRS).forEach(addrPair -> {
IpAddress ip = IpAddress.valueOf(addrPair.path(IP_ADDERSS).asText());
vSgIps.add(ip);
});
service.updateVirtualSubscriberGateways(
HostId.hostId(mac),
name.substring(STAG_BEGIN_INDEX),
vSgIps);
} catch (Exception e) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
return Response.status(Response.Status.OK).build();
}
......