Thomas Vachuska
Committed by Gerrit Code Review

Finished implementation of edge port manager using topology event async notifications.

Change-Id: Ide0eb947ba6400dafe11dac73af1466aaf0ce451
......@@ -23,7 +23,9 @@ import java.nio.ByteBuffer;
import java.util.Optional;
/**
* Service for interacting with an inventory of network edge ports.
* Service for interacting with an inventory of network edge ports. A port
* is considered an edge port if it is an active port and does not have an
* infrastructure link associated with it.
*/
public interface EdgePortService {
......
......@@ -54,6 +54,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.onosproject.net.device.DeviceEvent.Type.*;
import static org.onosproject.net.edge.EdgePortEvent.Type.EDGE_PORT_ADDED;
import static org.onosproject.net.edge.EdgePortEvent.Type.EDGE_PORT_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -65,17 +66,17 @@ import static org.slf4j.LoggerFactory.getLogger;
@Service
public class EdgeManager implements EdgePortService {
private final ListenerRegistry<EdgePortEvent, EdgePortListener>
listenerRegistry = new ListenerRegistry<>();
private final Logger log = getLogger(getClass());
private Topology topology;
private final TopologyListener topologyListener = new InnerTopologyListener();
private final Map<DeviceId, Set<ConnectPoint>> connectionPoints = Maps.newConcurrentMap();
private final ListenerRegistry<EdgePortEvent, EdgePortListener>
listenerRegistry = new ListenerRegistry<>();
private final TopologyListener topologyListener = new InnerTopologyListener();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
......@@ -102,19 +103,20 @@ public class EdgeManager implements EdgePortService {
log.info("Stopped");
}
@Override
public boolean isEdgePoint(ConnectPoint point) {
return !topologyService.isInfrastructure(topologyService.currentTopology(), point);
}
@Override
public Iterable<ConnectPoint> getEdgePoints() {
//TODO if this is called before any notifications need to populate structure
ImmutableSet.Builder<ConnectPoint> builder = ImmutableSet.builder();
connectionPoints.forEach((k, v) -> v.forEach(builder::add));
return builder.build();
}
@Override
public Iterable<ConnectPoint> getEdgePoints(DeviceId deviceId) {
//TODO if this is called before any notifications need to populate structure
ImmutableSet.Builder<ConnectPoint> builder = ImmutableSet.builder();
Set<ConnectPoint> set = connectionPoints.get(deviceId);
if (set != null) {
......@@ -123,6 +125,7 @@ public class EdgeManager implements EdgePortService {
return builder.build();
}
@Override
public void emitPacket(ByteBuffer data, Optional<TrafficTreatment> treatment) {
TrafficTreatment.Builder builder = treatment.isPresent() ?
DefaultTrafficTreatment.builder(treatment.get()) :
......@@ -130,13 +133,13 @@ public class EdgeManager implements EdgePortService {
getEdgePoints().forEach(p -> packetService.emit(packet(builder, p, data)));
}
@Override
public void emitPacket(DeviceId deviceId, ByteBuffer data,
Optional<TrafficTreatment> treatment) {
TrafficTreatment.Builder builder = treatment.isPresent() ?
DefaultTrafficTreatment.builder(treatment.get()) :
DefaultTrafficTreatment.builder();
getEdgePoints(deviceId).forEach(p -> packetService.emit(packet(builder, p, data)));
}
private OutboundPacket packet(TrafficTreatment.Builder builder, ConnectPoint point, ByteBuffer data) {
......@@ -144,15 +147,19 @@ public class EdgeManager implements EdgePortService {
return new DefaultOutboundPacket(point.deviceId(), builder.build(), data);
}
@Override
public void addListener(EdgePortListener listener) {
listenerRegistry.addListener(listener);
}
@Override
public void removeListener(EdgePortListener listener) {
listenerRegistry.removeListener(listener);
}
// Internal listener for topo events used to keep our edge-port cache
// up to date.
private class InnerTopologyListener implements TopologyListener {
@Override
public void event(TopologyEvent event) {
......@@ -161,12 +168,9 @@ public class EdgeManager implements EdgePortService {
if (triggers != null) {
triggers.forEach(reason -> {
if (reason instanceof DeviceEvent) {
//TODO spuriously catches events not handled in the handler method
processDeviceEvent((DeviceEvent) reason);
} else if (reason instanceof LinkEvent) {
processLinkEvent((LinkEvent) reason);
} else {
System.out.println(reason.toString());
}
});
} else {
......@@ -175,12 +179,13 @@ public class EdgeManager implements EdgePortService {
}
}
// Initial loading of the edge port cache.
private void loadAllEdgePorts() {
deviceService.getDevices().forEach(d -> deviceService.getPorts(d.id())
.forEach(p -> addEdgePort(new ConnectPoint(d.id(), p.number()))));
}
// Processes a link event by adding or removing its end-points in our cache.
private void processLinkEvent(LinkEvent event) {
if (event.type() == LinkEvent.Type.LINK_ADDED) {
removeEdgePort(event.subject().src());
......@@ -189,22 +194,37 @@ public class EdgeManager implements EdgePortService {
addEdgePort(event.subject().src());
addEdgePort(event.subject().dst());
}
}
// Processes a device event by adding or removing its end-points in our cache.
private void processDeviceEvent(DeviceEvent event) {
if (event.type() == DeviceEvent.Type.PORT_ADDED) {
addEdgePort(new ConnectPoint(event.subject().id(), event.port().number()));
} else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
removeEdgePort(new ConnectPoint(event.subject().id(), event.port().number()));
}
}
DeviceEvent.Type type = event.type();
DeviceId id = event.subject().id();
if (type == DEVICE_ADDED ||
type == DEVICE_AVAILABILITY_CHANGED && deviceService.isAvailable(id)) {
// When device is added or becomes available, add all its ports
deviceService.getPorts(event.subject().id())
.forEach(p -> addEdgePort(new ConnectPoint(id, p.number())));
} else if (type == DEVICE_REMOVED ||
type == DEVICE_AVAILABILITY_CHANGED && !deviceService.isAvailable(id)) {
// When device is removed or becomes unavailable, remove all its ports
deviceService.getPorts(event.subject().id())
.forEach(p -> removeEdgePort(new ConnectPoint(id, p.number())));
connectionPoints.remove(id);
} else if (type == DeviceEvent.Type.PORT_ADDED ||
type == PORT_UPDATED && event.port().isEnabled()) {
addEdgePort(new ConnectPoint(id, event.port().number()));
} else if (type == DeviceEvent.Type.PORT_REMOVED ||
type == PORT_UPDATED && !event.port().isEnabled()) {
removeEdgePort(new ConnectPoint(id, event.port().number()));
}
}
// Adds the specified connection point to the edge points if needed.
private void addEdgePort(ConnectPoint point) {
//TODO case of link removed and one of the end ports removed in same topo cycle
//TODO pt2. resulting behavior will be adding a non-existent edge to the set
if (!topologyService.isInfrastructure(topology, point)) {
if (!topologyService.isInfrastructure(topology, point) && !point.port().isLogical()) {
Set<ConnectPoint> set = connectionPoints.get(point.deviceId());
if (set == null) {
set = Sets.newConcurrentHashSet();
......@@ -214,13 +234,11 @@ public class EdgeManager implements EdgePortService {
eventDispatcher.post(new EdgePortEvent(EDGE_PORT_ADDED, point));
}
}
}
// Removes the specified connection point from the edge points.
private void removeEdgePort(ConnectPoint point) {
//TODO need to check that points still exist IE when a link and port are removed
//TODO pt2 and both events are captures in the same topo update
if (!topologyService.isInfrastructure(topology, point)) {
if (!point.port().isLogical()) {
Set<ConnectPoint> set = connectionPoints.get(point.deviceId());
if (set == null) {
return;
......@@ -232,6 +250,5 @@ public class EdgeManager implements EdgePortService {
connectionPoints.remove(point.deviceId());
}
}
}
}
......