Thomas Vachuska

Fixed objective tracker not to do extra work so eagerly.

Added ability to see availability status as part of device events (for availability events)

Change-Id: I4a3476e203459ed72deee45f0a24e4b4373bd819
......@@ -20,7 +20,10 @@ import org.onosproject.event.AbstractEvent;
import org.onosproject.net.Device;
import org.onosproject.net.Port;
import java.util.Optional;
import static com.google.common.base.MoreObjects.toStringHelper;
import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
/**
* Describes infrastructure device event.
......@@ -28,6 +31,7 @@ import static com.google.common.base.MoreObjects.toStringHelper;
public class DeviceEvent extends AbstractEvent<DeviceEvent.Type, Device> {
private final Port port;
private final boolean isAvailable;
/**
* Type of device events.
......@@ -102,6 +106,20 @@ public class DeviceEvent extends AbstractEvent<DeviceEvent.Type, Device> {
public DeviceEvent(Type type, Device device, Port port) {
super(type, device);
this.port = port;
this.isAvailable = false;
}
/**
* Creates an event for change of device availability for the given device
* and the current time.
*
* @param device event device subject
* @param isAvailable true if device became available; false otherwise
*/
public DeviceEvent(Device device, boolean isAvailable) {
super(DEVICE_AVAILABILITY_CHANGED, device);
this.port = null;
this.isAvailable = isAvailable;
}
/**
......@@ -115,6 +133,7 @@ public class DeviceEvent extends AbstractEvent<DeviceEvent.Type, Device> {
public DeviceEvent(Type type, Device device, Port port, long time) {
super(type, device, time);
this.port = port;
this.isAvailable = false;
}
/**
......@@ -126,6 +145,15 @@ public class DeviceEvent extends AbstractEvent<DeviceEvent.Type, Device> {
return port;
}
/**
* Indicates whether device became available or unavailable.
*
* @return if present, true indicates device came online; false if device went offline
*/
public Optional<Boolean> isAvailable() {
return type() == DEVICE_AVAILABILITY_CHANGED ? Optional.of(isAvailable) : Optional.empty();
}
@Override
public String toString() {
if (port == null) {
......@@ -137,5 +165,5 @@ public class DeviceEvent extends AbstractEvent<DeviceEvent.Type, Device> {
.add("subject", subject())
.add("port", port)
.toString();
}
}
}
......
......@@ -16,8 +16,8 @@
package org.onosproject.net.intent.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -25,8 +25,9 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.Accumulator;
import org.onosproject.event.Event;
import org.onosproject.net.DeviceId;
import org.onosproject.net.ElementId;
import org.onosproject.net.HostId;
import org.onosproject.net.Link;
......@@ -34,18 +35,17 @@ import org.onosproject.net.LinkKey;
import org.onosproject.net.NetworkResource;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.IntentPartitionEvent;
import org.onosproject.net.intent.IntentPartitionEventListener;
import org.onosproject.net.intent.IntentPartitionService;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.link.LinkEvent;
import org.onosproject.net.resource.ResourceEvent;
import org.onosproject.net.resource.ResourceListener;
......@@ -57,26 +57,22 @@ import org.slf4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.isNullOrEmpty;
import static org.onosproject.net.LinkKey.linkKey;
import static org.onosproject.net.intent.IntentState.INSTALLED;
import static org.onosproject.net.intent.IntentState.INSTALLING;
import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
import static org.onosproject.net.resource.ResourceEvent.Type.RESOURCE_ADDED;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -86,14 +82,16 @@ import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class ObjectiveTracker implements ObjectiveTrackerService {
private final Logger log = getLogger(getClass());
//TODO make this configurable via component config
private static final long RECOMPILE_ALL_DELAY = 25; //ms
private final SetMultimap<LinkKey, Key> intentsByLink =
//TODO this could be slow as a point of synchronization
synchronizedSetMultimap(HashMultimap.<LinkKey, Key>create());
private final SetMultimap<ElementId, Key> intentsByDevice =
private final SetMultimap<ElementId, Key> intentsByElement =
synchronizedSetMultimap(HashMultimap.<ElementId, Key>create());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -109,31 +107,28 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
policy = ReferencePolicy.DYNAMIC)
policy = ReferencePolicy.DYNAMIC)
protected IntentService intentService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentPartitionService partitionService;
private ExecutorService executorService =
newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker", log));
private ScheduledExecutorService executor = Executors
.newScheduledThreadPool(1);
private final Timer timer = new Timer("onos/intent-objective-tracker", true);
private final Accumulator<WorkMessage> workMessageAccumulator = new InternalWorkAccumulator();
private TopologyListener listener = new InternalTopologyListener();
private ResourceListener resourceListener = new InternalResourceListener();
private DeviceListener deviceListener = new InternalDeviceListener();
private HostListener hostListener = new InternalHostListener();
private IntentPartitionEventListener partitionListener = new InternalPartitionListener();
private TopologyChangeDelegate delegate;
protected final AtomicBoolean updateScheduled = new AtomicBoolean(false);
protected final AtomicBoolean recompileAllScheduled = new AtomicBoolean(false);
@Activate
public void activate() {
topologyService.addListener(listener);
resourceService.addListener(resourceListener);
deviceService.addListener(deviceListener);
hostService.addListener(hostListener);
partitionService.addListener(partitionListener);
log.info("Started");
......@@ -143,7 +138,6 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
public void deactivate() {
topologyService.removeListener(listener);
resourceService.removeListener(resourceListener);
deviceService.removeListener(deviceListener);
hostService.removeListener(hostListener);
partitionService.removeListener(partitionListener);
log.info("Stopped");
......@@ -153,7 +147,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
if (intentService == null) {
intentService = service;
}
}
}
protected void unbindIntentService(IntentService service) {
if (intentService == service) {
......@@ -182,7 +176,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
if (resource instanceof Link) {
intentsByLink.put(linkKey((Link) resource), intentKey);
} else if (resource instanceof ElementId) {
intentsByDevice.put((ElementId) resource, intentKey);
intentsByElement.put((ElementId) resource, intentKey);
}
}
}
......@@ -194,7 +188,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
if (resource instanceof Link) {
intentsByLink.remove(linkKey((Link) resource), intentKey);
} else if (resource instanceof ElementId) {
intentsByDevice.remove(resource, intentKey);
intentsByElement.remove(resource, intentKey);
}
}
}
......@@ -209,18 +203,18 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
Intent intent = intentData.intent();
boolean isLocal = intentService.isLocal(key);
boolean isInstalled = intentData.state() == INSTALLING ||
intentData.state() == INSTALLED;
intentData.state() == INSTALLED;
List<Intent> installables = intentData.installables();
if (log.isTraceEnabled()) {
log.trace("intent {}, old: {}, new: {}, installableCount: {}, resourceCount: {}",
key,
intentsByDevice.values().contains(key),
intentsByElement.values().contains(key),
isLocal && isInstalled,
installables.size(),
intent.resources().size() +
installables.stream()
.mapToLong(i -> i.resources().size()).sum());
installables.stream()
.mapToLong(i -> i.resources().size()).sum());
}
if (isNullOrEmpty(installables) && intentData.state() == INSTALLED) {
......@@ -247,52 +241,39 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
private class InternalTopologyListener implements TopologyListener {
@Override
public void event(TopologyEvent event) {
executorService.execute(new TopologyChangeHandler(event));
}
}
// Re-dispatcher of topology change events.
private class TopologyChangeHandler implements Runnable {
private final TopologyEvent event;
TopologyChangeHandler(TopologyEvent event) {
this.event = event;
}
@Override
public void run() {
// If there is no delegate, why bother? Just bail.
if (delegate == null) {
return;
}
boolean recompileAllFailedIntents = false;
if (event.reasons() == null || event.reasons().isEmpty()) {
delegate.triggerCompile(Collections.emptySet(), true);
recompileAllFailedIntents = true;
} else {
Set<Key> intentsToRecompile = new HashSet<>();
boolean dontRecompileAllFailedIntents = true;
// Scan through the list of reasons and keep accruing all
// intents that need to be recompiled.
for (Event reason : event.reasons()) {
WorkMessage wi = new WorkMessage();
if (reason instanceof LinkEvent) {
LinkEvent linkEvent = (LinkEvent) reason;
final LinkKey linkKey = linkKey(linkEvent.subject());
synchronized (intentsByLink) {
Set<Key> intentKeys = intentsByLink.get(linkKey);
log.debug("recompile triggered by LinkEvent {} ({}) for {}",
linkKey, linkEvent.type(), intentKeys);
intentsToRecompile.addAll(intentKeys);
}
dontRecompileAllFailedIntents = dontRecompileAllFailedIntents &&
(linkEvent.type() == LINK_REMOVED ||
(linkEvent.type() == LINK_UPDATED &&
linkEvent.subject().isDurable()));
wi.linkKey = linkKey(linkEvent.subject());
recompileAllFailedIntents |=
!(linkEvent.type() == LINK_REMOVED ||
(linkEvent.type() == LINK_UPDATED &&
linkEvent.subject().isDurable()));
} else if (reason instanceof DeviceEvent) {
DeviceEvent deviceEvent = (DeviceEvent) reason;
wi.elementId = deviceEvent.subject().id();
recompileAllFailedIntents |=
(deviceEvent.type() == DeviceEvent.Type.DEVICE_ADDED ||
deviceEvent.type() == DeviceEvent.Type.DEVICE_UPDATED ||
deviceEvent.isAvailable().orElse(false));
} else {
continue;
}
workMessageAccumulator.add(wi);
}
delegate.triggerCompile(intentsToRecompile, !dontRecompileAllFailedIntents);
}
if (recompileAllFailedIntents) {
scheduleRecompileAll();
}
}
}
......@@ -300,79 +281,16 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
private class InternalResourceListener implements ResourceListener {
@Override
public void event(ResourceEvent event) {
if (event.subject().isSubTypeOf(PortNumber.class)) {
executorService.execute(() -> {
if (delegate == null) {
return;
}
delegate.triggerCompile(Collections.emptySet(), true);
});
if (event.type() == RESOURCE_ADDED &&
event.subject().isSubTypeOf(PortNumber.class)) {
scheduleRecompileAll();
}
//TODO we should probably track resources and trigger removal
}
}
//TODO consider adding flow rule event tracking
/*
* Re-dispatcher of device and host events.
*/
private class DeviceAvailabilityHandler implements Runnable {
private final ElementId id;
private final boolean available;
DeviceAvailabilityHandler(ElementId id, boolean available) {
this.id = checkNotNull(id);
this.available = available;
}
@Override
public void run() {
// If there is no delegate, why bother? Just bail.
if (delegate == null) {
return;
}
// TODO should we recompile on available==true?
final ImmutableSet<Key> snapshot;
synchronized (intentsByDevice) {
snapshot = ImmutableSet.copyOf(intentsByDevice.get(id));
}
delegate.triggerCompile(snapshot, available);
}
}
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceEvent.Type type = event.type();
switch (type) {
case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_REMOVED:
case DEVICE_SUSPENDED:
case DEVICE_UPDATED:
DeviceId id = event.subject().id();
// TODO we need to check whether AVAILABILITY_CHANGED means up or down
boolean available = (type == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
type == DeviceEvent.Type.DEVICE_ADDED ||
type == DeviceEvent.Type.DEVICE_UPDATED);
executorService.execute(new DeviceAvailabilityHandler(id, available));
break;
case PORT_ADDED:
case PORT_REMOVED:
case PORT_UPDATED:
case PORT_STATS_UPDATED:
default:
// Don't handle port events for now
break;
}
}
}
private class InternalHostListener implements HostListener {
@Override
public void event(HostEvent event) {
......@@ -381,7 +299,10 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
case HOST_ADDED:
case HOST_MOVED:
case HOST_REMOVED:
executorService.execute(new DeviceAvailabilityHandler(id, false));
WorkMessage wi = new WorkMessage();
wi.elementId = id;
workMessageAccumulator.add(wi);
scheduleRecompileAll();
break;
case HOST_UPDATED:
default:
......@@ -413,7 +334,31 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
private void scheduleIntentUpdate(int afterDelaySec) {
if (updateScheduled.compareAndSet(false, true)) {
executor.schedule(this::doIntentUpdate, afterDelaySec, TimeUnit.SECONDS);
timer.schedule(new TimerTask() {
@Override
public void run() {
ObjectiveTracker.this.doIntentUpdate();
}
}, afterDelaySec * 1_000);
}
}
private void doRecompileAll() {
recompileAllScheduled.set(false);
if (delegate != null) {
delegate.triggerCompile(Collections.emptySet(), true);
}
}
private void scheduleRecompileAll() {
if (recompileAllScheduled.compareAndSet(false, true)) {
timer.schedule(new TimerTask() {
@Override
public void run() {
ObjectiveTracker.this.doRecompileAll();
}
}, RECOMPILE_ALL_DELAY);
}
}
......@@ -424,4 +369,47 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
scheduleIntentUpdate(1);
}
}
private static class WorkMessage {
ElementId elementId;
LinkKey linkKey;
}
private class InternalWorkAccumulator extends AbstractAccumulator<WorkMessage> {
private static final int MAX_WORK = 100_000;
private static final int MAX_WORK_TIME = 500;
private static final int MAX_WORK_IDLE_TIME = 20;
InternalWorkAccumulator() {
super(timer, MAX_WORK, MAX_WORK_TIME, MAX_WORK_IDLE_TIME);
}
@Override
public void processItems(List<WorkMessage> items) {
// If there is no delegate, why bother? Just bail.
if (delegate == null) {
return;
}
Set<Key> keys = Sets.newHashSet();
items.forEach(wi -> {
if (wi.elementId != null) {
synchronized (intentsByElement) {
keys.addAll(intentsByElement.get(wi.elementId));
}
}
if (wi.linkKey != null) {
synchronized (intentsByLink) {
keys.addAll(intentsByLink.get(wi.linkKey));
}
}
});
if (!keys.isEmpty()) {
delegate.triggerCompile(keys, false);
}
}
}
}
......
......@@ -15,12 +15,8 @@
*/
package org.onosproject.net.intent.impl;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......@@ -30,11 +26,13 @@ import org.onosproject.core.IdGenerator;
import org.onosproject.event.Event;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
import org.onosproject.net.Link;
import org.onosproject.net.NetworkResource;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostListener;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.MockIdGenerator;
......@@ -46,18 +44,17 @@ import org.onosproject.net.topology.Topology;
import org.onosproject.net.topology.TopologyEvent;
import org.onosproject.net.topology.TopologyListener;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.easymock.EasyMock.createMock;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.onosproject.net.resource.ResourceEvent.Type.*;
import static org.onosproject.net.NetTestTools.APP_ID;
import static org.onosproject.net.NetTestTools.device;
import static org.onosproject.net.NetTestTools.link;
import static org.hamcrest.Matchers.*;
import static org.onosproject.net.NetTestTools.*;
import static org.onosproject.net.resource.ResourceEvent.Type.RESOURCE_ADDED;
/**
* Tests for the objective tracker.
......@@ -69,7 +66,7 @@ public class ObjectiveTrackerTest {
private TestTopologyChangeDelegate delegate;
private List<Event> reasons;
private TopologyListener listener;
private DeviceListener deviceListener;
private HostListener hostListener;
private ResourceListener resourceListener;
private IdGenerator mockGenerator;
......@@ -86,7 +83,7 @@ public class ObjectiveTrackerTest {
tracker.setDelegate(delegate);
reasons = new LinkedList<>();
listener = TestUtils.getField(tracker, "listener");
deviceListener = TestUtils.getField(tracker, "deviceListener");
hostListener = TestUtils.getField(tracker, "hostListener");
resourceListener = TestUtils.getField(tracker, "resourceListener");
mockGenerator = new MockIdGenerator();
Intent.bindIdGenerator(mockGenerator);
......@@ -107,16 +104,14 @@ public class ObjectiveTrackerTest {
* to be generated.
*/
static class TestTopologyChangeDelegate implements TopologyChangeDelegate {
CountDownLatch latch = new CountDownLatch(1);
List<Key> intentIdsFromEvent;
boolean compileAllFailedFromEvent;
List<Key> intentIdsFromEvent = Lists.newArrayList();
boolean compileAllFailedFromEvent = false;
@Override
public void triggerCompile(Iterable<Key> intentKeys,
boolean compileAllFailed) {
intentIdsFromEvent = Lists.newArrayList(intentKeys);
compileAllFailedFromEvent = compileAllFailed;
public void triggerCompile(Iterable<Key> intentKeys, boolean compileAllFailed) {
intentKeys.forEach(intentIdsFromEvent::add);
compileAllFailedFromEvent |= compileAllFailed;
latch.countDown();
}
}
......@@ -128,16 +123,10 @@ public class ObjectiveTrackerTest {
*/
@Test
public void testEventNoReasons() throws InterruptedException {
final TopologyEvent event = new TopologyEvent(
TopologyEvent.Type.TOPOLOGY_CHANGED,
topology,
null);
TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, null);
listener.event(event);
assertThat(
delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
is(true));
assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(0));
assertThat(delegate.compileAllFailedFromEvent, is(true));
}
......@@ -150,20 +139,15 @@ public class ObjectiveTrackerTest {
*/
@Test
public void testEventLinkDownNoMatches() throws InterruptedException {
final Link link = link("src", 1, "dst", 2);
final LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_REMOVED, link);
Link link = link("src", 1, "dst", 2);
LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_REMOVED, link);
reasons.add(linkEvent);
final TopologyEvent event = new TopologyEvent(
TopologyEvent.Type.TOPOLOGY_CHANGED,
topology,
reasons);
TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, reasons);
listener.event(event);
assertThat(
delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
is(true));
// we expect no message, latch should never fire
assertThat(delegate.latch.await(25, TimeUnit.MILLISECONDS), is(false));
assertThat(delegate.intentIdsFromEvent, hasSize(0));
assertThat(delegate.compileAllFailedFromEvent, is(false));
}
......@@ -175,20 +159,14 @@ public class ObjectiveTrackerTest {
*/
@Test
public void testEventLinkAdded() throws InterruptedException {
final Link link = link("src", 1, "dst", 2);
final LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_ADDED, link);
Link link = link("src", 1, "dst", 2);
LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_ADDED, link);
reasons.add(linkEvent);
final TopologyEvent event = new TopologyEvent(
TopologyEvent.Type.TOPOLOGY_CHANGED,
topology,
reasons);
TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, reasons);
listener.event(event);
assertThat(
delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
is(true));
assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(0));
assertThat(delegate.compileAllFailedFromEvent, is(true));
}
......@@ -200,24 +178,18 @@ public class ObjectiveTrackerTest {
*/
@Test
public void testEventLinkDownMatch() throws Exception {
final Link link = link("src", 1, "dst", 2);
final LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_REMOVED, link);
Link link = link("src", 1, "dst", 2);
LinkEvent linkEvent = new LinkEvent(LinkEvent.Type.LINK_REMOVED, link);
reasons.add(linkEvent);
final TopologyEvent event = new TopologyEvent(
TopologyEvent.Type.TOPOLOGY_CHANGED,
topology,
reasons);
TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, reasons);
final Key key = Key.of(0x333L, APP_ID);
Key key = Key.of(0x333L, APP_ID);
Collection<NetworkResource> resources = ImmutableSet.of(link);
tracker.addTrackedResources(key, resources);
listener.event(event);
assertThat(
delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
is(true));
assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(1));
assertThat(delegate.compileAllFailedFromEvent, is(false));
assertThat(delegate.intentIdsFromEvent.get(0).toString(),
......@@ -232,13 +204,11 @@ public class ObjectiveTrackerTest {
@Test
public void testResourceEvent() throws Exception {
ResourceEvent event = new ResourceEvent(RESOURCE_ADDED,
Resources.discrete(DeviceId.deviceId("a"), PortNumber.portNumber(1)).resource());
Resources.discrete(DeviceId.deviceId("a"),
PortNumber.portNumber(1)).resource());
resourceListener.event(event);
assertThat(
delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
is(true));
assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(0));
assertThat(delegate.compileAllFailedFromEvent, is(true));
}
......@@ -251,25 +221,27 @@ public class ObjectiveTrackerTest {
@Test
public void testEventHostAvailableMatch() throws Exception {
final Device host = device("host1");
// we will expect 2 delegate calls
delegate.latch = new CountDownLatch(2);
final DeviceEvent deviceEvent =
new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, host);
Device host = device("host1");
DeviceEvent deviceEvent = new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, host);
reasons.add(deviceEvent);
final Key key = Key.of(0x333L, APP_ID);
Key key = Key.of(0x333L, APP_ID);
Collection<NetworkResource> resources = ImmutableSet.of(host.id());
tracker.addTrackedResources(key, resources);
deviceListener.event(deviceEvent);
assertThat(
delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
is(true));
reasons.add(deviceEvent);
TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, reasons);
listener.event(event);
assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(1));
assertThat(delegate.compileAllFailedFromEvent, is(true));
assertThat(delegate.intentIdsFromEvent.get(0).toString(),
equalTo("0x333"));
equalTo("0x333"));
}
/**
......@@ -280,25 +252,21 @@ public class ObjectiveTrackerTest {
@Test
public void testEventHostUnavailableMatch() throws Exception {
final Device host = device("host1");
final DeviceEvent deviceEvent =
new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, host);
Device host = device("host1");
DeviceEvent deviceEvent = new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, host);
reasons.add(deviceEvent);
final Key key = Key.of(0x333L, APP_ID);
Key key = Key.of(0x333L, APP_ID);
Collection<NetworkResource> resources = ImmutableSet.of(host.id());
tracker.addTrackedResources(key, resources);
deviceListener.event(deviceEvent);
assertThat(
delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
is(true));
TopologyEvent event = new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, topology, reasons);
listener.event(event);
assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(1));
assertThat(delegate.compileAllFailedFromEvent, is(false));
assertThat(delegate.intentIdsFromEvent.get(0).toString(),
equalTo("0x333"));
assertThat(delegate.intentIdsFromEvent.get(0).toString(), equalTo("0x333"));
}
/**
......@@ -309,20 +277,12 @@ public class ObjectiveTrackerTest {
@Test
public void testEventHostAvailableNoMatch() throws Exception {
final Device host = device("host1");
final DeviceEvent deviceEvent =
new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, host);
reasons.add(deviceEvent);
deviceListener.event(deviceEvent);
assertThat(
delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS),
is(true));
Host host = host("00:11:22:33:44:55/6", "device1");
HostEvent hostEvent = new HostEvent(HostEvent.Type.HOST_ADDED, host);
hostListener.event(hostEvent);
assertThat(delegate.latch.await(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS), is(true));
assertThat(delegate.intentIdsFromEvent, hasSize(0));
assertThat(delegate.compileAllFailedFromEvent, is(true));
}
}
......
......@@ -444,7 +444,7 @@ public class GossipDeviceStore
boolean wasOnline = availableDevices.contains(newDevice.id());
markOnline(newDevice.id(), newTimestamp);
if (!wasOnline) {
notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
notifyDelegateIfNotNull(new DeviceEvent(newDevice, true));
}
}
return event;
......@@ -487,7 +487,7 @@ public class GossipDeviceStore
}
boolean removed = availableDevices.remove(deviceId);
if (removed) {
return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
return new DeviceEvent(device, false);
}
return null;
}
......