Thomas Vachuska

ONOS-2812 Refactored the link code in search of a defect; the root cause was old…

… OVS-based switch which is wrongly forwarding LLDP frames.

ONOS can fix this later by tracking links to be pruned using port pairs.

Change-Id: Ia79ec69946daff80636f5ab4b75a3dcdba91465d
......@@ -275,6 +275,10 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
packetService.cancelPackets(selector.build(), PacketPriority.CONTROL, appId);
}
private LinkDiscovery createLinkDiscovery(Device device) {
return new LinkDiscovery(device, packetService, masterService,
providerService, useBDDP);
}
private class InternalRoleListener implements MastershipListener {
......@@ -297,11 +301,8 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
synchronized (discoverers) {
if (!discoverers.containsKey(deviceId)) {
// ideally, should never reach here
log.debug("Device mastership changed ({}) {}",
event.type(), deviceId);
discoverers.put(deviceId, new LinkDiscovery(device,
packetService, masterService, providerService,
useBDDP));
log.debug("Device mastership changed ({}) {}", event.type(), deviceId);
discoverers.put(deviceId, createLinkDiscovery(device));
}
}
}
......@@ -331,15 +332,11 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
log.debug("LinkDiscovery from {} disabled by configuration", device.id());
return;
}
log.debug("Device added ({}) {}", event.type(),
deviceId);
discoverers.put(deviceId, new LinkDiscovery(device,
packetService, masterService,
providerService, useBDDP));
log.debug("Device added ({}) {}", event.type(), deviceId);
discoverers.put(deviceId, createLinkDiscovery(device));
} else {
if (ld.isStopped()) {
log.debug("Device restarted ({}) {}", event.type(),
deviceId);
log.debug("Device restarted ({}) {}", event.type(), deviceId);
ld.start();
}
}
......@@ -363,15 +360,13 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
}
} else {
log.debug("Port down {}", port);
ConnectPoint point = new ConnectPoint(deviceId,
port.number());
ConnectPoint point = new ConnectPoint(deviceId, port.number());
providerService.linksVanished(point);
}
break;
case PORT_REMOVED:
log.debug("Port removed {}", port);
ConnectPoint point = new ConnectPoint(deviceId,
port.number());
ConnectPoint point = new ConnectPoint(deviceId, port.number());
providerService.linksVanished(point);
break;
......@@ -411,8 +406,7 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
if (context == null) {
return;
}
LinkDiscovery ld = discoverers.get(
context.inPacket().receivedFrom().deviceId());
LinkDiscovery ld = discoverers.get(context.inPacket().receivedFrom().deviceId());
if (ld == null) {
return;
}
......@@ -441,8 +435,7 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
synchronized (discoverers) {
LinkDiscovery discoverer = discoverers.get(did);
if (discoverer == null) {
discoverer = new LinkDiscovery(dev, packetService,
masterService, providerService, useBDDP);
discoverer = createLinkDiscovery(dev);
discoverers.put(did, discoverer);
}
......
......@@ -15,21 +15,8 @@
*/
package org.onosproject.provider.lldp.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onosproject.net.PortNumber.portNumber;
import static org.onosproject.net.flow.DefaultTrafficTreatment.builder;
import static org.slf4j.LoggerFactory.getLogger;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.onlab.packet.Ethernet;
......@@ -51,7 +38,20 @@ import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketService;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onosproject.net.PortNumber.portNumber;
import static org.onosproject.net.flow.DefaultTrafficTreatment.builder;
import static org.slf4j.LoggerFactory.getLogger;
// TODO: add 'fast discovery' mode: drop LLDPs in destination switch but listen for flow_removed messages
// FIXME: add ability to track links using port pairs or the link inventory
/**
* Run discovery process from a physical switch. Ports are initially labeled as
......@@ -62,74 +62,79 @@ import org.slf4j.Logger;
*/
public class LinkDiscovery implements TimerTask {
private final Logger log = getLogger(getClass());
private static final short MAX_PROBE_COUNT = 3; // probes to send before link is removed
private static final short DEFAULT_PROBE_RATE = 3000; // millis
private static final String SRC_MAC = "DE:AD:BE:EF:BA:11";
private static final String SERVICE_NULL = "Service cannot be null";
private final Device device;
// send 1 probe every probeRate milliseconds
private final long probeRate;
private final Set<Long> slowPorts;
private final long probeRate = DEFAULT_PROBE_RATE;
private final Set<Long> slowPorts = Sets.newConcurrentHashSet();
// ports, known to have incoming links
private final Set<Long> fastPorts;
private final Set<Long> fastPorts = Sets.newConcurrentHashSet();
// number of unacknowledged probes per port
private final Map<Long, AtomicInteger> portProbeCount;
// number of probes to send before link is removed
private static final short MAX_PROBE_COUNT = 3;
private final Logger log = getLogger(getClass());
private final Map<Long, AtomicInteger> portProbeCount = Maps.newHashMap();
private final ONOSLLDP lldpPacket;
private final Ethernet ethPacket;
private Ethernet bddpEth;
private final boolean useBDDP;
private Timeout timeout;
private volatile boolean isStopped;
private final LinkProviderService linkProvider;
private final PacketService pktService;
private final MastershipService mastershipService;
private Timeout timeout;
private volatile boolean isStopped;
/**
* Instantiates discovery manager for the given physical switch. Creates a
* generic LLDP packet that will be customized for the port it is sent out on.
* Starts the the timer for the discovery process.
*
* @param device the physical switch
* @param pktService packet service
* @param masterService mastership service
* @param device the physical switch
* @param pktService packet service
* @param masterService mastership service
* @param providerService link provider service
* @param useBDDP flag to also use BDDP for discovery
* @param useBDDP flag to also use BDDP for discovery
*/
public LinkDiscovery(Device device, PacketService pktService,
MastershipService masterService,
LinkProviderService providerService, Boolean... useBDDP) {
this.device = device;
this.probeRate = 3000;
this.linkProvider = providerService;
this.pktService = pktService;
this.mastershipService = checkNotNull(masterService, "WTF!");
this.slowPorts = Collections.synchronizedSet(new HashSet<>());
this.fastPorts = Collections.synchronizedSet(new HashSet<>());
this.portProbeCount = new HashMap<>();
this.lldpPacket = new ONOSLLDP();
this.lldpPacket.setChassisId(device.chassisId());
this.lldpPacket.setDevice(device.id().toString());
this.ethPacket = new Ethernet();
this.ethPacket.setEtherType(Ethernet.TYPE_LLDP);
this.ethPacket.setDestinationMACAddress(ONOSLLDP.LLDP_NICIRA);
this.ethPacket.setPayload(this.lldpPacket);
this.ethPacket.setPad(true);
this.linkProvider = checkNotNull(providerService, SERVICE_NULL);
this.pktService = checkNotNull(pktService, SERVICE_NULL);
this.mastershipService = checkNotNull(masterService, SERVICE_NULL);
lldpPacket = new ONOSLLDP();
lldpPacket.setChassisId(device.chassisId());
lldpPacket.setDevice(device.id().toString());
ethPacket = new Ethernet();
ethPacket.setEtherType(Ethernet.TYPE_LLDP);
ethPacket.setDestinationMACAddress(ONOSLLDP.LLDP_NICIRA);
ethPacket.setPayload(this.lldpPacket);
ethPacket.setPad(true);
this.useBDDP = useBDDP.length > 0 ? useBDDP[0] : false;
if (this.useBDDP) {
this.bddpEth = new Ethernet();
this.bddpEth.setPayload(this.lldpPacket);
this.bddpEth.setEtherType(Ethernet.TYPE_BSN);
this.bddpEth.setDestinationMACAddress(ONOSLLDP.BDDP_MULTICAST);
this.bddpEth.setPad(true);
bddpEth = new Ethernet();
bddpEth.setPayload(lldpPacket);
bddpEth.setEtherType(Ethernet.TYPE_BSN);
bddpEth.setDestinationMACAddress(ONOSLLDP.BDDP_MULTICAST);
bddpEth.setPad(true);
log.info("Using BDDP to discover network");
}
this.isStopped = true;
isStopped = true;
start();
this.log.debug("Started discovery manager for switch {}",
device.id());
log.debug("Started discovery manager for switch {}", device.id());
}
......@@ -139,19 +144,18 @@ public class LinkDiscovery implements TimerTask {
*
* @param port the port
*/
public void addPort(final Port port) {
public void addPort(Port port) {
boolean newPort = false;
synchronized (this) {
if (!containsPort(port.number().toLong())) {
newPort = true;
this.slowPorts.add(port.number().toLong());
slowPorts.add(port.number().toLong());
}
}
boolean isMaster = mastershipService.isLocalMaster(device.id());
if (newPort && isMaster) {
this.log.debug("Sending init probe to port {}@{}",
port.number().toLong(), device.id());
log.debug("Sending init probe to port {}@{}", port.number().toLong(), device.id());
sendProbes(port.number().toLong());
}
}
......@@ -161,21 +165,19 @@ public class LinkDiscovery implements TimerTask {
*
* @param port the port
*/
public void removePort(final Port port) {
public void removePort(Port port) {
// Ignore ports that are not on this switch
long portnum = port.number().toLong();
synchronized (this) {
if (this.slowPorts.contains(portnum)) {
this.slowPorts.remove(portnum);
if (slowPorts.contains(portnum)) {
slowPorts.remove(portnum);
} else if (this.fastPorts.contains(portnum)) {
this.fastPorts.remove(portnum);
this.portProbeCount.remove(portnum);
} else if (fastPorts.contains(portnum)) {
fastPorts.remove(portnum);
portProbeCount.remove(portnum);
// no iterator to update
} else {
this.log.warn("Tried to dynamically remove non-existing port {}",
portnum);
log.warn("Tried to dynamically remove non-existing port {}", portnum);
}
}
}
......@@ -187,18 +189,17 @@ public class LinkDiscovery implements TimerTask {
*
* @param portNumber the port
*/
public void ackProbe(final Long portNumber) {
public void ackProbe(Long portNumber) {
synchronized (this) {
if (this.slowPorts.contains(portNumber)) {
this.log.debug("Setting slow port to fast: {}:{}",
this.device.id(), portNumber);
this.slowPorts.remove(portNumber);
this.fastPorts.add(portNumber);
this.portProbeCount.put(portNumber, new AtomicInteger(0));
} else if (this.fastPorts.contains(portNumber)) {
this.portProbeCount.get(portNumber).set(0);
if (slowPorts.contains(portNumber)) {
log.debug("Setting slow port to fast: {}:{}", device.id(), portNumber);
slowPorts.remove(portNumber);
fastPorts.add(portNumber);
portProbeCount.put(portNumber, new AtomicInteger(0));
} else if (fastPorts.contains(portNumber)) {
portProbeCount.get(portNumber).set(0);
} else {
this.log.debug("Got ackProbe for non-existing port: {}", portNumber);
log.debug("Got ackProbe for non-existing port: {}", portNumber);
}
}
}
......@@ -207,6 +208,7 @@ public class LinkDiscovery implements TimerTask {
/**
* Handles an incoming LLDP packet. Creates link in topology and sends ACK
* to port where LLDP originated.
*
* @param context packet context
* @return true if handled
*/
......@@ -218,21 +220,18 @@ public class LinkDiscovery implements TimerTask {
ONOSLLDP onoslldp = ONOSLLDP.parseONOSLLDP(eth);
if (onoslldp != null) {
final PortNumber dstPort =
context.inPacket().receivedFrom().port();
final PortNumber srcPort = portNumber(onoslldp.getPort());
final DeviceId srcDeviceId = DeviceId.deviceId(onoslldp.getDeviceString());
final DeviceId dstDeviceId = context.inPacket().receivedFrom().deviceId();
this.ackProbe(dstPort.toLong());
PortNumber srcPort = portNumber(onoslldp.getPort());
PortNumber dstPort = context.inPacket().receivedFrom().port();
DeviceId srcDeviceId = DeviceId.deviceId(onoslldp.getDeviceString());
DeviceId dstDeviceId = context.inPacket().receivedFrom().deviceId();
ackProbe(dstPort.toLong());
ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort);
ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort);
LinkDescription ld;
if (eth.getEtherType() == Ethernet.TYPE_BSN) {
ld = new DefaultLinkDescription(src, dst, Type.INDIRECT);
} else {
ld = new DefaultLinkDescription(src, dst, Type.DIRECT);
}
LinkDescription ld = eth.getEtherType() == Ethernet.TYPE_LLDP ?
new DefaultLinkDescription(src, dst, Type.DIRECT) :
new DefaultLinkDescription(src, dst, Type.INDIRECT);
try {
linkProvider.linkDetected(ld);
......@@ -253,38 +252,36 @@ public class LinkDiscovery implements TimerTask {
* @param t timeout
*/
@Override
public void run(final Timeout t) {
public void run(Timeout t) {
if (isStopped()) {
return;
}
if (!mastershipService.isLocalMaster(device.id())) {
if (!isStopped()) {
// reschedule timer
timeout = Timer.getTimer().newTimeout(this, this.probeRate, MILLISECONDS);
timeout = Timer.getTimer().newTimeout(this, probeRate, MILLISECONDS);
}
return;
}
this.log.trace("Sending probes from {}", device.id());
log.trace("Sending probes from {}", device.id());
synchronized (this) {
final Iterator<Long> fastIterator = this.fastPorts.iterator();
Iterator<Long> fastIterator = fastPorts.iterator();
while (fastIterator.hasNext()) {
long portNumber = fastIterator.next();
int probeCount = portProbeCount.get(portNumber).getAndIncrement();
if (probeCount < LinkDiscovery.MAX_PROBE_COUNT) {
this.log.trace("Sending fast probe to port {}", portNumber);
log.trace("Sending fast probe to port {}", portNumber);
sendProbes(portNumber);
} else {
// Link down, demote to slowPorts
// Update fast and slow ports
// Link down, demote to slowPorts; update fast and slow ports
fastIterator.remove();
this.slowPorts.add(portNumber);
this.portProbeCount.remove(portNumber);
slowPorts.add(portNumber);
portProbeCount.remove(portNumber);
ConnectPoint cp = new ConnectPoint(device.id(),
portNumber(portNumber));
ConnectPoint cp = new ConnectPoint(device.id(), portNumber(portNumber));
log.debug("Link down -> {}", cp);
linkProvider.linksVanished(cp);
}
......@@ -292,14 +289,14 @@ public class LinkDiscovery implements TimerTask {
// send a probe for the next slow port
for (long portNumber : slowPorts) {
this.log.trace("Sending slow probe to port {}", portNumber);
log.trace("Sending slow probe to port {}", portNumber);
sendProbes(portNumber);
}
}
if (!isStopped()) {
// reschedule timer
timeout = Timer.getTimer().newTimeout(this, this.probeRate, MILLISECONDS);
timeout = Timer.getTimer().newTimeout(this, probeRate, MILLISECONDS);
}
}
......@@ -323,17 +320,15 @@ public class LinkDiscovery implements TimerTask {
* @param port the port
* @return Packet_out message with LLDP data
*/
private OutboundPacket createOutBoundLLDP(final Long port) {
private OutboundPacket createOutBoundLLDP(Long port) {
if (port == null) {
return null;
}
this.lldpPacket.setPortId(port.intValue());
this.ethPacket.setSourceMACAddress("DE:AD:BE:EF:BA:11");
final byte[] lldp = this.ethPacket.serialize();
return new DefaultOutboundPacket(this.device.id(),
lldpPacket.setPortId(port.intValue());
ethPacket.setSourceMACAddress(SRC_MAC);
return new DefaultOutboundPacket(device.id(),
builder().setOutput(portNumber(port)).build(),
ByteBuffer.wrap(lldp));
ByteBuffer.wrap(ethPacket.serialize()));
}
/**
......@@ -342,25 +337,23 @@ public class LinkDiscovery implements TimerTask {
* @param port the port
* @return Packet_out message with LLDP data
*/
private OutboundPacket createOutBoundBDDP(final Long port) {
private OutboundPacket createOutBoundBDDP(Long port) {
if (port == null) {
return null;
}
this.lldpPacket.setPortId(port.intValue());
this.bddpEth.setSourceMACAddress("DE:AD:BE:EF:BA:11");
final byte[] bddp = this.bddpEth.serialize();
return new DefaultOutboundPacket(this.device.id(),
lldpPacket.setPortId(port.intValue());
bddpEth.setSourceMACAddress(SRC_MAC);
return new DefaultOutboundPacket(device.id(),
builder().setOutput(portNumber(port)).build(),
ByteBuffer.wrap(bddp));
ByteBuffer.wrap(bddpEth.serialize()));
}
private void sendProbes(Long portNumber) {
log.trace("Sending probes out to {}@{}", portNumber, device.id());
OutboundPacket pkt = this.createOutBoundLLDP(portNumber);
OutboundPacket pkt = createOutBoundLLDP(portNumber);
pktService.emit(pkt);
if (useBDDP) {
OutboundPacket bpkt = this.createOutBoundBDDP(portNumber);
OutboundPacket bpkt = createOutBoundBDDP(portNumber);
pktService.emit(bpkt);
}
}
......