Thomas Vachuska
Committed by Gerrit Code Review

ONOS-2846, ONOS-2812 Refactored link discovery pruning to be centralized rather …

…than being with each link discovery helper.

This will make it behave properly in a distributed context.

Change-Id: I9b9788336468c41d1cf506e388306ad9136d5853
......@@ -16,13 +16,14 @@
package org.onosproject.provider.lldp.impl;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.LinkKey;
import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.packet.PacketService;
/**
* Shared context for use by link discovery.
*/
public interface DiscoveryContext {
interface DiscoveryContext {
/**
* Returns the shared mastership service reference.
......@@ -53,16 +54,16 @@ public interface DiscoveryContext {
long probeRate();
/**
* Returns the max stale link age in millis.
* Indicates whether to emit BDDP.
*
* @return stale link age
* @return true to emit BDDP
*/
long staleLinkAge();
boolean useBDDP();
/**
* Indicates whether to emit BDDP.
* Touches the link identified by the given key to indicate that it's active.
*
* @return true to emit BDDP
* @param key link key
*/
boolean useBDDP();
void touchLink(LinkKey key);
}
......
......@@ -17,6 +17,7 @@ package org.onosproject.provider.lldp.impl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -34,15 +35,18 @@ import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.LinkKey;
import org.onosproject.net.Port;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.link.DefaultLinkDescription;
import org.onosproject.net.link.LinkProvider;
import org.onosproject.net.link.LinkProviderRegistry;
import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.link.LinkService;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.packet.PacketProcessor;
......@@ -65,6 +69,7 @@ import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.Link.Type.DIRECT;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -91,6 +96,9 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkService linkService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -103,8 +111,9 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
private ScheduledExecutorService executor;
private static final long INIT_DELAY = 5;
private static final long DELAY = 5;
// TODO: Add sanity checking for the configurable params based on the delays
private static final long DEVICE_SYNC_DELAY = 5;
private static final long LINK_PRUNER_DELAY = 3;
private static final String PROP_ENABLED = "enabled";
@Property(name = PROP_ENABLED, boolValue = true,
......@@ -135,13 +144,18 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
label = "Path to LLDP suppression configuration file")
private String lldpSuppression = DEFAULT_LLDP_SUPPRESSION_CONFIG;
private final DiscoveryContext context = new InternalDiscoveryContext();
private final InternalLinkProvider listener = new InternalLinkProvider();
private final InternalRoleListener roleListener = new InternalRoleListener();
private final InternalDeviceListener deviceListener = new InternalDeviceListener();
private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
// Device link discovery helpers.
protected final Map<DeviceId, LinkDiscovery> discoverers = new ConcurrentHashMap<>();
// Most recent time a tracked link was seen; links are tracked if their
// destination connection point is mastered by this controller instance.
private final Map<LinkKey, Long> linkTimes = Maps.newConcurrentMap();
private SuppressionRules rules;
private ApplicationId appId;
......@@ -216,28 +230,37 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
log.info(FORMAT, enabled, useBDDP, probeRate, staleLinkAge, lldpSuppression);
}
/**
* Enables link discovery processing.
*/
private void enable() {
providerService = providerRegistry.register(this);
deviceService.addListener(listener);
packetService.addProcessor(listener, PacketProcessor.advisor(0));
masterService.addListener(roleListener);
deviceService.addListener(deviceListener);
packetService.addProcessor(packetProcessor, PacketProcessor.advisor(0));
processDevices();
loadDevices();
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "sync-%d"));
executor.scheduleAtFixedRate(new SyncDeviceInfoTask(), INIT_DELAY, DELAY, SECONDS);
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/link", "discovery-%d"));
executor.scheduleAtFixedRate(new SyncDeviceInfoTask(),
DEVICE_SYNC_DELAY, DEVICE_SYNC_DELAY, SECONDS);
executor.scheduleAtFixedRate(new LinkPrunerTask(),
LINK_PRUNER_DELAY, LINK_PRUNER_DELAY, SECONDS);
loadSuppressionRules();
requestIntercepts();
}
/**
* Disables link discovery processing.
*/
private void disable() {
withdrawIntercepts();
providerRegistry.unregister(this);
deviceService.removeListener(listener);
packetService.removeProcessor(listener);
masterService.removeListener(roleListener);
deviceService.removeListener(deviceListener);
packetService.removeProcessor(packetProcessor);
if (executor != null) {
executor.shutdownNow();
......@@ -248,7 +271,10 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
providerService = null;
}
private void processDevices() {
/**
* Loads available devices and registers their ports to be probed.
*/
private void loadDevices() {
for (Device device : deviceService.getAvailableDevices()) {
if (rules.isSuppressed(device)) {
log.debug("LinkDiscovery from {} disabled by configuration", device.id());
......@@ -260,6 +286,9 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
}
}
/**
* Adds ports of the specified device to the specified discovery helper.
*/
private void addPorts(LinkDiscovery discoverer, DeviceId deviceId) {
for (Port p : deviceService.getPorts(deviceId)) {
if (rules.isSuppressed(p)) {
......@@ -271,7 +300,12 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
}
}
/**
* Loads LLDP suppression rules.
*/
private void loadSuppressionRules() {
// FIXME: convert to use network configuration
SuppressionRulesStore store = new SuppressionRulesStore(lldpSuppression);
try {
log.info("Reading suppression rules from {}", lldpSuppression);
......@@ -288,7 +322,7 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
}
/**
* Request packet intercepts.
* Requests packet intercepts.
*/
private void requestIntercepts() {
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
......@@ -304,7 +338,7 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
}
/**
* Withdraw packet intercepts.
* Withdraws packet intercepts.
*/
private void withdrawIntercepts() {
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
......@@ -314,6 +348,9 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
packetService.cancelPackets(selector.build(), PacketPriority.CONTROL, appId);
}
/**
* Processes device mastership role changes.
*/
private class InternalRoleListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
......@@ -336,7 +373,10 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
}
private class InternalLinkProvider implements PacketProcessor, DeviceListener {
/**
* Processes device events.
*/
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
LinkDiscovery ld;
......@@ -426,7 +466,12 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
log.debug("Unknown event {}", event);
}
}
}
/**
* Processes incoming packets.
*/
private class InternalPacketProcessor implements PacketProcessor {
@Override
public void process(PacketContext context) {
if (context == null) {
......@@ -443,6 +488,9 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
}
}
/**
* Auxiliary task to keep device ports up to date.
*/
private final class SyncDeviceInfoTask implements Runnable {
@Override
public void run() {
......@@ -464,12 +512,54 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
}
}
} catch (Exception e) {
// catch all Exception to avoid Scheduled task being suppressed.
// Catch all exceptions to avoid task being suppressed
log.error("Exception thrown during synchronization process", e);
}
}
}
/**
* Auxiliary task for pruning stale links.
*/
private class LinkPrunerTask implements Runnable {
@Override
public void run() {
if (Thread.currentThread().isInterrupted()) {
log.info("Interrupted, quitting");
return;
}
try {
// TODO: There is still a slight possibility of mastership
// change occurring right with link going stale. This will
// result in the stale link not being pruned.
Maps.filterEntries(linkTimes, e -> {
if (!masterService.isLocalMaster(e.getKey().dst().deviceId())) {
return true;
}
if (isStale(e.getValue())) {
providerService.linkVanished(new DefaultLinkDescription(e.getKey().src(),
e.getKey().dst(),
DIRECT));
return true;
}
return false;
}).clear();
} catch (Exception e) {
// Catch all exceptions to avoid task being suppressed
log.error("Exception thrown during link pruning process", e);
}
}
private boolean isStale(long lastSeen) {
return lastSeen < System.currentTimeMillis() - staleLinkAge;
}
}
/**
* Provides processing context for the device link discovery helpers.
*/
private class InternalDiscoveryContext implements DiscoveryContext {
@Override
public MastershipService mastershipService() {
......@@ -492,13 +582,14 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
}
@Override
public long staleLinkAge() {
return staleLinkAge;
public boolean useBDDP() {
return useBDDP;
}
@Override
public boolean useBDDP() {
return useBDDP;
public void touchLink(LinkKey key) {
linkTimes.put(key, System.currentTimeMillis());
}
}
}
......
......@@ -15,7 +15,6 @@
*/
package org.onosproject.provider.lldp.impl;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
......@@ -37,9 +36,7 @@ import org.onosproject.net.packet.PacketContext;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onosproject.net.PortNumber.portNumber;
......@@ -53,7 +50,7 @@ import static org.slf4j.LoggerFactory.getLogger;
* LLDP, send an LLDP for a single slow port. Based on FlowVisor topology
* discovery implementation.
*/
public class LinkDiscovery implements TimerTask {
class LinkDiscovery implements TimerTask {
private final Logger log = getLogger(getClass());
......@@ -72,9 +69,6 @@ public class LinkDiscovery implements TimerTask {
// Set of ports to be probed
private final Set<Long> ports = Sets.newConcurrentHashSet();
// Most recent time a link was seen
private final Map<LinkKey, Long> linkTimes = Maps.newConcurrentMap();
/**
* Instantiates discovery manager for the given physical switch. Creates a
* generic LLDP packet that will be customized for the port it is sent out on.
......@@ -83,7 +77,7 @@ public class LinkDiscovery implements TimerTask {
* @param device the physical switch
* @param context discovery context
*/
public LinkDiscovery(Device device, DiscoveryContext context) {
LinkDiscovery(Device device, DiscoveryContext context) {
this.device = device;
this.context = context;
......@@ -102,7 +96,6 @@ public class LinkDiscovery implements TimerTask {
bddpEth.setEtherType(Ethernet.TYPE_BSN);
bddpEth.setDestinationMACAddress(ONOSLLDP.BDDP_MULTICAST);
bddpEth.setPad(true);
log.info("Using BDDP to discover network");
isStopped = true;
start();
......@@ -110,46 +103,47 @@ public class LinkDiscovery implements TimerTask {
}
synchronized void stop() {
isStopped = true;
timeout.cancel();
}
synchronized void start() {
if (isStopped) {
isStopped = false;
timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
} else {
log.warn("LinkDiscovery started multiple times?");
}
}
synchronized boolean isStopped() {
return isStopped || timeout.isCancelled();
}
/**
* Add physical port port to discovery process.
* Send out initial LLDP and label it as slow port.
*
* @param port the port
*/
public void addPort(Port port) {
void addPort(Port port) {
boolean newPort = ports.add(port.number().toLong());
boolean isMaster = context.mastershipService().isLocalMaster(device.id());
if (newPort && isMaster) {
log.debug("Sending init probe to port {}@{}", port.number().toLong(), device.id());
log.debug("Sending initial probe to port {}@{}", port.number().toLong(), device.id());
sendProbes(port.number().toLong());
}
}
/**
* Method called by remote port to acknowledge receipt of LLDP sent by
* this port. If slow port, updates label to fast. If fast port, decrements
* number of unacknowledged probes.
*
* @param key link key
*/
private void ackProbe(LinkKey key) {
long portNumber = key.src().port().toLong();
if (ports.contains(portNumber)) {
linkTimes.put(key, System.currentTimeMillis());
} else {
log.debug("Got ackProbe for non-existing port: {}", portNumber);
}
}
/**
* Handles an incoming LLDP packet. Creates link in topology and sends ACK
* to port where LLDP originated.
* Handles an incoming LLDP packet. Creates link in topology and adds the
* link for staleness tracking.
*
* @param packetContext packet context
* @return true if handled
*/
public boolean handleLLDP(PacketContext packetContext) {
boolean handleLLDP(PacketContext packetContext) {
Ethernet eth = packetContext.inPacket().parsed();
if (eth == null) {
return false;
......@@ -165,14 +159,13 @@ public class LinkDiscovery implements TimerTask {
ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort);
ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort);
ackProbe(LinkKey.linkKey(src, dst));
LinkDescription ld = eth.getEtherType() == Ethernet.TYPE_LLDP ?
new DefaultLinkDescription(src, dst, Type.DIRECT) :
new DefaultLinkDescription(src, dst, Type.INDIRECT);
try {
context.providerService().linkDetected(ld);
context.touchLink(LinkKey.linkKey(src, dst));
} catch (IllegalStateException e) {
return true;
}
......@@ -195,52 +188,16 @@ public class LinkDiscovery implements TimerTask {
return;
}
if (!context.mastershipService().isLocalMaster(device.id())) {
if (!isStopped()) {
timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
}
return;
if (context.mastershipService().isLocalMaster(device.id())) {
log.trace("Sending probes from {}", device.id());
ports.forEach(this::sendProbes);
}
// Prune stale links
linkTimes.entrySet().stream()
.filter(e -> isStale(e.getKey(), e.getValue()))
.map(Map.Entry::getKey).collect(Collectors.toSet())
.forEach(this::pruneLink);
// Probe ports
log.trace("Sending probes from {}", device.id());
ports.forEach(this::sendProbes);
if (!isStopped()) {
timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
}
}
private void pruneLink(LinkKey key) {
linkTimes.remove(key);
LinkDescription desc = new DefaultLinkDescription(key.src(), key.dst(), Type.DIRECT);
context.providerService().linkVanished(desc);
}
private boolean isStale(LinkKey key, long lastSeen) {
return lastSeen < (System.currentTimeMillis() - context.staleLinkAge());
}
public synchronized void stop() {
isStopped = true;
timeout.cancel();
}
public synchronized void start() {
if (isStopped) {
isStopped = false;
timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
} else {
log.warn("LinkDiscovery started multiple times?");
}
}
/**
* Creates packet_out LLDP for specified output port.
*
......@@ -285,11 +242,8 @@ public class LinkDiscovery implements TimerTask {
}
}
public synchronized boolean isStopped() {
return isStopped || timeout.isCancelled();
}
boolean containsPort(long portNumber) {
return ports.contains(portNumber);
}
}
......
......@@ -48,6 +48,7 @@ import org.onosproject.net.link.LinkDescription;
import org.onosproject.net.link.LinkProvider;
import org.onosproject.net.link.LinkProviderRegistry;
import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.link.LinkServiceAdapter;
import org.onosproject.net.packet.DefaultInboundPacket;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.OutboundPacket;
......@@ -79,7 +80,8 @@ public class LLDPLinkProviderTest {
private static Port pd4;
private final LLDPLinkProvider provider = new LLDPLinkProvider();
private final TestLinkRegistry linkService = new TestLinkRegistry();
private final TestLinkRegistry linkRegistry = new TestLinkRegistry();
private final TestLinkService linkService = new TestLinkService();
private final TestPacketService packetService = new TestPacketService();
private final TestDeviceService deviceService = new TestDeviceService();
private final TestMasterShipService masterService = new TestMasterShipService();
......@@ -104,8 +106,9 @@ public class LLDPLinkProviderTest {
provider.coreService = coreService;
provider.deviceService = deviceService;
provider.linkService = linkService;
provider.packetService = packetService;
provider.providerRegistry = linkService;
provider.providerRegistry = linkRegistry;
provider.masterService = masterService;
provider.activate(null);
......@@ -498,4 +501,6 @@ public class LLDPLinkProviderTest {
}
private class TestLinkService extends LinkServiceAdapter {
}
}
......
......@@ -3,5 +3,6 @@
export ONOS_NIC="10.1.10.*"
export OC1="10.1.10.223"
unset ONOS_USE_SSH
export ONOS_APPS="drivers,openflow,fwd,proxyarp,mobility"
......