alshabib

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Conflicts:
	core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/package-info.java

Change-Id: I7bf076fae02c619ff0d57ffcbff4a4189716c474
Showing 50 changed files with 1113 additions and 211 deletions
......@@ -28,10 +28,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.livetribe.slp</groupId>
<artifactId>livetribe-slp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.karaf.shell</groupId>
<artifactId>org.apache.karaf.shell.console</artifactId>
</dependency>
......
......@@ -233,7 +233,7 @@ public class IOLoopTestClient {
}
@Override
protected void connect(SelectionKey key) {
protected void connect(SelectionKey key) throws IOException {
super.connect(key);
TestMessageStream b = (TestMessageStream) key.attachment();
Worker w = ((CustomIOLoop) b.loop()).worker;
......
livetribe.slp.da.expired.services.purge.period=60
livetribe.slp.sa.client.connect.address=127.0.0.1
livetribe.slp.sa.client.factory=org.livetribe.slp.sa.StandardServiceAgentClient$Factory
livetribe.slp.sa.factory=org.livetribe.slp.sa.StandardServiceAgent$Factory
livetribe.slp.sa.service.renewal.enabled=true
livetribe.slp.sa.unicast.prefer.tcp=false
livetribe.slp.tcp.connector.factory=org.livetribe.slp.spi.net.SocketTCPConnector$Factory
livetribe.slp.tcp.connector.server.factory=org.livetribe.slp.spi.net.SocketTCPConnectorServer$Factory
livetribe.slp.tcp.message.max.length=4096
livetribe.slp.tcp.read.timeout=300000
livetribe.slp.ua.client.factory=org.livetribe.slp.ua.StandardUserAgentClient$Factory
livetribe.slp.ua.factory=org.livetribe.slp.ua.StandardUserAgent$Factory
livetribe.slp.ua.unicast.prefer.tcp=false
livetribe.slp.udp.connector.factory=org.livetribe.slp.spi.net.SocketUDPConnector$Factory
livetribe.slp.udp.connector.server.factory=org.livetribe.slp.spi.net.SocketUDPConnectorServer$Factory
net.slp.DAAddresses=
net.slp.DAAttributes=
net.slp.DAHeartBeat=10800
net.slp.MTU=1400
net.slp.SAAttributes=
net.slp.broadcastAddress=255.255.255.255
net.slp.datagramTimeouts=150,250,400
net.slp.interfaces=0.0.0.0
net.slp.isBroadcastOnly=false
net.slp.locale=en
net.slp.multicastAddress=239.255.255.253
net.slp.multicastMaximumWait=15000
net.slp.multicastTTL=255
net.slp.multicastTimeouts=150,250,400,600,1000
net.slp.notificationPort=1847
net.slp.port=427
net.slp.useScopes=default
org.onlab.cluster.name = TV-ONOS
package org.onlab.onos.cli;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cluster.ClusterAdminService;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Adds a new controller cluster node.
*/
@Command(scope = "onos", name = "add-node",
description = "Adds a new controller cluster node")
public class NodeAddCommand extends AbstractShellCommand {
@Argument(index = 0, name = "nodeId", description = "Node ID",
required = true, multiValued = false)
String nodeId = null;
@Argument(index = 1, name = "ip", description = "Node IP address",
required = true, multiValued = false)
String ip = null;
@Argument(index = 2, name = "tcpPort", description = "Node TCP listen port",
required = false, multiValued = false)
int tcpPort = 9876;
@Override
protected void execute() {
ClusterAdminService service = get(ClusterAdminService.class);
service.addNode(new NodeId(nodeId), IpPrefix.valueOf(ip), tcpPort);
}
}
package org.onlab.onos.cli;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cluster.ClusterAdminService;
import org.onlab.onos.cluster.NodeId;
/**
* Removes a controller cluster node.
*/
@Command(scope = "onos", name = "remove-node",
description = "Removes a new controller cluster node")
public class NodeRemoveCommand extends AbstractShellCommand {
@Argument(index = 0, name = "nodeId", description = "Node ID",
required = true, multiValued = false)
String nodeId = null;
@Override
protected void execute() {
ClusterAdminService service = get(ClusterAdminService.class);
service.removeNode(new NodeId(nodeId));
}
}
......@@ -17,7 +17,7 @@ import static com.google.common.collect.Lists.newArrayList;
public class NodesListCommand extends AbstractShellCommand {
private static final String FMT =
"id=%s, ip=%s, state=%s %s";
"id=%s, address=%s:%s, state=%s %s";
@Override
protected void execute() {
......@@ -26,7 +26,7 @@ public class NodesListCommand extends AbstractShellCommand {
Collections.sort(nodes, Comparators.NODE_COMPARATOR);
ControllerNode self = service.getLocalNode();
for (ControllerNode node : nodes) {
print(FMT, node.id(), node.ip(),
print(FMT, node.id(), node.ip(), node.tcpPort(),
service.getState(node.id()),
node.equals(self) ? "*" : "");
}
......
......@@ -5,6 +5,12 @@
<action class="org.onlab.onos.cli.NodesListCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.NodeAddCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.NodeRemoveCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.MastersListCommand"/>
<completers>
<ref component-id="clusterIdCompleter"/>
......
package org.onlab.onos.cluster;
import org.onlab.packet.IpPrefix;
/**
* Service for administering the cluster node membership.
*/
public interface ClusterAdminService {
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
* @param ip node IP listen address
* @param tcpPort tcp listen port
* @return newly added node
*/
ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort);
/**
* Removes the specified node from the cluster node list.
*
* @param nodeId controller node identifier
......
package org.onlab.onos.cluster;
import org.onlab.onos.store.Store;
import org.onlab.packet.IpPrefix;
import java.util.Set;
......@@ -40,6 +41,16 @@ public interface ClusterStore extends Store<ClusterEvent, ClusterStoreDelegate>
ControllerNode.State getState(NodeId nodeId);
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
* @param ip node IP listen address
* @param tcpPort tcp listen port
* @return newly added node
*/
ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort);
/**
* Removes the specified node from the inventory of cluster nodes.
*
* @param nodeId controller instance identifier
......
......@@ -35,4 +35,12 @@ public interface ControllerNode {
*/
IpPrefix ip();
/**
* Returns the TCP port on which the node listens for connections.
*
* @return TCP port
*/
int tcpPort();
}
......
......@@ -11,13 +11,17 @@ import static com.google.common.base.MoreObjects.toStringHelper;
*/
public class DefaultControllerNode implements ControllerNode {
private static final int DEFAULT_PORT = 9876;
private final NodeId id;
private final IpPrefix ip;
private final int tcpPort;
// For serialization
private DefaultControllerNode() {
this.id = null;
this.ip = null;
this.tcpPort = 0;
}
/**
......@@ -27,8 +31,19 @@ public class DefaultControllerNode implements ControllerNode {
* @param ip instance IP address
*/
public DefaultControllerNode(NodeId id, IpPrefix ip) {
this(id, ip, DEFAULT_PORT);
}
/**
* Creates a new instance with the specified id and IP address and TCP port.
*
* @param id instance identifier
* @param ip instance IP address
*/
public DefaultControllerNode(NodeId id, IpPrefix ip, int tcpPort) {
this.id = id;
this.ip = ip;
this.tcpPort = tcpPort;
}
@Override
......@@ -42,6 +57,11 @@ public class DefaultControllerNode implements ControllerNode {
}
@Override
public int tcpPort() {
return tcpPort;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
......@@ -60,7 +80,8 @@ public class DefaultControllerNode implements ControllerNode {
@Override
public String toString() {
return toStringHelper(this).add("id", id).add("ip", ip).toString();
return toStringHelper(this).add("id", id)
.add("ip", ip).add("tcpPort", tcpPort).toString();
}
}
......
......@@ -16,10 +16,12 @@ import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -81,6 +83,14 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
checkNotNull(ip, "IP address cannot be null");
checkArgument(tcpPort > 5000, "TCP port must be > 5000");
return store.addNode(nodeId, ip, tcpPort);
}
@Override
public void removeNode(NodeId nodeId) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
store.removeNode(nodeId);
......
......@@ -33,8 +33,11 @@ import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.device.impl.DistributedDeviceStore;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.onlab.packet.IpPrefix;
import java.util.ArrayList;
......@@ -92,6 +95,7 @@ public class DistributedDeviceManagerTest {
private DistributedDeviceStore dstore;
private TestMastershipManager masterManager;
private EventDeliveryService eventService;
private KryoSerializationManager serializationMgr;
@Before
public void setUp() {
......@@ -107,7 +111,10 @@ public class DistributedDeviceManagerTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
dstore = new TestDistributedDeviceStore();
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
dstore = new TestDistributedDeviceStore(storeManager, serializationMgr);
dstore.activate();
mgr.store = dstore;
......@@ -133,6 +140,7 @@ public class DistributedDeviceManagerTest {
mgr.deactivate();
dstore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -163,7 +171,7 @@ public class DistributedDeviceManagerTest {
public void deviceDisconnected() {
connectDevice(DID1, SW1);
connectDevice(DID2, SW1);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED);
validateEvents(DEVICE_ADDED, DEVICE_ADDED);
assertTrue("device should be available", service.isAvailable(DID1));
// Disconnect
......@@ -182,10 +190,10 @@ public class DistributedDeviceManagerTest {
@Test
public void deviceUpdated() {
connectDevice(DID1, SW1);
validateEvents(DEVICE_ADDED, DEVICE_ADDED);
validateEvents(DEVICE_ADDED);
connectDevice(DID1, SW2);
validateEvents(DEVICE_UPDATED, DEVICE_UPDATED);
validateEvents(DEVICE_UPDATED);
}
@Test
......@@ -202,7 +210,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P2, true));
pds.add(new DefaultPortDescription(P3, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
pds.clear();
pds.add(new DefaultPortDescription(P1, false));
......@@ -218,7 +226,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P1, true));
pds.add(new DefaultPortDescription(P2, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
providerService.portStatusChanged(DID1, new DefaultPortDescription(P1, false));
validateEvents(PORT_UPDATED);
......@@ -233,7 +241,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P1, true));
pds.add(new DefaultPortDescription(P2, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
assertEquals("wrong port count", 2, service.getPorts(DID1).size());
Port port = service.getPort(DID1, P1);
......@@ -247,7 +255,7 @@ public class DistributedDeviceManagerTest {
connectDevice(DID2, SW2);
assertEquals("incorrect device count", 2, service.getDeviceCount());
admin.removeDevice(DID1);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_REMOVED, DEVICE_REMOVED);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_REMOVED);
assertNull("device should not be found", service.getDevice(DID1));
assertNotNull("device should be found", service.getDevice(DID2));
assertEquals("incorrect device count", 1, service.getDeviceCount());
......@@ -298,8 +306,10 @@ public class DistributedDeviceManagerTest {
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore() {
this.storeService = storeManager;
public TestDistributedDeviceStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
......
......@@ -26,6 +26,23 @@
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-nio</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
......
package org.onlab.onos.store.cluster.impl;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
* Allows for reading and writing cluster definition as a JSON file.
*/
public class ClusterDefinitionStore {
private final File file;
/**
* Creates a reader/writer of the cluster definition file.
*
* @param filePath location of the definition file
*/
public ClusterDefinitionStore(String filePath) {
file = new File(filePath);
}
/**
* Returns set of the controller nodes, including self.
*
* @return set of controller nodes
*/
public Set<DefaultControllerNode> read() throws IOException {
Set<DefaultControllerNode> nodes = new HashSet<>();
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = (ObjectNode) mapper.readTree(file);
Iterator<JsonNode> it = ((ArrayNode) clusterNodeDef.get("nodes")).elements();
while (it.hasNext()) {
ObjectNode nodeDef = (ObjectNode) it.next();
nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
IpPrefix.valueOf(nodeDef.get("ip").asText()),
nodeDef.get("tcpPort").asInt(9876)));
}
return nodes;
}
/**
* Writes the given set of the controller nodes.
*
* @param nodes set of controller nodes
*/
public void write(Set<DefaultControllerNode> nodes) throws IOException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = mapper.createObjectNode();
ArrayNode nodeDefs = mapper.createArrayNode();
clusterNodeDef.set("nodes", nodeDefs);
for (DefaultControllerNode node : nodes) {
ObjectNode nodeDef = mapper.createObjectNode();
nodeDef.put("id", node.id().toString())
.put("ip", node.ip().toString())
.put("tcpPort", node.tcpPort());
nodeDefs.add(nodeDef);
}
mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8),
clusterNodeDef);
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.AbstractMessage;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Base message for cluster-wide communications using TLVs.
*/
public class TLVMessage extends AbstractMessage {
private final int type;
private final byte[] data;
/**
* Creates an immutable TLV message.
*
* @param type message type
* @param data message data bytes
*/
public TLVMessage(int type, byte[] data) {
this.length = data.length + TLVMessageStream.METADATA_LENGTH;
this.type = type;
this.data = data;
}
/**
* Returns the message type indicator.
*
* @return message type
*/
public int type() {
return type;
}
/**
* Returns the data bytes.
*
* @return message data
*/
public byte[] data() {
return data;
}
@Override
public int hashCode() {
return Objects.hash(type, data);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final TLVMessage other = (TLVMessage) obj;
return Objects.equals(this.type, other.type) &&
Objects.equals(this.data, other.data);
}
@Override
public String toString() {
return toStringHelper(this).add("type", type).add("length", length).toString();
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import static com.google.common.base.Preconditions.checkState;
/**
* Stream for transferring TLV messages between cluster members.
*/
public class TLVMessageStream extends MessageStream<TLVMessage> {
public static final int METADATA_LENGTH = 16; // 8 + 4 + 4
private static final int LENGTH_OFFSET = 12;
private static final long MARKER = 0xfeedcafecafefeedL;
private DefaultControllerNode node;
/**
* Creates a message stream associated with the specified IO loop and
* backed by the given byte channel.
*
* @param loop IO loop
* @param byteChannel backing byte channel
* @param bufferSize size of the backing byte buffers
* @param maxIdleMillis maximum number of millis the stream can be idle
*/
protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel,
int bufferSize, int maxIdleMillis) {
super(loop, byteChannel, bufferSize, maxIdleMillis);
}
/**
* Returns the node with which this stream is associated.
*
* @return controller node
*/
DefaultControllerNode node() {
return node;
}
/**
* Sets the node with which this stream is affiliated.
*
* @param node controller node
*/
void setNode(DefaultControllerNode node) {
checkState(this.node == null, "Stream is already bound to a node");
this.node = node;
}
@Override
protected TLVMessage read(ByteBuffer buffer) {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
int type = buffer.getInt();
length = buffer.getInt();
// TODO: add deserialization hook here
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
return new TLVMessage(type, data);
}
@Override
protected void write(TLVMessage message, ByteBuffer buffer) {
buffer.putLong(MARKER);
buffer.putInt(message.type());
buffer.putInt(message.length());
// TODO: add serialization hook here
buffer.put(message.data());
}
}
......@@ -86,46 +86,48 @@ public class OnosDistributedDeviceStore
@Override
public Iterable<Device> getDevices() {
// TODO builder v.s. copyOf. Guava semms to be using copyOf?
// FIXME: synchronize.
Builder<Device> builder = ImmutableSet.builder();
for (VersionedValue<? extends Device> device : devices.values()) {
synchronized (this) {
for (VersionedValue<Device> device : devices.values()) {
builder.add(device.entity());
}
return builder.build();
}
}
@Override
public Device getDevice(DeviceId deviceId) {
return devices.get(deviceId).entity();
VersionedValue<Device> device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
return device.entity();
}
@Override
public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
DeviceDescription deviceDescription) {
Timestamp now = clockService.getTimestamp(deviceId);
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
VersionedValue<Device> device = devices.get(deviceId);
if (device == null) {
return createDevice(providerId, deviceId, deviceDescription, now);
return createDevice(providerId, deviceId, deviceDescription, newTimestamp);
}
checkState(now.compareTo(device.timestamp()) > 0,
checkState(newTimestamp.compareTo(device.timestamp()) > 0,
"Existing device has a timestamp in the future!");
return updateDevice(providerId, device.entity(), deviceDescription, now);
return updateDevice(providerId, device.entity(), deviceDescription, newTimestamp);
}
// Creates the device and returns the appropriate event if necessary.
private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
DeviceDescription desc, Timestamp timestamp) {
DefaultDevice device = new DefaultDevice(providerId, deviceId, desc.type(),
Device device = new DefaultDevice(providerId, deviceId, desc.type(),
desc.manufacturer(),
desc.hwVersion(), desc.swVersion(),
desc.serialNumber());
devices.put(deviceId, new VersionedValue<Device>(device, true, timestamp));
// FIXME: broadcast a message telling peers of a device event.
devices.put(deviceId, new VersionedValue<>(device, true, timestamp));
// TODO,FIXME: broadcast a message telling peers of a device event.
return new DeviceEvent(DEVICE_ADDED, device, null);
}
......@@ -148,7 +150,7 @@ public class OnosDistributedDeviceStore
}
// Otherwise merely attempt to change availability
DefaultDevice updated = new DefaultDevice(providerId, device.id(),
Device updated = new DefaultDevice(providerId, device.id(),
desc.type(),
desc.manufacturer(),
desc.hwVersion(),
......@@ -196,18 +198,18 @@ public class OnosDistributedDeviceStore
VersionedValue<Device> device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
Timestamp timestamp = clockService.getTimestamp(deviceId);
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
// Add new ports
Set<PortNumber> processed = new HashSet<>();
for (PortDescription portDescription : portDescriptions) {
VersionedValue<Port> port = ports.get(portDescription.portNumber());
if (port == null) {
events.add(createPort(device, portDescription, ports, timestamp));
events.add(createPort(device, portDescription, ports, newTimestamp));
}
checkState(timestamp.compareTo(port.timestamp()) > 0,
checkState(newTimestamp.compareTo(port.timestamp()) > 0,
"Existing port state has a timestamp in the future!");
events.add(updatePort(device, port, portDescription, ports, timestamp));
events.add(updatePort(device.entity(), port.entity(), portDescription, ports, newTimestamp));
processed.add(portDescription.portNumber());
}
......@@ -233,19 +235,19 @@ public class OnosDistributedDeviceStore
// Checks if the specified port requires update and if so, it replaces the
// existing entry in the map and returns corresponding event.
//@GuardedBy("this")
private DeviceEvent updatePort(VersionedValue<Device> device, VersionedValue<Port> port,
private DeviceEvent updatePort(Device device, Port port,
PortDescription portDescription,
Map<PortNumber, VersionedValue<Port>> ports,
Timestamp timestamp) {
if (port.entity().isEnabled() != portDescription.isEnabled()) {
if (port.isEnabled() != portDescription.isEnabled()) {
VersionedValue<Port> updatedPort = new VersionedValue<Port>(
new DefaultPort(device.entity(), portDescription.portNumber(),
new DefaultPort(device, portDescription.portNumber(),
portDescription.isEnabled()),
portDescription.isEnabled(),
timestamp);
ports.put(port.entity().number(), updatedPort);
updatePortMap(device.entity().id(), ports);
return new DeviceEvent(PORT_UPDATED, device.entity(), updatedPort.entity());
ports.put(port.number(), updatedPort);
updatePortMap(device.id(), ports);
return new DeviceEvent(PORT_UPDATED, device, updatedPort.entity());
}
return null;
}
......@@ -300,7 +302,7 @@ public class OnosDistributedDeviceStore
Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
VersionedValue<Port> port = ports.get(portDescription.portNumber());
Timestamp timestamp = clockService.getTimestamp(deviceId);
return updatePort(device, port, portDescription, ports, timestamp);
return updatePort(device.entity(), port.entity(), portDescription, ports, timestamp);
}
@Override
......
package org.onlab.onos.store.link.impl;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.device.impl.VersionedValue;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.ImmutableSet.Builder;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
/**
* Manages inventory of infrastructure links using a protocol that takes into consideration
* the order in which events occur.
*/
// FIXME: This does not yet implement the full protocol.
// The full protocol requires the sender of LLDP message to include the
// version information of src device/port and the receiver to
// take that into account when figuring out if a more recent src
// device/port down event renders the link discovery obsolete.
@Component(immediate = true)
@Service
public class OnosDistributedLinkStore
extends AbstractStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
private final Logger log = getLogger(getClass());
// Link inventory
private ConcurrentMap<LinkKey, VersionedValue<Link>> links;
public static final String LINK_NOT_FOUND = "Link between %s and %s not found";
// TODO synchronize?
// Egress and ingress link sets
private final Multimap<DeviceId, VersionedValue<Link>> srcLinks = HashMultimap.create();
private final Multimap<DeviceId, VersionedValue<Link>> dstLinks = HashMultimap.create();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@Activate
public void activate() {
links = new ConcurrentHashMap<>();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public int getLinkCount() {
return links.size();
}
@Override
public Iterable<Link> getLinks() {
Builder<Link> builder = ImmutableSet.builder();
synchronized (this) {
for (VersionedValue<Link> link : links.values()) {
builder.add(link.entity());
}
return builder.build();
}
}
@Override
public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
Set<VersionedValue<Link>> egressLinks = ImmutableSet.copyOf(srcLinks.get(deviceId));
Set<Link> rawEgressLinks = new HashSet<>();
for (VersionedValue<Link> link : egressLinks) {
rawEgressLinks.add(link.entity());
}
return rawEgressLinks;
}
@Override
public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
Set<VersionedValue<Link>> ingressLinks = ImmutableSet.copyOf(dstLinks.get(deviceId));
Set<Link> rawIngressLinks = new HashSet<>();
for (VersionedValue<Link> link : ingressLinks) {
rawIngressLinks.add(link.entity());
}
return rawIngressLinks;
}
@Override
public Link getLink(ConnectPoint src, ConnectPoint dst) {
VersionedValue<Link> link = links.get(new LinkKey(src, dst));
checkArgument(link != null, "LINK_NOT_FOUND", src, dst);
return link.entity();
}
@Override
public Set<Link> getEgressLinks(ConnectPoint src) {
Set<Link> egressLinks = new HashSet<>();
for (VersionedValue<Link> link : srcLinks.get(src.deviceId())) {
if (link.entity().src().equals(src)) {
egressLinks.add(link.entity());
}
}
return egressLinks;
}
@Override
public Set<Link> getIngressLinks(ConnectPoint dst) {
Set<Link> ingressLinks = new HashSet<>();
for (VersionedValue<Link> link : dstLinks.get(dst.deviceId())) {
if (link.entity().dst().equals(dst)) {
ingressLinks.add(link.entity());
}
}
return ingressLinks;
}
@Override
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
final DeviceId destinationDeviceId = linkDescription.dst().deviceId();
final Timestamp newTimestamp = clockService.getTimestamp(destinationDeviceId);
LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
VersionedValue<Link> link = links.get(key);
if (link == null) {
return createLink(providerId, key, linkDescription, newTimestamp);
}
checkState(newTimestamp.compareTo(link.timestamp()) > 0,
"Existing Link has a timestamp in the future!");
return updateLink(providerId, link, key, linkDescription, newTimestamp);
}
// Creates and stores the link and returns the appropriate event.
private LinkEvent createLink(ProviderId providerId, LinkKey key,
LinkDescription linkDescription, Timestamp timestamp) {
VersionedValue<Link> link = new VersionedValue<Link>(new DefaultLink(providerId, key.src(), key.dst(),
linkDescription.type()), true, timestamp);
synchronized (this) {
links.put(key, link);
addNewLink(link, timestamp);
}
// FIXME: notify peers.
return new LinkEvent(LINK_ADDED, link.entity());
}
// update Egress and ingress link sets
private void addNewLink(VersionedValue<Link> link, Timestamp timestamp) {
Link rawLink = link.entity();
synchronized (this) {
srcLinks.put(rawLink.src().deviceId(), link);
dstLinks.put(rawLink.dst().deviceId(), link);
}
}
// Updates, if necessary the specified link and returns the appropriate event.
private LinkEvent updateLink(ProviderId providerId, VersionedValue<Link> existingLink,
LinkKey key, LinkDescription linkDescription, Timestamp timestamp) {
// FIXME confirm Link update condition is OK
if (existingLink.entity().type() == INDIRECT && linkDescription.type() == DIRECT) {
synchronized (this) {
VersionedValue<Link> updatedLink = new VersionedValue<Link>(
new DefaultLink(providerId, existingLink.entity().src(), existingLink.entity().dst(),
linkDescription.type()), true, timestamp);
links.replace(key, existingLink, updatedLink);
replaceLink(existingLink, updatedLink);
// FIXME: notify peers.
return new LinkEvent(LINK_UPDATED, updatedLink.entity());
}
}
return null;
}
// update Egress and ingress link sets
private void replaceLink(VersionedValue<Link> current, VersionedValue<Link> updated) {
synchronized (this) {
srcLinks.remove(current.entity().src().deviceId(), current);
dstLinks.remove(current.entity().dst().deviceId(), current);
srcLinks.put(current.entity().src().deviceId(), updated);
dstLinks.put(current.entity().dst().deviceId(), updated);
}
}
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
synchronized (this) {
LinkKey key = new LinkKey(src, dst);
VersionedValue<Link> link = links.remove(key);
if (link != null) {
removeLink(link);
// notify peers
return new LinkEvent(LINK_REMOVED, link.entity());
}
return null;
}
}
// update Egress and ingress link sets
private void removeLink(VersionedValue<Link> link) {
synchronized (this) {
srcLinks.remove(link.entity().src().deviceId(), link);
dstLinks.remove(link.entity().dst().deviceId(), link);
}
}
}
......@@ -49,6 +49,7 @@ public class DistributedClusterStore
private final MembershipListener listener = new InternalMembershipListener();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Override
@Activate
public void activate() {
super.activate();
......@@ -56,9 +57,9 @@ public class DistributedClusterStore
rawNodes = theInstance.getMap("nodes");
OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
= new OptionalCacheLoader<>(storeService, rawNodes);
= new OptionalCacheLoader<>(kryoSerializationService, rawNodes);
nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawNodes.addEntryListener(new RemoteEventHandler<>(nodes), true);
rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
loadClusterNodes();
......@@ -68,7 +69,7 @@ public class DistributedClusterStore
// Loads the initial set of cluster nodes
private void loadClusterNodes() {
for (Member member : theInstance.getCluster().getMembers()) {
addMember(member);
addNode(node(member));
}
}
......@@ -104,6 +105,11 @@ public class DistributedClusterStore
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
}
@Override
public void removeNode(NodeId nodeId) {
synchronized (this) {
rawNodes.remove(serialize(nodeId));
......@@ -112,8 +118,7 @@ public class DistributedClusterStore
}
// Adds a new node based on the specified member
private synchronized ControllerNode addMember(Member member) {
DefaultControllerNode node = node(member);
private synchronized ControllerNode addNode(DefaultControllerNode node) {
rawNodes.put(serialize(node.id()), serialize(node));
nodes.put(node.id(), Optional.of(node));
states.put(node.id(), State.ACTIVE);
......@@ -136,7 +141,7 @@ public class DistributedClusterStore
@Override
public void memberAdded(MembershipEvent membershipEvent) {
log.info("Member {} added", membershipEvent.getMember());
ControllerNode node = addMember(membershipEvent.getMember());
ControllerNode node = addNode(node(membershipEvent.getMember()));
notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
}
......
......@@ -52,7 +52,7 @@ implements MastershipStore {
rawMasters = theInstance.getMap("masters");
OptionalCacheLoader<DeviceId, NodeId> nodeLoader
= new OptionalCacheLoader<>(storeService, rawMasters);
= new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
......@@ -123,7 +123,7 @@ implements MastershipStore {
return null;
}
private class RemoteMasterShipEventHandler extends RemoteEventHandler<DeviceId, NodeId> {
private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
super(cache);
}
......
......@@ -6,6 +6,7 @@ import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -14,6 +15,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.event.Event;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -31,6 +33,9 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected KryoSerializationService kryoSerializationService;
protected HazelcastInstance theInstance;
@Activate
......@@ -45,7 +50,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return serialized object
*/
protected byte[] serialize(Object obj) {
return storeService.serialize(obj);
return kryoSerializationService.serialize(obj);
}
/**
......@@ -56,7 +61,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return deserialized object
*/
protected <T> T deserialize(byte[] bytes) {
return storeService.deserialize(bytes);
return kryoSerializationService.deserialize(bytes);
}
......@@ -66,8 +71,9 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @param <K> IMap key type after deserialization
* @param <V> IMap value type after deserialization
*/
public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
public class RemoteCacheEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
private final Member localMember;
private LoadingCache<K, Optional<V>> cache;
/**
......@@ -75,17 +81,26 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
*
* @param cache cache to update
*/
public RemoteEventHandler(LoadingCache<K, Optional<V>> cache) {
public RemoteCacheEventHandler(LoadingCache<K, Optional<V>> cache) {
this.localMember = theInstance.getCluster().getLocalMember();
this.cache = checkNotNull(cache);
}
@Override
public void mapCleared(MapEvent event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
cache.invalidateAll();
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V newVal = deserialize(event.getValue());
Optional<V> newValue = Optional.of(newVal);
......@@ -95,6 +110,10 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V oldVal = deserialize(event.getOldValue());
Optional<V> oldValue = Optional.fromNullable(oldVal);
......@@ -106,6 +125,10 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V val = deserialize(event.getOldValue());
cache.invalidate(key);
......@@ -141,4 +164,80 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
}
}
/**
* Distributed object remote event entry listener.
*
* @param <K> Entry key type after deserialization
* @param <V> Entry value type after deserialization
*/
public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
private final Member localMember;
public RemoteEventHandler() {
this.localMember = theInstance.getCluster().getLocalMember();
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V newVal = deserialize(event.getValue());
onAdd(key, newVal);
}
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V val = deserialize(event.getValue());
onRemove(key, val);
}
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V oldVal = deserialize(event.getOldValue());
V newVal = deserialize(event.getValue());
onUpdate(key, oldVal, newVal);
}
/**
* Remote entry addition hook.
*
* @param key new key
* @param newVal new value
*/
protected void onAdd(K key, V newVal) {
}
/**
* Remote entry update hook.
*
* @param key new key
* @param oldValue old value
* @param newVal new value
*/
protected void onUpdate(K key, V oldValue, V newVal) {
}
/**
* Remote entry remove hook.
*
* @param key new key
* @param val old value
*/
protected void onRemove(K key, V val) {
}
}
}
......
......@@ -2,6 +2,8 @@ package org.onlab.onos.store.common;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
import com.hazelcast.core.IMap;
......@@ -16,28 +18,28 @@ import com.hazelcast.core.IMap;
public final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
private final StoreService storeService;
private final KryoSerializationService kryoSerializationService;
private IMap<byte[], byte[]> rawMap;
/**
* Constructor.
*
* @param storeService to use for serialization
* @param kryoSerializationService to use for serialization
* @param rawMap underlying IMap
*/
public OptionalCacheLoader(StoreService storeService, IMap<byte[], byte[]> rawMap) {
this.storeService = checkNotNull(storeService);
public OptionalCacheLoader(KryoSerializationService kryoSerializationService, IMap<byte[], byte[]> rawMap) {
this.kryoSerializationService = checkNotNull(kryoSerializationService);
this.rawMap = checkNotNull(rawMap);
}
@Override
public Optional<V> load(K key) throws Exception {
byte[] keyBytes = storeService.serialize(key);
byte[] keyBytes = kryoSerializationService.serialize(key);
byte[] valBytes = rawMap.get(keyBytes);
if (valBytes == null) {
return Optional.absent();
}
V dev = storeService.deserialize(valBytes);
V dev = kryoSerializationService.deserialize(valBytes);
return Optional.of(dev);
}
}
......
......@@ -5,46 +5,14 @@ import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import de.javakaffee.kryoserializers.URISerializer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
import org.onlab.onos.store.serializers.DefaultPortSerializer;
import org.onlab.onos.store.serializers.DeviceIdSerializer;
import org.onlab.onos.store.serializers.IpPrefixSerializer;
import org.onlab.onos.store.serializers.LinkKeySerializer;
import org.onlab.onos.store.serializers.NodeIdSerializer;
import org.onlab.onos.store.serializers.PortNumberSerializer;
import org.onlab.onos.store.serializers.ProviderIdSerializer;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
/**
* Auxiliary bootstrap of distributed store.
......@@ -58,55 +26,18 @@ public class StoreManager implements StoreService {
private final Logger log = LoggerFactory.getLogger(getClass());
protected HazelcastInstance instance;
private KryoPool serializerPool;
@Activate
public void activate() {
try {
Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
instance = Hazelcast.newHazelcastInstance(config);
setupKryoPool();
log.info("Started");
} catch (FileNotFoundException e) {
log.error("Unable to configure Hazelcast", e);
}
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ControllerNode.State.class,
Device.Type.class,
DefaultControllerNode.class,
DefaultDevice.class,
MastershipRole.class,
Port.class,
Element.class,
Link.Type.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.build()
.populate(10);
}
@Deactivate
public void deactivate() {
instance.shutdown();
......@@ -118,18 +49,4 @@ public class StoreManager implements StoreService {
return instance;
}
@Override
public byte[] serialize(final Object obj) {
return serializerPool.serialize(obj);
}
@Override
public <T> T deserialize(final byte[] bytes) {
if (bytes == null) {
return null;
}
return serializerPool.deserialize(bytes);
}
}
......
......@@ -15,22 +15,4 @@ public interface StoreService {
*/
HazelcastInstance getHazelcastInstance();
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @return serialized bytes
*/
public byte[] serialize(final Object obj);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param bytes bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final byte[] bytes);
}
......
......@@ -46,9 +46,8 @@ public class TestStoreManager extends StoreManager {
this.instance = instance;
}
// Hazelcast setup removed from original code.
@Override
public void activate() {
setupKryoPool();
// Hazelcast setup removed from original code.
}
}
......
......@@ -72,6 +72,10 @@ public class DistributedDeviceStore
private IMap<byte[], byte[]> rawDevicePorts;
private LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> devicePorts;
private String devicesListener;
private String portsListener;
@Override
@Activate
public void activate() {
......@@ -83,20 +87,20 @@ public class DistributedDeviceStore
// TODO decide on Map name scheme to avoid collision
rawDevices = theInstance.getMap("devices");
final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader
= new OptionalCacheLoader<>(storeService, rawDevices);
= new OptionalCacheLoader<>(kryoSerializationService, rawDevices);
devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance
rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
// TODO cache availableDevices
availableDevices = theInstance.getSet("availableDevices");
rawDevicePorts = theInstance.getMap("devicePorts");
final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader
= new OptionalCacheLoader<>(storeService, rawDevicePorts);
= new OptionalCacheLoader<>(kryoSerializationService, rawDevicePorts);
devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance
rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
loadDeviceCache();
loadDevicePortsCache();
......@@ -106,6 +110,8 @@ public class DistributedDeviceStore
@Deactivate
public void deactivate() {
rawDevicePorts.removeEntryListener(portsListener);
rawDevices.removeEntryListener(devicesListener);
log.info("Stopped");
}
......@@ -354,7 +360,7 @@ public class DistributedDeviceStore
}
}
private class RemoteDeviceEventHandler extends RemoteEventHandler<DeviceId, DefaultDevice> {
private class RemoteDeviceEventHandler extends RemoteCacheEventHandler<DeviceId, DefaultDevice> {
public RemoteDeviceEventHandler(LoadingCache<DeviceId, Optional<DefaultDevice>> cache) {
super(cache);
}
......@@ -375,7 +381,7 @@ public class DistributedDeviceStore
}
}
private class RemotePortEventHandler extends RemoteEventHandler<DeviceId, Map<PortNumber, Port>> {
private class RemotePortEventHandler extends RemoteCacheEventHandler<DeviceId, Map<PortNumber, Port>> {
public RemotePortEventHandler(LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> cache) {
super(cache);
}
......
......@@ -58,6 +58,8 @@ public class DistributedLinkStore
private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
private String linksListener;
@Override
@Activate
public void activate() {
......@@ -68,10 +70,10 @@ public class DistributedLinkStore
// TODO decide on Map name scheme to avoid collision
rawLinks = theInstance.getMap("links");
final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
= new OptionalCacheLoader<>(storeService, rawLinks);
= new OptionalCacheLoader<>(kryoSerializationService, rawLinks);
links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
// refresh/populate cache based on notification from other instance
rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
loadLinkCache();
......@@ -80,7 +82,7 @@ public class DistributedLinkStore
@Deactivate
public void deactivate() {
super.activate();
rawLinks.removeEntryListener(linksListener);
log.info("Stopped");
}
......@@ -233,7 +235,7 @@ public class DistributedLinkStore
}
}
private class RemoteLinkEventHandler extends RemoteEventHandler<LinkKey, DefaultLink> {
private class RemoteLinkEventHandler extends RemoteCacheEventHandler<LinkKey, DefaultLink> {
public RemoteLinkEventHandler(LoadingCache<LinkKey, Optional<DefaultLink>> cache) {
super(cache);
}
......
......@@ -20,6 +20,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
......@@ -35,12 +36,17 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
/**
* Test of the Hazelcast based distributed DeviceStore implementation.
*/
public class DistributedDeviceStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
......@@ -57,6 +63,7 @@ public class DistributedDeviceStoreTest {
private static final PortNumber P3 = PortNumber.portNumber(3);
private DistributedDeviceStore deviceStore;
private KryoSerializationManager serializationMgr;
private StoreManager storeManager;
......@@ -78,7 +85,10 @@ public class DistributedDeviceStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
deviceStore = new TestDistributedDeviceStore(storeManager);
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
deviceStore = new TestDistributedDeviceStore(storeManager, serializationMgr);
deviceStore.activate();
}
......@@ -86,6 +96,8 @@ public class DistributedDeviceStoreTest {
public void tearDown() throws Exception {
deviceStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -326,6 +338,7 @@ public class DistributedDeviceStoreTest {
}
// TODO add test for Port events when we have them
@Ignore("Ignore until Delegate spec. is clear.")
@Test
public final void testEvents() throws InterruptedException {
final CountDownLatch addLatch = new CountDownLatch(1);
......@@ -379,8 +392,10 @@ public class DistributedDeviceStoreTest {
}
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore(StoreService storeService) {
public TestDistributedDeviceStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
}
......
......@@ -15,6 +15,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
......@@ -29,27 +30,28 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
/**
* Test of the Hazelcast based distributed LinkStore implementation.
*/
public class DistributedLinkStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
// private static final String MFR = "whitebox";
// private static final String HW = "1.1.x";
// private static final String SW1 = "3.8.1";
// private static final String SW2 = "3.9.5";
// private static final String SN = "43311-12345";
private static final PortNumber P1 = PortNumber.portNumber(1);
private static final PortNumber P2 = PortNumber.portNumber(2);
private static final PortNumber P3 = PortNumber.portNumber(3);
private StoreManager storeManager;
private KryoSerializationManager serializationMgr;
private DistributedLinkStore linkStore;
......@@ -69,13 +71,17 @@ public class DistributedLinkStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
linkStore = new TestDistributedLinkStore(storeManager);
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
linkStore = new TestDistributedLinkStore(storeManager, serializationMgr);
linkStore.activate();
}
@After
public void tearDown() throws Exception {
linkStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -302,6 +308,7 @@ public class DistributedLinkStoreTest {
assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1));
}
@Ignore("Ignore until Delegate spec. is clear.")
@Test
public final void testEvents() throws InterruptedException {
......@@ -354,8 +361,10 @@ public class DistributedLinkStoreTest {
class TestDistributedLinkStore extends DistributedLinkStore {
TestDistributedLinkStore(StoreService storeService) {
TestDistributedLinkStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
}
......
package org.onlab.onos.store.serializers;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import de.javakaffee.kryoserializers.URISerializer;
/**
* Serialization service using Kryo.
*/
@Component(immediate = true)
@Service
public class KryoSerializationManager implements KryoSerializationService {
private final Logger log = LoggerFactory.getLogger(getClass());
private KryoPool serializerPool;
@Activate
public void activate() {
setupKryoPool();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ControllerNode.State.class,
Device.Type.class,
DefaultControllerNode.class,
DefaultDevice.class,
MastershipRole.class,
Port.class,
Element.class,
Link.Type.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.build()
.populate(1);
}
@Override
public byte[] serialize(final Object obj) {
return serializerPool.serialize(obj);
}
@Override
public <T> T deserialize(final byte[] bytes) {
if (bytes == null) {
return null;
}
return serializerPool.deserialize(bytes);
}
}
package org.onlab.onos.store.serializers;
// TODO: To be replaced with SerializationService from IOLoop activity
/**
* Service to serialize Objects into byte array.
*/
public interface KryoSerializationService {
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @return serialized bytes
*/
public byte[] serialize(final Object obj);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param bytes bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final byte[] bytes);
}
......@@ -20,7 +20,7 @@ import java.util.Set;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of infrastructure DEVICES using trivial in-memory
* Manages inventory of infrastructure devices using trivial in-memory
* structures implementation.
*/
@Component(immediate = true)
......@@ -68,6 +68,11 @@ public class SimpleClusterStore
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
return null;
}
@Override
public void removeNode(NodeId nodeId) {
}
......
......@@ -101,9 +101,6 @@ public class SimpleDeviceStore
synchronized (this) {
devices.put(deviceId, device);
availableDevices.add(deviceId);
// For now claim the device as a master automatically.
// roles.put(deviceId, MastershipRole.MASTER);
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null);
}
......@@ -189,7 +186,7 @@ public class SimpleDeviceStore
new DefaultPort(device, portDescription.portNumber(),
portDescription.isEnabled());
ports.put(port.number(), updatedPort);
return new DeviceEvent(PORT_UPDATED, device, port);
return new DeviceEvent(PORT_UPDATED, device, updatedPort);
}
return null;
}
......
......@@ -51,8 +51,6 @@ public class SimpleLinkStore
private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
private static final Set<Link> EMPTY = ImmutableSet.of();
@Activate
public void activate() {
log.info("Started");
......
......@@ -48,20 +48,17 @@
description="ONOS core components">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-cluster/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-net/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-core-dist" version="1.0.0"
description="ONOS core components">
<feature name="onos-core-hazelcast" version="1.0.0"
description="ONOS core components built on hazelcast">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-cluster/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-net/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-core-trivial" version="1.0.0"
......
......@@ -9,5 +9,5 @@
nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
onos-package
for node in $nodes; do printf "%s: " $node; onos-install -f $node; done
for node in $nodes; do (printf "%s: %s\n" "$node" "`onos-install -f $node`")& done
for node in $nodes; do onos-wait-for-start $node; done
......
......@@ -15,7 +15,7 @@ env JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
pre-stop script
/opt/onos/bin/onos halt 2>/opt/onos/var/stderr.log
sleep 3
sleep 2
end script
script
......
......@@ -8,7 +8,21 @@
remote=$ONOS_USER@${1:-$OCI}
# Generate a cluster.json from the ON* environment variables
CDEF_FILE=/tmp/cluster.json
echo "{ \"nodes\":[" > $CDEF_FILE
for node in $(env | sort | egrep "OC[2-9]+" | cut -d= -f2); do
echo " { \"id\": \"$node\", \"ip\": \"$node\", \"tcpPort\": 9876 }," >> $CDEF_FILE
done
echo " { \"id\": \"$OC1\", \"ip\": \"$OC1\", \"tcpPort\": 9876 }" >> $CDEF_FILE
echo "]}" >> $CDEF_FILE
ssh $remote "
sudo perl -pi.bak -e \"s/ <interface>.*</ <interface>${ONOS_NIC:-192.168.56.*}</g\" \
$ONOS_INSTALL_DIR/$KARAF_DIST/etc/hazelcast.xml
echo \"onos.ip = \$(ifconfig | grep $ONOS_NIC | cut -d: -f2 | cut -d\\ -f1)\" \
>> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties
"
scp -q $CDEF_FILE $remote:$ONOS_INSTALL_DIR/config/
\ No newline at end of file
......
......@@ -24,6 +24,7 @@ ssh $remote "
# Make a link to the log file directory and make a home for auxiliaries
ln -s $ONOS_INSTALL_DIR/$KARAF_DIST/data/log /opt/onos/log
mkdir $ONOS_INSTALL_DIR/var
mkdir $ONOS_INSTALL_DIR/config
# Install the upstart configuration file and setup options for debugging
sudo cp $ONOS_INSTALL_DIR/debian/onos.conf /etc/init/onos.conf
......
# Default virtual box ONOS instances 1,2 & ONOS mininet box
. $ONOS_ROOT/tools/test/cells/.reset
export ONOS_NIC=192.168.56.*
export OC1="192.168.56.101"
export OC2="192.168.56.102"
......
package org.onlab.util;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
......@@ -8,6 +9,8 @@ import org.apache.commons.lang3.tuple.Pair;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.ByteBufferInput;
import com.esotericsoftware.kryo.io.ByteBufferOutput;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;
......@@ -174,6 +177,22 @@ public final class KryoPool {
}
/**
* Serializes given object to byte buffer using Kryo instance in pool.
*
* @param obj Object to serialize
* @param buffer to write to
*/
public void serialize(final Object obj, final ByteBuffer buffer) {
ByteBufferOutput out = new ByteBufferOutput(buffer);
Kryo kryo = getKryo();
try {
kryo.writeClassAndObject(out, obj);
} finally {
putKryo(kryo);
}
}
/**
* Deserializes given byte array to Object using Kryo instance in pool.
*
* @param bytes serialized bytes
......@@ -192,6 +211,24 @@ public final class KryoPool {
}
}
/**
* Deserializes given byte buffer to Object using Kryo instance in pool.
*
* @param buffer input with serialized bytes
* @param <T> deserialized Object type
* @return deserialized Object
*/
public <T> T deserialize(final ByteBuffer buffer) {
ByteBufferInput in = new ByteBufferInput(buffer);
Kryo kryo = getKryo();
try {
@SuppressWarnings("unchecked")
T obj = (T) kryo.readClassAndObject(in);
return obj;
} finally {
putKryo(kryo);
}
}
/**
* Creates a Kryo instance with {@link #registeredTypes} pre-registered.
......
......@@ -54,6 +54,15 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
}
/**
* Returns the number of message stream in custody of the loop.
*
* @return number of message streams
*/
public int streamCount() {
return streams.size();
}
/**
* Creates a new message stream backed by the specified socket channel.
*
* @param byteChannel backing byte channel
......@@ -84,14 +93,9 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
*
* @param key selection key holding the pending connect operation.
*/
protected void connect(SelectionKey key) {
try {
protected void connect(SelectionKey key) throws IOException {
SocketChannel ch = (SocketChannel) key.channel();
ch.finishConnect();
} catch (IOException | IllegalStateException e) {
log.warn("Unable to complete connection", e);
}
if (key.isValid()) {
key.interestOps(SelectionKey.OP_READ);
}
......@@ -115,7 +119,11 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
// If there is a pending connect operation, complete it.
if (key.isConnectable()) {
try {
connect(key);
} catch (IOException | IllegalStateException e) {
log.warn("Unable to complete connection", e);
}
}
// If there is a read operation, slurp as much data as possible.
......@@ -182,9 +190,10 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
* with a pending accept operation.
*
* @param channel backing socket channel
* @return newly accepted message stream
*/
public void acceptStream(SocketChannel channel) {
createAndAdmit(channel, SelectionKey.OP_READ);
public S acceptStream(SocketChannel channel) {
return createAndAdmit(channel, SelectionKey.OP_READ);
}
......@@ -193,9 +202,10 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
* with a pending connect operation.
*
* @param channel backing socket channel
* @return newly connected message stream
*/
public void connectStream(SocketChannel channel) {
createAndAdmit(channel, SelectionKey.OP_CONNECT);
public S connectStream(SocketChannel channel) {
return createAndAdmit(channel, SelectionKey.OP_CONNECT);
}
/**
......@@ -205,12 +215,14 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
* @param channel socket channel
* @param op pending operations mask to be applied to the selection
* key as a set of initial interestedOps
* @return newly created message stream
*/
private synchronized void createAndAdmit(SocketChannel channel, int op) {
private synchronized S createAndAdmit(SocketChannel channel, int op) {
S stream = createStream(channel);
streams.add(stream);
newStreamRequests.add(new NewStreamRequest(stream, channel, op));
selector.wakeup();
return stream;
}
/**
......
......@@ -10,6 +10,7 @@ import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -170,7 +171,7 @@ public abstract class MessageStream<M extends Message> {
}
/**
* Reads, withouth blocking, a list of messages from the stream.
* Reads, without blocking, a list of messages from the stream.
* The list will be empty if there were not messages pending.
*
* @return list of messages or null if backing channel has been closed
......@@ -262,7 +263,7 @@ public abstract class MessageStream<M extends Message> {
try {
channel.write(outbound);
} catch (IOException e) {
if (!closed && !e.getMessage().equals("Broken pipe")) {
if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
log.warn("Unable to write data", e);
ioError = e;
}
......
......@@ -230,7 +230,7 @@ public class IOLoopTestClient {
}
@Override
protected void connect(SelectionKey key) {
protected void connect(SelectionKey key) throws IOException {
super.connect(key);
TestMessageStream b = (TestMessageStream) key.attachment();
Worker w = ((CustomIOLoop) b.loop()).worker;
......