Yuta HIGUCHI
Committed by Jonathan Hart

Lazily populate Pipeliner cache

Change-Id: Ibbb9312b47c2c61df9ed15370b46fb07a8c7a16c
......@@ -27,9 +27,6 @@ import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.osgi.ServiceDirectory;
import org.onlab.util.ItemNotFoundException;
import org.onosproject.cluster.ClusterService;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.behaviour.NextGroup;
import org.onosproject.net.behaviour.Pipeliner;
......@@ -87,9 +84,6 @@ public class FlowObjectiveManager implements FlowObjectiveService {
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
// Note: The following dependencies are added on behalf of the pipeline
......@@ -116,7 +110,6 @@ public class FlowObjectiveManager implements FlowObjectiveService {
private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
private final PipelinerContext context = new InnerPipelineContext();
private final MastershipListener mastershipListener = new InnerMastershipListener();
private final DeviceListener deviceListener = new InnerDeviceListener();
protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
......@@ -133,16 +126,13 @@ public class FlowObjectiveManager implements FlowObjectiveService {
protected void activate() {
executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d", log));
flowObjectiveStore.setDelegate(delegate);
mastershipService.addListener(mastershipListener);
deviceService.addListener(deviceListener);
deviceService.getDevices().forEach(device -> setupPipelineHandler(device.id()));
log.info("Started");
}
@Deactivate
protected void deactivate() {
flowObjectiveStore.unsetDelegate(delegate);
mastershipService.removeListener(mastershipListener);
deviceService.removeListener(deviceListener);
executorService.shutdown();
pipeliners.clear();
......@@ -265,13 +255,24 @@ public class FlowObjectiveManager implements FlowObjectiveService {
// Retrieves the device pipeline behaviour from the cache.
private Pipeliner getDevicePipeliner(DeviceId deviceId) {
return pipeliners.get(deviceId);
return pipeliners.computeIfAbsent(deviceId, this::initPipelineHandler);
}
private void setupPipelineHandler(DeviceId deviceId) {
/**
* Creates and initialize {@link Pipeliner}.
* <p>
* Note: Expected to be called under per-Device lock.
* e.g., {@code pipeliners}' Map#compute family methods
*
* @param deviceId Device to initialize pipeliner
* @return {@link Pipeliner} instance or null
*/
private Pipeliner initPipelineHandler(DeviceId deviceId) {
start = now();
// ?? We never use defaultDriverService, do we still need this check?
if (defaultDriverService == null) {
// We're not ready to go to work yet.
return;
return null;
}
// Attempt to lookup the handler in the cache
......@@ -286,11 +287,11 @@ public class FlowObjectiveManager implements FlowObjectiveService {
if (!handler.driver().hasBehaviour(Pipeliner.class)) {
log.warn("Pipeline behaviour not supported for device {}",
deviceId);
return;
return null;
}
} catch (ItemNotFoundException e) {
log.warn("No applicable driver for device {}", deviceId);
return;
return null;
}
driverHandlers.put(deviceId, handler);
......@@ -304,28 +305,8 @@ public class FlowObjectiveManager implements FlowObjectiveService {
Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
hbTime = now();
pipeliner.init(deviceId, context);
pipeliners.put(deviceId, pipeliner);
}
// Triggers driver setup when the local node becomes a device master.
private class InnerMastershipListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
switch (event.type()) {
case MASTER_CHANGED:
log.debug("mastership changed on device {}", event.subject());
start = now();
if (deviceService.isAvailable(event.subject())) {
setupPipelineHandler(event.subject());
}
stopWatch();
break;
case BACKUPS_CHANGED:
break;
default:
break;
}
}
return pipeliner;
}
// Triggers driver setup when a device is (re)detected.
......@@ -337,18 +318,23 @@ public class FlowObjectiveManager implements FlowObjectiveService {
case DEVICE_AVAILABILITY_CHANGED:
log.debug("Device either added or availability changed {}",
event.subject().id());
start = now();
if (deviceService.isAvailable(event.subject().id())) {
log.debug("Device is now available {}", event.subject().id());
setupPipelineHandler(event.subject().id());
getDevicePipeliner(event.subject().id());
} else {
log.debug("Device is no longer available {}", event.subject().id());
// evict Pipeliner cache.
// User might restart Device to assign new Driver/Pipeliner
// loaded afterwards.
pipeliners.remove(event.subject().id());
}
stopWatch();
break;
case DEVICE_UPDATED:
break;
case DEVICE_REMOVED:
break;
case DEVICE_SUSPENDED:
// evict Pipeliner cache.
pipeliners.remove(event.subject().id());
break;
case PORT_ADDED:
break;
......
......@@ -23,9 +23,6 @@ import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestUtils;
import org.onlab.packet.ChassisId;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultDevice;
import org.onosproject.net.Device;
......@@ -190,7 +187,6 @@ public class FlowObjectiveManagerTest {
public void initializeTest() {
manager = new FlowObjectiveManager();
manager.flowObjectiveStore = new TestFlowObjectiveStore();
manager.mastershipService = new MastershipServiceAdapter();
manager.deviceService = new TestDeviceService();
manager.defaultDriverService = new TestDriversLoader();
manager.driverService = new TestDriverService();
......@@ -377,40 +373,4 @@ public class FlowObjectiveManagerTest {
assertThat(filteringObjectives, hasSize(0));
assertThat(nextObjectives, hasSize(0));
}
/**
* Tests recepit of a device mastership event.
*
* @throws TestUtilsException if lookup of a field fails
*/
@Test
public void deviceMastershipEvent() throws TestUtilsException {
TrafficSelector selector = DefaultTrafficSelector.emptySelector();
TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
MastershipEvent event =
new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, id2, null);
MastershipListener listener = TestUtils.getField(manager, "mastershipListener");
assertThat(listener, notNullValue());
listener.event(event);
ForwardingObjective forward =
DefaultForwardingObjective.builder()
.fromApp(NetTestTools.APP_ID)
.withFlag(ForwardingObjective.Flag.SPECIFIC)
.withSelector(selector)
.withTreatment(treatment)
.makePermanent()
.add();
manager.forward(id2, forward);
// new device should have an objective now
TestTools.assertAfter(RETRY_MS, () ->
assertThat(forwardingObjectives, hasSize(1)));
assertThat(forwardingObjectives, hasItem("of:d2"));
assertThat(filteringObjectives, hasSize(0));
assertThat(nextObjectives, hasSize(0));
}
}
......