tom

Working on the cluster i/o

Showing 19 changed files with 923 additions and 400 deletions
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.AcceptorLoop;
import org.onlab.packet.IpPrefix;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import static java.net.InetAddress.getByAddress;
/**
* Listens to inbound connection requests and accepts them.
*/
public class ClusterConnectionListener extends AcceptorLoop {
private static final long SELECT_TIMEOUT = 50;
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final boolean SO_NO_DELAY = false;
private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
private final WorkerFinder workerFinder;
ClusterConnectionListener(IpPrefix ip, int tcpPort,
WorkerFinder workerFinder) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
this.workerFinder = workerFinder;
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
workerFinder.findWorker().acceptStream(sc);
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Objects;
import static org.onlab.packet.IpPrefix.valueOf;
/**
* Performs the IO operations related to a cluster-wide communications.
*/
public class ClusterIOWorker extends
IOLoop<ClusterMessage, ClusterMessageStream> {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long SELECT_TIMEOUT = 50;
private final ConnectionManager connectionManager;
private final CommunicationsDelegate commsDelegate;
private final SerializationService serializationService;
private final ClusterMessage helloMessage;
/**
* Creates a new cluster IO worker.
*
* @param connectionManager parent connection manager
* @param commsDelegate communications delegate for dispatching
* @param serializationService serialization service for encode/decode
* @param helloMessage hello message for greeting peers
* @throws IOException if errors occur during IO loop ignition
*/
ClusterIOWorker(ConnectionManager connectionManager,
CommunicationsDelegate commsDelegate,
SerializationService serializationService,
ClusterMessage helloMessage) throws IOException {
super(SELECT_TIMEOUT);
this.connectionManager = connectionManager;
this.commsDelegate = commsDelegate;
this.serializationService = serializationService;
this.helloMessage = helloMessage;
}
@Override
protected ClusterMessageStream createStream(ByteChannel byteChannel) {
return new ClusterMessageStream(serializationService, this, byteChannel);
}
@Override
protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
for (ClusterMessage message : messages) {
commsDelegate.dispatch(message);
}
}
@Override
public ClusterMessageStream acceptStream(SocketChannel channel) {
ClusterMessageStream stream = super.acceptStream(channel);
try {
InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
stream.write(helloMessage);
} catch (IOException e) {
log.warn("Unable to accept connection from an unknown end-point", e);
}
return stream;
}
@Override
protected void connect(SelectionKey key) throws IOException {
try {
super.connect(key);
ClusterMessageStream stream = (ClusterMessageStream) key.attachment();
stream.write(helloMessage);
} catch (IOException e) {
if (!Objects.equals(e.getMessage(), "Connection refused")) {
throw e;
}
}
}
@Override
protected void removeStream(MessageStream<ClusterMessage> stream) {
DefaultControllerNode node = ((ClusterMessageStream) stream).node();
if (node != null) {
log.info("Closed connection to node {}", node.id());
connectionManager.removeNodeStream(node);
}
super.removeStream(stream);
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
/**
* Simple back interface through which connection manager can interact with
* the cluster store.
*/
public interface ClusterNodesDelegate {
/**
* Notifies about a new cluster node being detected.
*
* @param node newly detected cluster node
*/
void nodeDetected(DefaultControllerNode node);
/**
* Notifies about cluster node going offline.
*
* @param node cluster node that vanished
*/
void nodeVanished(DefaultControllerNode node);
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
/**
* Simple back interface for interacting with the communications service.
*/
public interface CommunicationsDelegate {
/**
* Dispatches the specified message to all registered subscribers.
*
* @param message message to be dispatched
*/
void dispatch(ClusterMessage message);
/**
* Sets the sender.
*
* @param messageSender message sender
*/
void setSender(MessageSender messageSender);
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.net.InetAddress.getByAddress;
import static org.onlab.util.Tools.namedThreads;
/**
* Manages connections to other controller cluster nodes.
*/
public class ConnectionManager implements MessageSender {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
private static final long START_TIMEOUT = 1000;
private static final int WORKERS = 3;
private ClusterConnectionListener connectionListener;
private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
private final DefaultControllerNode localNode;
private final ClusterNodesDelegate nodesDelegate;
private final CommunicationsDelegate commsDelegate;
private final SerializationService serializationService;
// Nodes to be monitored to make sure they have a connection.
private final Set<DefaultControllerNode> nodes = new HashSet<>();
// Means to track message streams to other nodes.
private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
// Executor pools for listening and managing connections to other nodes.
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
private final ExecutorService commExecutors =
Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
private final ExecutorService heartbeatExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
private final WorkerFinder workerFinder = new LeastUtilitiedWorkerFinder();
/**
* Creates a new connection manager.
*/
ConnectionManager(DefaultControllerNode localNode,
ClusterNodesDelegate nodesDelegate,
CommunicationsDelegate commsDelegate,
SerializationService serializationService) {
this.localNode = localNode;
this.nodesDelegate = nodesDelegate;
this.commsDelegate = commsDelegate;
this.serializationService = serializationService;
commsDelegate.setSender(this);
startCommunications();
startListening();
startInitiating();
log.info("Started");
}
/**
* Shuts down the connection manager.
*/
void shutdown() {
connectionListener.shutdown();
for (ClusterIOWorker worker : workers) {
worker.shutdown();
}
log.info("Stopped");
}
/**
* Adds the node to the list of monitored nodes.
*
* @param node node to be added
*/
void addNode(DefaultControllerNode node) {
nodes.add(node);
}
/**
* Removes the node from the list of monitored nodes.
*
* @param node node to be removed
*/
void removeNode(DefaultControllerNode node) {
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
stream.close();
}
}
/**
* Removes the stream associated with the specified node.
*
* @param node node whose stream to remove
*/
void removeNodeStream(DefaultControllerNode node) {
nodesDelegate.nodeVanished(node);
streams.remove(node.id());
}
@Override
public boolean send(NodeId nodeId, ClusterMessage message) {
ClusterMessageStream stream = streams.get(nodeId);
if (stream != null) {
try {
stream.write(message);
return true;
} catch (IOException e) {
log.warn("Unable to send a message about {} to node {}",
message.subject(), nodeId);
}
}
return false;
}
/**
* Kicks off the IO loops and waits for them to startup.
*/
private void startCommunications() {
HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
localNode.tcpPort());
for (int i = 0; i < WORKERS; i++) {
try {
ClusterIOWorker worker =
new ClusterIOWorker(this, commsDelegate,
serializationService, hello);
workers.add(worker);
commExecutors.execute(worker);
} catch (IOException e) {
log.warn("Unable to start communication worker", e);
}
}
// Wait for the IO loops to start
for (ClusterIOWorker loop : workers) {
if (!loop.awaitStart(START_TIMEOUT)) {
log.warn("Comm loop did not start on-time; moving on...");
}
}
}
/**
* Starts listening for connections from peer cluster members.
*/
private void startListening() {
try {
connectionListener =
new ClusterConnectionListener(localNode.ip(), localNode.tcpPort(),
workerFinder);
listenExecutor.execute(connectionListener);
if (!connectionListener.awaitStart(START_TIMEOUT)) {
log.warn("Listener did not start on-time; moving on...");
}
} catch (IOException e) {
log.error("Unable to listen for cluster connections", e);
}
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO loop.
*
* @param loop loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
*/
private void initiateConnection(DefaultControllerNode node,
ClusterIOWorker loop) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(false);
ch.connect(sa);
loop.connectStream(ch);
}
/**
* Attempts to connect to any nodes that do not have an associated connection.
*/
private void startInitiating() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
CONNECTION_CUSTODIAN_FREQUENCY);
}
// Sweeps through all controller nodes and attempts to open connection to
// those that presently do not have one.
private class ConnectionCustodian extends TimerTask {
@Override
public void run() {
for (DefaultControllerNode node : nodes) {
if (node != localNode && !streams.containsKey(node.id())) {
try {
initiateConnection(node, workerFinder.findWorker());
} catch (IOException e) {
log.debug("Unable to connect", e);
}
}
}
}
}
// Finds the least utilitied IO loop.
private class LeastUtilitiedWorkerFinder implements WorkerFinder {
@Override
public ClusterIOWorker findWorker() {
ClusterIOWorker leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (ClusterIOWorker worker : workers) {
int count = worker.streamCount();
if (count == 0) {
return worker;
}
if (count < minCount) {
leastUtilized = worker;
minCount = count;
}
}
return leastUtilized;
}
}
}
......@@ -4,10 +4,9 @@ import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.nio.AcceptorLoop;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterStore;
import org.onlab.onos.cluster.ClusterStoreDelegate;
......@@ -15,33 +14,18 @@ import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.net.InetAddress.getByAddress;
import static org.onlab.onos.cluster.ControllerNode.State;
import static org.onlab.packet.IpPrefix.valueOf;
import static org.onlab.util.Tools.namedThreads;
/**
* Distributed implementation of the cluster nodes store.
......@@ -52,146 +36,69 @@ public class DistributedClusterStore
extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
private static final int HELLO_MSG = 1;
private static final int ECHO_MSG = 2;
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
private static final long START_TIMEOUT = 1000;
private static final long SELECT_TIMEOUT = 50;
private static final int WORKERS = 3;
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final int COMM_IDLE_TIME = 500;
private static final boolean SO_NO_DELAY = false;
private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
private DefaultControllerNode self;
private DefaultControllerNode localNode;
private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
// Means to track message streams to other nodes.
private final Map<NodeId, TLVMessageStream> streams = new ConcurrentHashMap<>();
private final Map<SocketChannel, DefaultControllerNode> nodesByChannel = new ConcurrentHashMap<>();
// Executor pools for listening and managing connections to other nodes.
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
private final ExecutorService commExecutors =
Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
private final ExecutorService heartbeatExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private CommunicationsDelegate commsDelegate;
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private SerializationService serializationService;
private ListenLoop listenLoop;
private List<CommLoop> commLoops = new ArrayList<>(WORKERS);
private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
private ConnectionManager connectionManager;
@Activate
public void activate() {
loadClusterDefinition();
startCommunications();
startListening();
startInitiating();
establishSelfIdentity();
connectionManager = new ConnectionManager(localNode, nodesDelegate,
commsDelegate, serializationService);
log.info("Started");
}
@Deactivate
public void deactivate() {
listenLoop.shutdown();
for (CommLoop loop : commLoops) {
loop.shutdown();
}
log.info("Stopped");
}
// Loads the cluster definition file
/**
* Loads the cluster definition file.
*/
private void loadClusterDefinition() {
// ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
// try {
// Set<DefaultControllerNode> storedNodes = cds.read();
// for (DefaultControllerNode node : storedNodes) {
// nodes.put(node.id(), node);
// }
// } catch (IOException e) {
// log.error("Unable to read cluster definitions", e);
// }
// Establishes the controller's own identity.
IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
self = nodes.get(new NodeId(ip.toString()));
// As a fall-back, let's make sure we at least know who we are.
if (self == null) {
self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(self.id(), self);
states.put(self.id(), State.ACTIVE);
}
}
// Kicks off the IO loops.
private void startCommunications() {
for (int i = 0; i < WORKERS; i++) {
try {
CommLoop loop = new CommLoop();
commLoops.add(loop);
commExecutors.execute(loop);
} catch (IOException e) {
log.warn("Unable to start comm IO loop", e);
}
}
// Wait for the IO loops to start
for (CommLoop loop : commLoops) {
if (!loop.awaitStart(START_TIMEOUT)) {
log.warn("Comm loop did not start on-time; moving on...");
}
}
}
// Starts listening for connections from peer cluster members.
private void startListening() {
ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
try {
listenLoop = new ListenLoop(self.ip(), self.tcpPort());
listenExecutor.execute(listenLoop);
if (!listenLoop.awaitStart(START_TIMEOUT)) {
log.warn("Listen loop did not start on-time; moving on...");
Set<DefaultControllerNode> storedNodes = cds.read();
for (DefaultControllerNode node : storedNodes) {
nodes.put(node.id(), node);
}
} catch (IOException e) {
log.error("Unable to listen for cluster connections", e);
log.error("Unable to read cluster definitions", e);
}
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO loop.
*
* @param loop loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
* Determines who the local controller node is.
*/
private void openConnection(DefaultControllerNode node, CommLoop loop) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
nodesByChannel.put(ch, node);
ch.configureBlocking(false);
ch.connect(sa);
loop.connectStream(ch);
}
private void establishSelfIdentity() {
// Establishes the controller's own identity.
IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
localNode = nodes.get(new NodeId(ip.toString()));
// Attempts to connect to any nodes that do not have an associated connection.
private void startInitiating() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, CONNECTION_CUSTODIAN_FREQUENCY);
// As a fall-back, let's make sure we at least know who we are.
if (localNode == null) {
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(localNode.id(), localNode);
states.put(localNode.id(), State.ACTIVE);
}
}
@Override
public ControllerNode getLocalNode() {
return self;
return localNode;
}
@Override
......@@ -215,179 +122,29 @@ public class DistributedClusterStore
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
connectionManager.addNode(node);
return node;
}
@Override
public void removeNode(NodeId nodeId) {
nodes.remove(nodeId);
TLVMessageStream stream = streams.remove(nodeId);
if (stream != null) {
stream.close();
}
}
// Listens and accepts inbound connections from other cluster nodes.
private class ListenLoop extends AcceptorLoop {
ListenLoop(IpPrefix ip, int tcpPort) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
findLeastUtilizedLoop().acceptStream(sc);
}
}
private class CommLoop extends IOLoop<TLVMessage, TLVMessageStream> {
CommLoop() throws IOException {
super(SELECT_TIMEOUT);
}
@Override
protected TLVMessageStream createStream(ByteChannel byteChannel) {
return new TLVMessageStream(this, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
}
@Override
protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
TLVMessageStream tlvStream = (TLVMessageStream) stream;
for (TLVMessage message : messages) {
// TODO: add type-based dispatching here... this is just a hack to get going
if (message.type() == HELLO_MSG) {
processHello(message, tlvStream);
} else if (message.type() == ECHO_MSG) {
processEcho(message, tlvStream);
} else {
log.info("Deal with other messages");
}
}
}
@Override
public TLVMessageStream acceptStream(SocketChannel channel) {
TLVMessageStream stream = super.acceptStream(channel);
try {
InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
stream.write(createHello(self));
} catch (IOException e) {
log.warn("Unable to accept connection from an unknown end-point", e);
}
return stream;
}
@Override
public TLVMessageStream connectStream(SocketChannel channel) {
TLVMessageStream stream = super.connectStream(channel);
DefaultControllerNode node = nodesByChannel.get(channel);
DefaultControllerNode node = nodes.remove(nodeId);
if (node != null) {
log.debug("Opened connection to node {}", node.id());
nodesByChannel.remove(channel);
}
return stream;
}
@Override
protected void connect(SelectionKey key) throws IOException {
try {
super.connect(key);
TLVMessageStream stream = (TLVMessageStream) key.attachment();
send(stream, createHello(self));
} catch (IOException e) {
if (!Objects.equals(e.getMessage(), "Connection refused")) {
throw e;
}
connectionManager.removeNode(node);
}
}
// Entity to handle back calls from the connection manager.
private class InnerNodesDelegate implements ClusterNodesDelegate {
@Override
protected void removeStream(MessageStream<TLVMessage> stream) {
DefaultControllerNode node = ((TLVMessageStream) stream).node();
if (node != null) {
log.info("Closed connection to node {}", node.id());
states.put(node.id(), State.INACTIVE);
streams.remove(node.id());
}
super.removeStream(stream);
}
}
// Processes a HELLO message from a peer controller node.
private void processHello(TLVMessage message, TLVMessageStream stream) {
// FIXME: pure hack for now
String data = new String(message.data());
String[] fields = data.split(":");
DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]),
valueOf(fields[1]),
Integer.parseInt(fields[2]));
stream.setNode(node);
public void nodeDetected(DefaultControllerNode node) {
nodes.put(node.id(), node);
streams.put(node.id(), stream);
states.put(node.id(), State.ACTIVE);
}
// Processes an ECHO message from a peer controller node.
private void processEcho(TLVMessage message, TLVMessageStream tlvStream) {
// TODO: implement heart-beat refresh
log.info("Dealing with echoes...");
}
// Sends message to the specified stream.
private void send(TLVMessageStream stream, TLVMessage message) {
try {
stream.write(message);
} catch (IOException e) {
log.warn("Unable to send message to {}", stream.node().id());
}
}
// Creates a hello message to be sent to a peer controller node.
private TLVMessage createHello(DefaultControllerNode self) {
return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes());
}
// Sweeps through all controller nodes and attempts to open connection to
// those that presently do not have one.
private class ConnectionCustodian extends TimerTask {
@Override
public void run() {
for (DefaultControllerNode node : nodes.values()) {
if (node != self && !streams.containsKey(node.id())) {
try {
openConnection(node, findLeastUtilizedLoop());
} catch (IOException e) {
log.debug("Unable to connect", e);
}
}
}
}
}
// Finds the least utilities IO loop.
private CommLoop findLeastUtilizedLoop() {
CommLoop leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (CommLoop loop : commLoops) {
int count = loop.streamCount();
if (count == 0) {
return loop;
}
if (count < minCount) {
leastUtilized = loop;
minCount = count;
}
public void nodeVanished(DefaultControllerNode node) {
states.put(node.id(), State.INACTIVE);
}
return leastUtilized;
}
}
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
/**
* Created by tom on 9/29/14.
*/
public interface MessageSender {
/**
* Sends the specified message to the given cluster node.
*
* @param nodeId node identifier
* @param message mesage to send
* @return true if the message was sent sucessfully; false if there is
* no stream or if there was an error
*/
boolean send(NodeId nodeId, ClusterMessage message);
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.AbstractMessage;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Base message for cluster-wide communications using TLVs.
*/
public class TLVMessage extends AbstractMessage {
private final int type;
private final byte[] data;
/**
* Creates an immutable TLV message.
*
* @param type message type
* @param data message data bytes
*/
public TLVMessage(int type, byte[] data) {
this.length = data.length + TLVMessageStream.METADATA_LENGTH;
this.type = type;
this.data = data;
}
/**
* Returns the message type indicator.
*
* @return message type
*/
public int type() {
return type;
}
/**
* Returns the data bytes.
*
* @return message data
*/
public byte[] data() {
return data;
}
@Override
public int hashCode() {
return Objects.hash(type, data);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final TLVMessage other = (TLVMessage) obj;
return Objects.equals(this.type, other.type) &&
Objects.equals(this.data, other.data);
}
@Override
public String toString() {
return toStringHelper(this).add("type", type).add("length", length).toString();
}
}
package org.onlab.onos.store.cluster.impl;
/**
* Provides means to find a worker IO loop.
*/
public interface WorkerFinder {
/**
* Finds a suitable worker.
*
* @return available worker
*/
ClusterIOWorker findWorker();
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import java.util.Set;
/**
* Service for assisting communications between controller cluster nodes.
*/
public interface ClusterCommunicationService {
/**
* Sends a message to the specified controller node.
*
* @param message message to send
* @param toNodeId node identifier
* @return true if the message was sent sucessfully; false if there is
* no stream or if there was an error
*/
boolean send(ClusterMessage message, NodeId toNodeId);
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param subscriber message subscriber
*/
void addSubscriber(MessageSubject subject, MessageSubscriber subscriber);
/**
* Removes the specified subscriber from the given message subject.
*
* @param subject message subject
* @param subscriber message subscriber
*/
void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber);
/**
* Returns the set of subscribers for the specified message subject.
*
* @param subject message subject
* @return set of message subscribers
*/
Set<MessageSubscriber> getSubscribers(MessageSubject subject);
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.nio.AbstractMessage;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Base message for cluster-wide communications.
*/
public abstract class ClusterMessage extends AbstractMessage {
private final MessageSubject subject;
/**
* Creates a cluster message.
*
* @param subject message subject
*/
protected ClusterMessage(MessageSubject subject) {
this.subject = subject;
}
/**
* Returns the message subject indicator.
*
* @return message subject
*/
public MessageSubject subject() {
return subject;
}
@Override
public String toString() {
return toStringHelper(this).add("subject", subject).add("length", length).toString();
}
}
package org.onlab.onos.store.cluster.impl;
package org.onlab.onos.store.cluster.messaging;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
......@@ -10,29 +10,29 @@ import java.nio.channels.ByteChannel;
import static com.google.common.base.Preconditions.checkState;
/**
* Stream for transferring TLV messages between cluster members.
* Stream for transferring messages between two cluster members.
*/
public class TLVMessageStream extends MessageStream<TLVMessage> {
public class ClusterMessageStream extends MessageStream<ClusterMessage> {
public static final int METADATA_LENGTH = 16; // 8 + 4 + 4
private static final int LENGTH_OFFSET = 12;
private static final long MARKER = 0xfeedcafecafefeedL;
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final int COMM_IDLE_TIME = 500;
private DefaultControllerNode node;
private SerializationService serializationService;
/**
* Creates a message stream associated with the specified IO loop and
* backed by the given byte channel.
*
* @param serializationService service for encoding/decoding messages
* @param loop IO loop
* @param byteChannel backing byte channel
* @param bufferSize size of the backing byte buffers
* @param maxIdleMillis maximum number of millis the stream can be idle
*/
protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel,
int bufferSize, int maxIdleMillis) {
super(loop, byteChannel, bufferSize, maxIdleMillis);
public ClusterMessageStream(SerializationService serializationService,
IOLoop<ClusterMessage, ?> loop,
ByteChannel byteChannel) {
super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
this.serializationService = serializationService;
}
/**
......@@ -40,7 +40,7 @@ public class TLVMessageStream extends MessageStream<TLVMessage> {
*
* @return controller node
*/
DefaultControllerNode node() {
public DefaultControllerNode node() {
return node;
}
......@@ -49,47 +49,19 @@ public class TLVMessageStream extends MessageStream<TLVMessage> {
*
* @param node controller node
*/
void setNode(DefaultControllerNode node) {
public void setNode(DefaultControllerNode node) {
checkState(this.node == null, "Stream is already bound to a node");
this.node = node;
}
@Override
protected TLVMessage read(ByteBuffer buffer) {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
int type = buffer.getInt();
length = buffer.getInt();
// TODO: add deserialization hook here
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
return new TLVMessage(type, data);
protected ClusterMessage read(ByteBuffer buffer) {
return serializationService.decode(buffer);
}
@Override
protected void write(TLVMessage message, ByteBuffer buffer) {
buffer.putLong(MARKER);
buffer.putInt(message.type());
buffer.putInt(message.length());
// TODO: add serialization hook here
buffer.put(message.data());
protected void write(ClusterMessage message, ByteBuffer buffer) {
serializationService.encode(message, buffer);
}
}
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**l
* Echo heart-beat message that nodes send to each other.
*/
public class EchoMessage extends ClusterMessage {
private NodeId nodeId;
// For serialization
private EchoMessage() {
super(MessageSubject.HELLO);
nodeId = null;
}
/**
* Creates a new heart-beat echo message.
*
* @param nodeId sending node identification
*/
public EchoMessage(NodeId nodeId) {
super(MessageSubject.HELLO);
nodeId = nodeId;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Hello message that nodes use to greet each other.
*/
public class HelloMessage extends ClusterMessage {
private NodeId nodeId;
private IpPrefix ipAddress;
private int tcpPort;
// For serialization
private HelloMessage() {
super(MessageSubject.HELLO);
nodeId = null;
ipAddress = null;
tcpPort = 0;
}
/**
* Creates a new hello message for the specified end-point data.
*
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.HELLO);
nodeId = nodeId;
ipAddress = ipAddress;
tcpPort = tcpPort;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
/**
* Returns the sending node IP address.
*
* @return node IP address
*/
public IpPrefix ipAddress() {
return ipAddress;
}
/**
* Returns the sending node TCP listen port.
*
* @return TCP listen port
*/
public int tcpPort() {
return tcpPort;
}
}
package org.onlab.onos.store.cluster.messaging;
/**
* Representation of a message subject.
*/
public enum MessageSubject {
/** Represents a first greeting message. */
HELLO,
/** Signifies a heart-beat message. */
ECHO
}
package org.onlab.onos.store.cluster.messaging;
/**
* Represents a message consumer.
*/
public interface MessageSubscriber {
/**
* Receives the specified cluster message.
*
* @param message message to be received
*/
void receive(ClusterMessage message);
}
package org.onlab.onos.store.cluster.messaging;
import java.nio.ByteBuffer;
/**
* Service for serializing/deserializing intra-cluster messages.
*/
public interface SerializationService {
/**
* Decodes the specified byte buffer to obtain a message within.
*
* @param buffer byte buffer with message(s)
* @return parsed message
*/
ClusterMessage decode(ByteBuffer buffer);
/**
* Encodes the specified message into the given byte buffer.
*
* @param message message to be encoded
* @param buffer byte buffer to receive the message data
*/
void encode(ClusterMessage message, ByteBuffer buffer);
}
package org.onlab.onos.store.cluster.messaging.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
import org.onlab.onos.store.cluster.impl.MessageSender;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import java.util.Set;
/**
* Implements the cluster communication services to use by other stores.
*/
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService, CommunicationsDelegate {
// TODO: use something different that won't require synchronization
private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
private MessageSender messageSender;
@Override
public boolean send(ClusterMessage message, NodeId toNodeId) {
return messageSender.send(toNodeId, message);
}
@Override
public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
subscribers.put(subject, subscriber);
}
@Override
public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
subscribers.remove(subject, subscriber);
}
@Override
public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
return ImmutableSet.copyOf(subscribers.get(subject));
}
@Override
public void dispatch(ClusterMessage message) {
Set<MessageSubscriber> set = getSubscribers(message.subject());
if (set != null) {
for (MessageSubscriber subscriber : set) {
subscriber.receive(message);
}
}
}
@Override
public void setSender(MessageSender messageSender) {
this.messageSender = messageSender;
}
}
package org.onlab.onos.store.cluster.messaging.impl;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import java.nio.ByteBuffer;
import static com.google.common.base.Preconditions.checkState;
/**
* Factory for parsing messages sent between cluster members.
*/
public class MessageSerializer implements SerializationService {
private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
private static final int LENGTH_OFFSET = 12;
private static final long MARKER = 0xfeedcafebeaddeadL;
@Override
public ClusterMessage decode(ByteBuffer buffer) {
try {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
int subjectOrdinal = buffer.getInt();
MessageSubject subject = MessageSubject.values()[subjectOrdinal];
length = buffer.getInt();
// TODO: sanity checking for length
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
// TODO: add deserialization hook here; for now this hack
return null; // actually deserialize
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
e.printStackTrace();
}
return null;
}
@Override
public void encode(ClusterMessage message, ByteBuffer buffer) {
try {
int i = 0;
// Type based lookup for proper encoder
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
e.printStackTrace();
}
}
}