Yuta HIGUCHI

Adding some tests for GossipDeviceStore + bugfix

Change-Id: Ic0d55fa499b1d66131f059b4a47cd105c55a6e63
......@@ -62,6 +62,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
......
......@@ -58,7 +58,7 @@ class DeviceDescriptions {
*
* @param newDesc new DeviceDescription
*/
public synchronized void putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
public void putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
Timestamped<DeviceDescription> oldOne = deviceDesc;
Timestamped<DeviceDescription> newOne = newDesc;
if (oldOne != null) {
......@@ -76,7 +76,7 @@ class DeviceDescriptions {
*
* @param newDesc new PortDescription
*/
public synchronized void putPortDesc(Timestamped<PortDescription> newDesc) {
public void putPortDesc(Timestamped<PortDescription> newDesc) {
Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
Timestamped<PortDescription> newOne = newDesc;
if (oldOne != null) {
......
package org.onlab.onos.store.device.impl;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
......@@ -118,7 +119,7 @@ public class GossipDeviceStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
......@@ -206,14 +207,19 @@ public class GossipDeviceStore
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
DeviceId deviceId,
DeviceDescription deviceDescription) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
final DeviceEvent event;
final Timestamped<DeviceDescription> mergedDesc;
synchronized (getDeviceDescriptions(deviceId)) {
event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
mergedDesc = getDeviceDescriptions(deviceId).get(providerId).getDeviceDesc();
}
if (event != null) {
log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a device update topology event for providerId: "
+ providerId + " and deviceId: " + deviceId, e);
......@@ -317,8 +323,8 @@ public class GossipDeviceStore
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
Timestamp timestamp = clockService.getTimestamp(deviceId);
DeviceEvent event = markOfflineInternal(deviceId, timestamp);
final Timestamp timestamp = clockService.getTimestamp(deviceId);
final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
if (event != null) {
log.info("Notifying peers of a device offline topology event for deviceId: {}",
deviceId);
......@@ -390,17 +396,33 @@ public class GossipDeviceStore
public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
DeviceId deviceId,
List<PortDescription> portDescriptions) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
Timestamped<List<PortDescription>> timestampedPortDescriptions =
new Timestamped<>(portDescriptions, newTimestamp);
List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
final Timestamped<List<PortDescription>> timestampedInput
= new Timestamped<>(portDescriptions, newTimestamp);
final List<DeviceEvent> events;
final Timestamped<List<PortDescription>> merged;
synchronized (getDeviceDescriptions(deviceId)) {
events = updatePortsInternal(providerId, deviceId, timestampedInput);
final DeviceDescriptions descs = getDeviceDescriptions(deviceId).get(providerId);
List<PortDescription> mergedList =
FluentIterable.from(portDescriptions)
.transform(new Function<PortDescription, PortDescription>() {
@Override
public PortDescription apply(PortDescription input) {
// lookup merged port description
return descs.getPortDesc(input.portNumber()).value();
}
}).toList();
merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
}
if (!events.isEmpty()) {
log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
} catch (IOException e) {
log.error("Failed to notify peers of a port update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
......@@ -527,16 +549,25 @@ public class GossipDeviceStore
}
@Override
public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
DeviceId deviceId,
PortDescription portDescription) {
final Timestamp newTimestamp = clockService.getTimestamp(deviceId);
final Timestamped<PortDescription> deltaDesc
= new Timestamped<>(portDescription, newTimestamp);
final DeviceEvent event;
final Timestamped<PortDescription> mergedDesc;
synchronized (getDeviceDescriptions(deviceId)) {
event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
mergedDesc = getDeviceDescriptions(deviceId).get(providerId)
.getPortDesc(portDescription.portNumber());
}
if (event != null) {
log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
providerId, deviceId);
try {
notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
} catch (IOException e) {
log.error("Failed to notify peers of a port status update topology event or providerId: "
+ providerId + " and deviceId: " + deviceId, e);
......@@ -684,7 +715,7 @@ public class GossipDeviceStore
* @return Device instance
*/
private Device composeDevice(DeviceId deviceId,
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
Map<ProviderId, DeviceDescriptions> providerDescs) {
checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
......
package org.onlab.onos.store.device.impl;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import static org.onlab.onos.net.Device.Type.SWITCH;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.onlab.onos.cluster.ControllerNode.State.*;
import static org.onlab.onos.net.DefaultAnnotations.union;
import static java.util.Arrays.asList;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -14,6 +18,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.easymock.Capture;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
......@@ -90,14 +95,25 @@ public class GossipDeviceStoreTest {
.set("B4", "b4")
.build();
private static final NodeId MYSELF = new NodeId("myself");
// local node
private static final NodeId NID1 = new NodeId("local");
private static final ControllerNode ONOS1 =
new DefaultControllerNode(NID1, IpPrefix.valueOf("127.0.0.1"));
// remote node
private static final NodeId NID2 = new NodeId("remote");
private static final ControllerNode ONOS2 =
new DefaultControllerNode(NID2, IpPrefix.valueOf("127.0.0.2"));
private static final List<SparseAnnotations> NO_ANNOTATION = Collections.<SparseAnnotations>emptyList();
private TestGossipDeviceStore testGossipDeviceStore;
private GossipDeviceStore gossipDeviceStore;
private DeviceStore deviceStore;
private DeviceClockManager deviceClockManager;
private ClockService clockService;
private ClusterCommunicationService clusterCommunicator;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
......@@ -113,15 +129,22 @@ public class GossipDeviceStoreTest {
deviceClockManager.activate();
clockService = deviceClockManager;
deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(NID1, 1));
deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(NID1, 2));
ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
anyObject(ClusterMessageHandler.class));
expectLastCall().anyTimes();
replay(clusterCommunicator);
ClusterService clusterService = new TestClusterService();
gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
testGossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
gossipDeviceStore = testGossipDeviceStore;
gossipDeviceStore.activate();
deviceStore = gossipDeviceStore;
verify(clusterCommunicator);
reset(clusterCommunicator);
}
@After
......@@ -135,7 +158,16 @@ public class GossipDeviceStoreTest {
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN, annotations);
reset(clusterCommunicator);
try {
expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
.andReturn(true).anyTimes();
} catch (IOException e) {
fail("Should never reach here");
}
replay(clusterCommunicator);
deviceStore.createOrUpdateDevice(PID, deviceId, description);
verify(clusterCommunicator);
}
private void putDeviceAncillary(DeviceId deviceId, String swVersion,
......@@ -163,9 +195,9 @@ public class GossipDeviceStoreTest {
* @param annotations
*/
private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
DefaultAnnotations expected = DefaultAnnotations.builder().build();
SparseAnnotations expected = DefaultAnnotations.builder().build();
for (SparseAnnotations a : annotations) {
expected = DefaultAnnotations.merge(expected, a);
expected = DefaultAnnotations.union(expected, a);
}
assertEquals(expected.keys(), actual.keys());
for (String key : expected.keys()) {
......@@ -173,6 +205,36 @@ public class GossipDeviceStoreTest {
}
}
private static void assertDeviceDescriptionEquals(DeviceDescription expected,
DeviceDescription actual) {
if (expected == actual) {
return;
}
assertEquals(expected.deviceURI(), actual.deviceURI());
assertEquals(expected.hwVersion(), actual.hwVersion());
assertEquals(expected.manufacturer(), actual.manufacturer());
assertEquals(expected.serialNumber(), actual.serialNumber());
assertEquals(expected.swVersion(), actual.swVersion());
assertAnnotationsEquals(actual.annotations(), expected.annotations());
}
private static void assertDeviceDescriptionEquals(DeviceDescription expected,
List<SparseAnnotations> expectedAnnotations,
DeviceDescription actual) {
if (expected == actual) {
return;
}
assertEquals(expected.deviceURI(), actual.deviceURI());
assertEquals(expected.hwVersion(), actual.hwVersion());
assertEquals(expected.manufacturer(), actual.manufacturer());
assertEquals(expected.serialNumber(), actual.serialNumber());
assertEquals(expected.swVersion(), actual.swVersion());
assertAnnotationsEquals(actual.annotations(),
expectedAnnotations.toArray(new SparseAnnotations[0]));
}
@Test
public final void testGetDeviceCount() {
assertEquals("initialy empty", 0, deviceStore.getDeviceCount());
......@@ -215,56 +277,123 @@ public class GossipDeviceStoreTest {
assertNull("DID2 shouldn't be there", deviceStore.getDevice(DID2));
}
private void assertInternalDeviceEvent(NodeId sender,
DeviceId deviceId,
ProviderId providerId,
DeviceDescription expectedDesc,
Capture<ClusterMessage> actualMsg) {
assertTrue(actualMsg.hasCaptured());
assertEquals(sender, actualMsg.getValue().sender());
assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
actualMsg.getValue().subject());
InternalDeviceEvent addEvent
= testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
assertEquals(deviceId, addEvent.deviceId());
assertEquals(providerId, addEvent.providerId());
assertDeviceDescriptionEquals(expectedDesc, addEvent.deviceDescription().value());
}
private void assertInternalDeviceEvent(NodeId sender,
DeviceId deviceId,
ProviderId providerId,
DeviceDescription expectedDesc,
List<SparseAnnotations> expectedAnnotations,
Capture<ClusterMessage> actualMsg) {
assertTrue(actualMsg.hasCaptured());
assertEquals(sender, actualMsg.getValue().sender());
assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
actualMsg.getValue().subject());
InternalDeviceEvent addEvent
= testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
assertEquals(deviceId, addEvent.deviceId());
assertEquals(providerId, addEvent.providerId());
assertDeviceDescriptionEquals(expectedDesc, expectedAnnotations, addEvent.deviceDescription().value());
}
@Test
public final void testCreateOrUpdateDevice() {
public final void testCreateOrUpdateDevice() throws IOException {
DeviceDescription description =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN);
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
verify(clusterCommunicator);
assertInternalDeviceEvent(NID1, DID1, PID, description, bcast);
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN);
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
assertDevice(DID1, SW2, event2.subject());
verify(clusterCommunicator);
assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
reset(clusterCommunicator);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
}
@Test
public final void testCreateOrUpdateDeviceAncillary() {
public final void testCreateOrUpdateDeviceAncillary() throws IOException {
// add
DeviceDescription description =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN, A2);
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(PIDA, event.subject().providerId());
assertAnnotationsEquals(event.subject().annotations(), A2);
assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
assertInternalDeviceEvent(NID1, DID1, PIDA, description, bcast);
// update from primary
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN, A1);
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
assertDevice(DID1, SW2, event2.subject());
assertEquals(PID, event2.subject().providerId());
assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
assertTrue(deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast);
// no-op update from primary
resetCommunicatorExpectingNoBroadcast(bcast);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
verify(clusterCommunicator);
assertFalse("no broadcast expected", bcast.hasCaptured());
// For now, Ancillary is ignored once primary appears
resetCommunicatorExpectingNoBroadcast(bcast);
assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
verify(clusterCommunicator);
assertFalse("no broadcast expected", bcast.hasCaptured());
// But, Ancillary annotations will be in effect
DeviceDescription description3 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN, A2_2);
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
assertEquals(DEVICE_UPDATED, event3.type());
// basic information will be the one from Primary
......@@ -273,6 +402,11 @@ public class GossipDeviceStoreTest {
// but annotation from Ancillary will be merged
assertAnnotationsEquals(event3.subject().annotations(), A1, A2, A2_2);
assertTrue(deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
// note: only annotation from PIDA is sent over the wire
assertInternalDeviceEvent(NID1, DID1, PIDA, description3,
asList(union(A2, A2_2)), bcast);
}
......@@ -282,14 +416,24 @@ public class GossipDeviceStoreTest {
putDevice(DID1, SW1);
assertTrue(deviceStore.isAvailable(DID1));
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event = deviceStore.markOffline(DID1);
assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
assertDevice(DID1, SW1, event.subject());
assertFalse(deviceStore.isAvailable(DID1));
verify(clusterCommunicator);
// TODO: verify broadcast message
assertTrue(bcast.hasCaptured());
resetCommunicatorExpectingNoBroadcast(bcast);
DeviceEvent event2 = deviceStore.markOffline(DID1);
assertNull("No change, no event", event2);
}
verify(clusterCommunicator);
assertFalse(bcast.hasCaptured());
}
@Test
public final void testUpdatePorts() {
......@@ -298,8 +442,13 @@ public class GossipDeviceStoreTest {
new DefaultPortDescription(P1, true),
new DefaultPortDescription(P2, true)
);
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
verify(clusterCommunicator);
// TODO: verify broadcast message
assertTrue(bcast.hasCaptured());
Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
for (DeviceEvent event : events) {
......@@ -318,7 +467,12 @@ public class GossipDeviceStoreTest {
new DefaultPortDescription(P3, true)
);
resetCommunicatorExpectingSingleBroadcast(bcast);
events = deviceStore.updatePorts(PID, DID1, pds2);
verify(clusterCommunicator);
// TODO: verify broadcast message
assertTrue(bcast.hasCaptured());
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -341,7 +495,12 @@ public class GossipDeviceStoreTest {
new DefaultPortDescription(P1, false),
new DefaultPortDescription(P2, true)
);
resetCommunicatorExpectingSingleBroadcast(bcast);
events = deviceStore.updatePorts(PID, DID1, pds3);
verify(clusterCommunicator);
// TODO: verify broadcast message
assertTrue(bcast.hasCaptured());
assertFalse("event should be triggered", events.isEmpty());
for (DeviceEvent event : events) {
PortNumber num = event.port().number();
......@@ -357,7 +516,6 @@ public class GossipDeviceStoreTest {
fail("Unknown port number encountered: " + num);
}
}
}
@Test
......@@ -368,16 +526,22 @@ public class GossipDeviceStoreTest {
);
deviceStore.updatePorts(PID, DID1, pds);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
new DefaultPortDescription(P1, false));
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
final DefaultPortDescription desc = new DefaultPortDescription(P1, false);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc);
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(P1, event.port().number());
assertFalse("Port is disabled", event.port().isEnabled());
verify(clusterCommunicator);
assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, bcast);
assertTrue(bcast.hasCaptured());
}
@Test
public final void testUpdatePortStatusAncillary() {
public final void testUpdatePortStatusAncillary() throws IOException {
putDeviceAncillary(DID1, SW1);
putDevice(DID1, SW1);
List<PortDescription> pds = Arrays.<PortDescription>asList(
......@@ -385,36 +549,106 @@ public class GossipDeviceStoreTest {
);
deviceStore.updatePorts(PID, DID1, pds);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
new DefaultPortDescription(P1, false, A1_2));
Capture<ClusterMessage> bcast = new Capture<>();
// update port from primary
resetCommunicatorExpectingSingleBroadcast(bcast);
final DefaultPortDescription desc1 = new DefaultPortDescription(P1, false, A1_2);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc1);
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(P1, event.port().number());
assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
assertFalse("Port is disabled", event.port().isEnabled());
DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1,
new DefaultPortDescription(P1, true));
verify(clusterCommunicator);
assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), bcast);
assertTrue(bcast.hasCaptured());
// update port from ancillary with no attributes
resetCommunicatorExpectingNoBroadcast(bcast);
final DefaultPortDescription desc2 = new DefaultPortDescription(P1, true);
DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, desc2);
assertNull("Ancillary is ignored if primary exists", event2);
verify(clusterCommunicator);
assertFalse(bcast.hasCaptured());
// but, Ancillary annotation update will be notified
DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1,
new DefaultPortDescription(P1, true, A2));
resetCommunicatorExpectingSingleBroadcast(bcast);
final DefaultPortDescription desc3 = new DefaultPortDescription(P1, true, A2);
DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, desc3);
assertEquals(PORT_UPDATED, event3.type());
assertDevice(DID1, SW1, event3.subject());
assertEquals(P1, event3.port().number());
assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
assertFalse("Port is disabled", event3.port().isEnabled());
verify(clusterCommunicator);
assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), bcast);
assertTrue(bcast.hasCaptured());
// port only reported from Ancillary will be notified as down
DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1,
new DefaultPortDescription(P2, true));
resetCommunicatorExpectingSingleBroadcast(bcast);
final DefaultPortDescription desc4 = new DefaultPortDescription(P2, true);
DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, desc4);
assertEquals(PORT_ADDED, event4.type());
assertDevice(DID1, SW1, event4.subject());
assertEquals(P2, event4.port().number());
assertAnnotationsEquals(event4.port().annotations());
assertFalse("Port is disabled if not given from primary provider",
event4.port().isEnabled());
verify(clusterCommunicator);
// TODO: verify broadcast message content
assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, bcast);
assertTrue(bcast.hasCaptured());
}
private void assertInternalPortStatusEvent(NodeId sender, DeviceId did,
ProviderId pid, DefaultPortDescription expectedDesc,
List<SparseAnnotations> expectedAnnotations, Capture<ClusterMessage> actualMsg) {
assertTrue(actualMsg.hasCaptured());
assertEquals(sender, actualMsg.getValue().sender());
assertEquals(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
actualMsg.getValue().subject());
InternalPortStatusEvent addEvent
= testGossipDeviceStore.deserialize(actualMsg.getValue().payload());
assertEquals(did, addEvent.deviceId());
assertEquals(pid, addEvent.providerId());
assertPortDescriptionEquals(expectedDesc, expectedAnnotations,
addEvent.portDescription().value());
}
private void assertPortDescriptionEquals(
PortDescription expectedDesc,
List<SparseAnnotations> expectedAnnotations,
PortDescription actual) {
assertEquals(expectedDesc.portNumber(), actual.portNumber());
assertEquals(expectedDesc.isEnabled(), actual.isEnabled());
assertAnnotationsEquals(actual.annotations(),
expectedAnnotations.toArray(new SparseAnnotations[0]));
}
private void resetCommunicatorExpectingNoBroadcast(
Capture<ClusterMessage> bcast) {
bcast.reset();
reset(clusterCommunicator);
replay(clusterCommunicator);
}
private void resetCommunicatorExpectingSingleBroadcast(
Capture<ClusterMessage> bcast) {
bcast.reset();
reset(clusterCommunicator);
try {
expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
} catch (IOException e) {
fail("Should never reach here");
}
replay(clusterCommunicator);
}
@Test
......@@ -476,12 +710,19 @@ public class GossipDeviceStoreTest {
assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
Capture<ClusterMessage> bcast = new Capture<>();
resetCommunicatorExpectingSingleBroadcast(bcast);
DeviceEvent event = deviceStore.removeDevice(DID1);
assertEquals(DEVICE_REMOVED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(1, deviceStore.getDeviceCount());
assertEquals(0, deviceStore.getPorts(DID1).size());
verify(clusterCommunicator);
// TODO: verify broadcast message
assertTrue(bcast.hasCaptured());
// putBack Device, Port w/o annotation
putDevice(DID1, SW1);
......@@ -563,34 +804,28 @@ public class GossipDeviceStoreTest {
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
}
}
private static final class TestClusterCommunicationService implements ClusterCommunicationService {
@Override
public boolean broadcast(ClusterMessage message) throws IOException { return true; }
@Override
public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
@Override
public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
@Override
public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
public <T> T deserialize(byte[] bytes) {
return SERIALIZER.decode(bytes);
}
}
private static final class TestClusterService implements ClusterService {
private static final ControllerNode ONOS1 =
new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1"));
private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
private final Map<NodeId, ControllerNode.State> nodeStates = new HashMap<>();
public TestClusterService() {
nodes.put(new NodeId("N1"), ONOS1);
nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE);
nodes.put(NID1, ONOS1);
nodeStates.put(NID1, ACTIVE);
nodes.put(NID2, ONOS2);
nodeStates.put(NID2, ACTIVE);
}
@Override
public ControllerNode getLocalNode() {
return ONOS1;
return GossipDeviceStoreTest.ONOS1;
}
@Override
......
package org.onlab.onos.store.serializers;
import org.onlab.util.KryoPool.FamilySerializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
/**
* Creates {@link ImmutableList} serializer instance.
*/
public class ImmutableListSerializer extends FamilySerializer<ImmutableList<?>> {
/**
* Creates {@link ImmutableList} serializer instance.
*/
public ImmutableListSerializer() {
// non-null, immutable
super(false, true);
}
@Override
public void write(Kryo kryo, Output output, ImmutableList<?> object) {
output.writeInt(object.size());
for (Object e : object) {
kryo.writeClassAndObject(output, e);
}
}
@Override
public ImmutableList<?> read(Kryo kryo, Input input,
Class<ImmutableList<?>> type) {
final int size = input.readInt();
Builder<Object> builder = ImmutableList.builder();
for (int i = 0; i < size; ++i) {
builder.add(kryo.readClassAndObject(input));
}
return builder.build();
}
@Override
public void registerFamilies(Kryo kryo) {
kryo.register(ImmutableList.of(1).getClass(), this);
kryo.register(ImmutableList.of(1, 2).getClass(), this);
// TODO register required ImmutableList variants
}
}
......@@ -31,6 +31,9 @@ import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
public final class KryoPoolUtil {
/**
......@@ -47,12 +50,15 @@ public final class KryoPoolUtil {
*/
public static final KryoPool API = KryoPool.newBuilder()
.register(MISC)
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableList.class, new ImmutableListSerializer())
.register(
//
ArrayList.class,
Arrays.asList().getClass(),
HashMap.class,
//
//
ControllerNode.State.class,
Device.Type.class,
DefaultAnnotations.class,
......