Madan Jampani

Added Netty based messaging. Updated cluster management to use Netty based messaging

Showing 40 changed files with 1135 additions and 920 deletions
package org.onlab.onos.store.cluster.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMembershipMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.net.InetAddress.getByAddress;
import static org.onlab.util.Tools.namedThreads;
/**
* Implements the cluster communication services to use by other stores.
*/
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService, ClusterCommunicationAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
private static final long START_TIMEOUT = 1000;
private static final int WORKERS = 3;
private ClusterConnectionListener connectionListener;
private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
private DefaultControllerNode localNode;
private ClusterNodesDelegate nodesDelegate;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SerializationService serializationService;
// Nodes to be monitored to make sure they have a connection.
private final Set<DefaultControllerNode> nodes = new HashSet<>();
// Means to track message streams to other nodes.
private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
// TODO: use something different that won't require synchronization
private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
// Executor pools for listening and managing connections to other nodes.
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
private final ExecutorService commExecutors =
Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
private final ExecutorService heartbeatExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
private MembershipSubscriber membershipSubscriber = new MembershipSubscriber();
@Activate
public void activate() {
addSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
addSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
log.info("Activated but waiting for delegate");
}
@Deactivate
public void deactivate() {
removeSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
removeSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
connectionCustodian.cancel();
if (connectionListener != null) {
connectionListener.shutdown();
for (ClusterIOWorker worker : workers) {
worker.shutdown();
}
}
log.info("Stopped");
}
@Override
public boolean send(ClusterMessage message) {
boolean ok = true;
for (DefaultControllerNode node : nodes) {
if (!node.equals(localNode)) {
ok = send(message, node.id()) && ok;
}
}
return ok;
}
@Override
public boolean send(ClusterMessage message, NodeId toNodeId) {
ClusterMessageStream stream = streams.get(toNodeId);
if (stream != null && !toNodeId.equals(localNode.id())) {
try {
stream.write(message);
return true;
} catch (IOException e) {
log.warn("Unable to send message {} to node {}",
message.subject(), toNodeId);
}
}
return false;
}
@Override
public synchronized void addSubscriber(MessageSubject subject,
MessageSubscriber subscriber) {
subscribers.put(subject, subscriber);
}
@Override
public synchronized void removeSubscriber(MessageSubject subject,
MessageSubscriber subscriber) {
subscribers.remove(subject, subscriber);
}
@Override
public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
return ImmutableSet.copyOf(subscribers.get(subject));
}
@Override
public void addNode(DefaultControllerNode node) {
nodes.add(node);
}
@Override
public void removeNode(DefaultControllerNode node) {
send(new LeavingMemberMessage(node.id()));
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
stream.close();
}
}
@Override
public void startUp(DefaultControllerNode localNode,
ClusterNodesDelegate delegate) {
this.localNode = localNode;
this.nodesDelegate = delegate;
startCommunications();
startListening();
startInitiatingConnections();
log.info("Started");
}
@Override
public void clearAllNodesAndStreams() {
nodes.clear();
send(new LeavingMemberMessage(localNode.id()));
for (ClusterMessageStream stream : streams.values()) {
stream.close();
}
streams.clear();
}
/**
* Dispatches the specified message to all subscribers to its subject.
*
* @param message message to dispatch
* @param fromNodeId node from which the message was received
*/
void dispatch(ClusterMessage message, NodeId fromNodeId) {
Set<MessageSubscriber> set = getSubscribers(message.subject());
if (set != null) {
for (MessageSubscriber subscriber : set) {
subscriber.receive(message, fromNodeId);
}
}
}
/**
* Adds the stream associated with the specified node.
*
* @param nodeId newly detected cluster node id
* @param ip node IP listen address
* @param tcpPort node TCP listen port
* @return controller node bound to the stream
*/
DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
ClusterMessageStream stream) {
DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
stream.setNode(node);
streams.put(node.id(), stream);
send(new NewMemberMessage(node.id(), node.ip(), node.tcpPort()));
return node;
}
/**
* Removes the stream associated with the specified node.
*
* @param node node whose stream to remove
*/
void removeNodeStream(DefaultControllerNode node) {
nodesDelegate.nodeVanished(node.id());
streams.remove(node.id());
}
/**
* Finds the least utilized IO worker.
*
* @return IO worker
*/
ClusterIOWorker findWorker() {
ClusterIOWorker leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (ClusterIOWorker worker : workers) {
int count = worker.streamCount();
if (count == 0) {
return worker;
}
if (count < minCount) {
leastUtilized = worker;
minCount = count;
}
}
return leastUtilized;
}
/**
* Kicks off the IO loops and waits for them to startup.
*/
private void startCommunications() {
HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
localNode.tcpPort());
for (int i = 0; i < WORKERS; i++) {
try {
ClusterIOWorker worker =
new ClusterIOWorker(this, serializationService, hello);
workers.add(worker);
commExecutors.execute(worker);
} catch (IOException e) {
log.warn("Unable to start communication worker", e);
}
}
// Wait for the IO loops to start
for (ClusterIOWorker loop : workers) {
if (!loop.awaitStart(START_TIMEOUT)) {
log.warn("Comm loop did not start on-time; moving on...");
}
}
}
/**
* Starts listening for connections from peer cluster members.
*/
private void startListening() {
try {
connectionListener =
new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
listenExecutor.execute(connectionListener);
if (!connectionListener.awaitStart(START_TIMEOUT)) {
log.warn("Listener did not start on-time; moving on...");
}
} catch (IOException e) {
log.error("Unable to listen for cluster connections", e);
}
}
/**
* Attempts to connect to any nodes that do not have an associated connection.
*/
private void startInitiatingConnections() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
CONNECTION_CUSTODIAN_FREQUENCY);
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO worker.
*
* @param worker loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
*/
private void initiateConnection(DefaultControllerNode node,
ClusterIOWorker worker) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(false);
ch.connect(sa);
worker.connectStream(ch);
}
// Sweeps through all controller nodes and attempts to open connection to
// those that presently do not have one.
private class ConnectionCustodian extends TimerTask {
@Override
public void run() {
for (DefaultControllerNode node : nodes) {
if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
try {
initiateConnection(node, findWorker());
} catch (IOException e) {
log.debug("Unable to connect", e);
}
}
}
}
}
private class MembershipSubscriber implements MessageSubscriber {
@Override
public void receive(ClusterMessage message, NodeId fromNodeId) {
MessageSubject subject = message.subject();
ClusterMembershipMessage cmm = (ClusterMembershipMessage) message;
if (message.subject() == MessageSubject.NEW_MEMBER) {
log.info("Node {} arrived", cmm.nodeId());
nodesDelegate.nodeDetected(cmm.nodeId(), cmm.ipAddress(), cmm.tcpPort());
} else if (subject == MessageSubject.LEAVING_MEMBER) {
log.info("Node {} is leaving", cmm.nodeId());
nodesDelegate.nodeRemoved(cmm.nodeId());
}
}
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.AcceptorLoop;
import org.onlab.packet.IpPrefix;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import static java.net.InetAddress.getByAddress;
/**
* Listens to inbound connection requests and accepts them.
*/
public class ClusterConnectionListener extends AcceptorLoop {
private static final long SELECT_TIMEOUT = 50;
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final boolean SO_NO_DELAY = false;
private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
private final ClusterCommunicationManager manager;
ClusterConnectionListener(ClusterCommunicationManager manager,
IpPrefix ip, int tcpPort) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
this.manager = manager;
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
manager.findWorker().acceptStream(sc);
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Objects;
import static org.onlab.packet.IpPrefix.valueOf;
/**
* Performs the IO operations related to a cluster-wide communications.
*/
public class ClusterIOWorker extends
IOLoop<ClusterMessage, ClusterMessageStream> {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long SELECT_TIMEOUT = 50;
private final ClusterCommunicationManager manager;
private final SerializationService serializationService;
private final ClusterMessage helloMessage;
/**
* Creates a new cluster IO worker.
*
* @param manager parent comms manager
* @param serializationService serialization service for encode/decode
* @param helloMessage hello message for greeting peers
* @throws IOException if errors occur during IO loop ignition
*/
ClusterIOWorker(ClusterCommunicationManager manager,
SerializationService serializationService,
ClusterMessage helloMessage) throws IOException {
super(SELECT_TIMEOUT);
this.manager = manager;
this.serializationService = serializationService;
this.helloMessage = helloMessage;
}
@Override
protected ClusterMessageStream createStream(ByteChannel byteChannel) {
return new ClusterMessageStream(serializationService, this, byteChannel);
}
@Override
protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
NodeId nodeId = getNodeId(messages, (ClusterMessageStream) stream);
for (ClusterMessage message : messages) {
manager.dispatch(message, nodeId);
}
}
// Retrieves the node from the stream. If one is not bound, it attempts
// to bind it using the knowledge that the first message must be a hello.
private NodeId getNodeId(List<ClusterMessage> messages, ClusterMessageStream stream) {
DefaultControllerNode node = stream.node();
if (node == null && !messages.isEmpty()) {
ClusterMessage firstMessage = messages.get(0);
if (firstMessage instanceof HelloMessage) {
HelloMessage hello = (HelloMessage) firstMessage;
node = manager.addNodeStream(hello.nodeId(), hello.ipAddress(),
hello.tcpPort(), stream);
}
}
return node != null ? node.id() : null;
}
@Override
public ClusterMessageStream acceptStream(SocketChannel channel) {
ClusterMessageStream stream = super.acceptStream(channel);
try {
InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
stream.write(helloMessage);
} catch (IOException e) {
log.warn("Unable to accept connection from an unknown end-point", e);
}
return stream;
}
@Override
protected void connect(SelectionKey key) throws IOException {
try {
super.connect(key);
ClusterMessageStream stream = (ClusterMessageStream) key.attachment();
stream.write(helloMessage);
} catch (IOException e) {
if (!Objects.equals(e.getMessage(), "Connection refused")) {
throw e;
}
}
}
@Override
protected void removeStream(MessageStream<ClusterMessage> stream) {
DefaultControllerNode node = ((ClusterMessageStream) stream).node();
if (node != null) {
log.info("Closed connection to node {}", node.id());
manager.removeNodeStream(node);
}
super.removeStream(stream);
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import static com.google.common.base.Preconditions.checkState;
/**
* Stream for transferring messages between two cluster members.
*/
public class ClusterMessageStream extends MessageStream<ClusterMessage> {
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final int COMM_IDLE_TIME = 500;
private DefaultControllerNode node;
private SerializationService serializationService;
/**
* Creates a message stream associated with the specified IO loop and
* backed by the given byte channel.
*
* @param serializationService service for encoding/decoding messages
* @param loop IO loop
* @param byteChannel backing byte channel
*/
public ClusterMessageStream(SerializationService serializationService,
IOLoop<ClusterMessage, ?> loop,
ByteChannel byteChannel) {
super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
this.serializationService = serializationService;
}
/**
* Returns the node with which this stream is associated.
*
* @return controller node
*/
public DefaultControllerNode node() {
return node;
}
/**
* Sets the node with which this stream is affiliated.
*
* @param node controller node
*/
public void setNode(DefaultControllerNode node) {
checkState(this.node == null, "Stream is already bound to a node");
this.node = node;
}
@Override
protected ClusterMessage read(ByteBuffer buffer) {
return serializationService.decode(buffer);
}
@Override
protected void write(ClusterMessage message, ByteBuffer buffer) {
serializationService.encode(message, buffer);
}
}
package org.onlab.onos.store.cluster.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -14,6 +19,8 @@ import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -22,6 +29,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static org.onlab.onos.cluster.ControllerNode.State;
import static org.onlab.packet.IpPrefix.valueOf;
......@@ -40,21 +48,25 @@ public class DistributedClusterStore
private DefaultControllerNode localNode;
private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(OnosClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
.removalListener(new LivenessCacheRemovalListener()).build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationAdminService communicationAdminService;
private ClusterCommunicationAdminService clusterCommunicationAdminService;
private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
@Activate
public void activate() {
public void activate() throws IOException {
loadClusterDefinition();
establishSelfIdentity();
// Start-up the comm service and prime it with the loaded nodes.
communicationAdminService.startUp(localNode, nodesDelegate);
clusterCommunicationAdminService.initialize(localNode, nodesDelegate);
for (DefaultControllerNode node : nodes.values()) {
communicationAdminService.addNode(node);
clusterCommunicationAdminService.addNode(node);
}
log.info("Started");
}
......@@ -121,15 +133,13 @@ public class DistributedClusterStore
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
communicationAdminService.addNode(node);
clusterCommunicationAdminService.addNode(node);
return node;
}
@Override
public void removeNode(NodeId nodeId) {
if (nodeId.equals(localNode.id())) {
// We are being ejected from the cluster, so remove all other nodes.
communicationAdminService.clearAllNodesAndStreams();
nodes.clear();
nodes.put(localNode.id(), localNode);
......@@ -137,7 +147,7 @@ public class DistributedClusterStore
// Remove the other node.
DefaultControllerNode node = nodes.remove(nodeId);
if (node != null) {
communicationAdminService.removeNode(node);
clusterCommunicationAdminService.removeNode(node);
}
}
}
......@@ -151,6 +161,7 @@ public class DistributedClusterStore
node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
}
states.put(nodeId, State.ACTIVE);
livenessCache.put(nodeId, node);
return node;
}
......@@ -165,4 +176,13 @@ public class DistributedClusterStore
}
}
private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> {
@Override
public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) {
NodeId nodeId = entry.getKey();
log.warn("Failed to receive heartbeats from controller: " + nodeId);
nodesDelegate.nodeVanished(nodeId);
}
}
}
......
......@@ -21,12 +21,7 @@ import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.EchoMessage;
import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
......@@ -43,12 +38,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import static com.google.common.base.Preconditions.checkState;
/**
* Factory for parsing messages sent between cluster members.
*/
......@@ -96,11 +88,7 @@ public class MessageSerializer implements SerializationService {
Link.Type.class,
MessageSubject.class,
HelloMessage.class,
NewMemberMessage.class,
LeavingMemberMessage.class,
EchoMessage.class
MessageSubject.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
......@@ -118,49 +106,12 @@ public class MessageSerializer implements SerializationService {
@Override
public ClusterMessage decode(ByteBuffer buffer) {
try {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
length = buffer.getInt();
// TODO: sanity checking for length
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
return (ClusterMessage) serializerPool.deserialize(data);
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
log.warn("Unable to decode message due to: " + e);
}
return null;
public Object decode(byte[] data) {
return serializerPool.deserialize(data);
}
@Override
public void encode(ClusterMessage message, ByteBuffer buffer) {
try {
byte[] data = serializerPool.serialize(message);
buffer.putLong(MARKER);
buffer.putInt(data.length + METADATA_LENGTH);
buffer.put(data);
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
log.warn("Unable to encode message due to: " + e);
public byte[] encode(Object payload) {
return serializerPool.serialize(payload);
}
}
}
\ No newline at end of file
......
package org.onlab.onos.store.cluster.messaging;
import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_ADVERTISEMENT;
import java.util.Map;
import org.onlab.onos.cluster.NodeId;
......@@ -15,7 +14,7 @@ import com.google.common.collect.ImmutableMap;
*
* @param <ID> ID type
*/
public class AntiEntropyAdvertisement<ID> extends ClusterMessage {
public class AntiEntropyAdvertisement<ID> {
private final NodeId sender;
private final ImmutableMap<ID, Timestamp> advertisement;
......@@ -27,7 +26,6 @@ public class AntiEntropyAdvertisement<ID> extends ClusterMessage {
* @param advertisement timestamp information of the data sender holds
*/
public AntiEntropyAdvertisement(NodeId sender, Map<ID, Timestamp> advertisement) {
super(AE_ADVERTISEMENT);
this.sender = sender;
this.advertisement = ImmutableMap.copyOf(advertisement);
}
......@@ -42,7 +40,6 @@ public class AntiEntropyAdvertisement<ID> extends ClusterMessage {
// Default constructor for serializer
protected AntiEntropyAdvertisement() {
super(AE_ADVERTISEMENT);
this.sender = null;
this.advertisement = null;
}
......
package org.onlab.onos.store.cluster.messaging;
import static org.onlab.onos.store.cluster.messaging.MessageSubject.AE_REPLY;
import java.util.Map;
import java.util.Set;
......@@ -18,7 +16,7 @@ import com.google.common.collect.ImmutableSet;
* Suggest to the sender about the more up-to-date data this node has,
* and request for more recent data that the receiver has.
*/
public class AntiEntropyReply<ID, V extends VersionedValue<?>> extends ClusterMessage {
public class AntiEntropyReply<ID, V extends VersionedValue<?>> {
private final NodeId sender;
private final ImmutableMap<ID, V> suggestion;
......@@ -34,7 +32,6 @@ public class AntiEntropyReply<ID, V extends VersionedValue<?>> extends ClusterMe
public AntiEntropyReply(NodeId sender,
Map<ID, V> suggestion,
Set<ID> request) {
super(AE_REPLY);
this.sender = sender;
this.suggestion = ImmutableMap.copyOf(suggestion);
this.request = ImmutableSet.copyOf(request);
......@@ -74,7 +71,6 @@ public class AntiEntropyReply<ID, V extends VersionedValue<?>> extends ClusterMe
// Default constructor for serializer
protected AntiEntropyReply() {
super(AE_REPLY);
this.sender = null;
this.suggestion = null;
this.request = null;
......
package org.onlab.onos.store.cluster.impl;
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
/**
* Service for administering communications manager.
......@@ -8,29 +9,21 @@ import org.onlab.onos.cluster.DefaultControllerNode;
public interface ClusterCommunicationAdminService {
/**
* Initialize.
*/
void initialize(ControllerNode localNode, ClusterNodesDelegate nodesDelegate);
/**
* Adds the node to the list of monitored nodes.
*
* @param node node to be added
*/
void addNode(DefaultControllerNode node);
void addNode(ControllerNode node);
/**
* Removes the node from the list of monitored nodes.
*
* @param node node to be removed
*/
void removeNode(DefaultControllerNode node);
/**
* Starts-up the communications engine.
*
* @param localNode local controller node
* @param delegate nodes delegate
*/
void startUp(DefaultControllerNode localNode, ClusterNodesDelegate delegate);
/**
* Clears all nodes and streams as part of leaving the cluster.
*/
void clearAllNodesAndStreams();
void removeNode(ControllerNode node);
}
\ No newline at end of file
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import java.io.IOException;
import java.util.Set;
import org.onlab.onos.cluster.NodeId;
/**
* Service for assisting communications between controller cluster nodes.
*/
public interface ClusterCommunicationService {
/**
* Sends a message to all controller nodes.
* Broadcast a message to all controller nodes.
*
* @param message message to send
* @return true if the message was sent sucessfully to all nodes; false
* if there is no stream or if there was an error for some node
* @return true if the message was sent successfully to all nodes; false otherwise.
*/
boolean send(ClusterMessage message);
boolean broadcast(ClusterMessage message) throws IOException;
/**
* Sends a message to the specified controller node.
*
* @param message message to send
* @param toNodeId node identifier
* @return true if the message was sent sucessfully; false if there is
* no stream or if there was an error
* @return true if the message was sent successfully; false otherwise.
*/
boolean send(ClusterMessage message, NodeId toNodeId);
boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException;
/**
* Adds a new subscriber for the specified message subject.
* Multicast a message to a set of controller nodes.
*
* @param subject message subject
* @param subscriber message subscriber
* @param message message to send
* @return true if the message was sent successfully to all nodes in the group; false otherwise.
*/
void addSubscriber(MessageSubject subject, MessageSubscriber subscriber);
boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException;
/**
* Removes the specified subscriber from the given message subject.
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param subscriber message subscriber
*/
void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber);
/**
* Returns the set of subscribers for the specified message subject.
*
* @param subject message subject
* @return set of message subscribers
*/
Set<MessageSubscriber> getSubscribers(MessageSubject subject);
void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber);
}
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Base for cluster membership messages.
*/
public abstract class ClusterMembershipMessage extends ClusterMessage {
private NodeId nodeId;
private IpPrefix ipAddress;
private int tcpPort;
// For serialization
protected ClusterMembershipMessage() {
super(MessageSubject.HELLO);
nodeId = null;
ipAddress = null;
tcpPort = 0;
}
/**
* Creates a new membership message for the specified end-point data.
*
* @param subject message subject
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
protected ClusterMembershipMessage(MessageSubject subject, NodeId nodeId,
IpPrefix ipAddress, int tcpPort) {
super(subject);
this.nodeId = nodeId;
this.ipAddress = ipAddress;
this.tcpPort = tcpPort;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
/**
* Returns the sending node IP address.
*
* @return node IP address
*/
public IpPrefix ipAddress() {
return ipAddress;
}
/**
* Returns the sending node TCP listen port.
*
* @return TCP listen port
*/
public int tcpPort() {
return tcpPort;
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.nio.AbstractMessage;
import static com.google.common.base.MoreObjects.toStringHelper;
import org.onlab.onos.cluster.NodeId;
/**
* Base message for cluster-wide communications.
*/
public abstract class ClusterMessage extends AbstractMessage {
public class ClusterMessage {
private final NodeId sender;
private final MessageSubject subject;
private final Object payload;
/**
* Creates a cluster message.
*
* @param subject message subject
*/
protected ClusterMessage(MessageSubject subject) {
public ClusterMessage(NodeId sender, MessageSubject subject, Object payload) {
this.sender = sender;
this.subject = subject;
this.payload = payload;
}
/**
* Returns the id of the controller sending this message.
*
* @return message sender id.
*/
public NodeId sender() {
return sender;
}
/**
......@@ -29,9 +40,12 @@ public abstract class ClusterMessage extends AbstractMessage {
return subject;
}
@Override
public String toString() {
return toStringHelper(this).add("subject", subject).add("length", length).toString();
/**
* Returns the message payload.
*
* @return message payload.
*/
public Object payload() {
return payload;
}
}
......
package org.onlab.onos.store.cluster.messaging;
public interface ClusterMessageHandler {
public void handle(ClusterMessage message);
}
\ No newline at end of file
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**l
* Echo heart-beat message that nodes send to each other.
*/
public class EchoMessage extends ClusterMessage {
private NodeId nodeId;
// For serialization
private EchoMessage() {
super(MessageSubject.HELLO);
nodeId = null;
}
/**
* Creates a new heart-beat echo message.
*
* @param nodeId sending node identification
*/
public EchoMessage(NodeId nodeId) {
super(MessageSubject.HELLO);
nodeId = nodeId;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Hello message that nodes use to greet each other.
*/
public class HelloMessage extends ClusterMembershipMessage {
// For serialization
private HelloMessage() {
}
/**
* Creates a new hello message for the specified end-point data.
*
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.HELLO, nodeId, ipAddress, tcpPort);
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**
* Announcement message that nodes use to gossip about team departures.
*/
public class LeavingMemberMessage extends ClusterMembershipMessage {
// For serialization
private LeavingMemberMessage() {
super();
}
/**
* Creates a new goodbye message.
*
* @param nodeId sending node identification
*/
public LeavingMemberMessage(NodeId nodeId) {
super(MessageSubject.LEAVING_MEMBER, nodeId, null, 0);
}
}
......@@ -3,24 +3,20 @@ package org.onlab.onos.store.cluster.messaging;
/**
* Representation of a message subject.
*/
public enum MessageSubject {
public class MessageSubject {
/** Represents a first greeting message. */
HELLO,
private final String value;
/** Signifies announcement about new member. */
NEW_MEMBER,
public MessageSubject(String value) {
this.value = value;
}
/** Signifies announcement about leaving member. */
LEAVING_MEMBER,
/** Signifies a heart-beat message. */
ECHO,
/** Anti-Entropy advertisement message. */
AE_ADVERTISEMENT,
/** Anti-Entropy reply message. */
AE_REPLY,
public String value() {
return value;
}
@Override
public String toString() {
return value;
}
}
......
......@@ -13,6 +13,6 @@ public interface MessageSubscriber {
* @param message message to be received
* @param fromNodeId node from which the message was received
*/
void receive(ClusterMessage message, NodeId fromNodeId);
void receive(Object messagePayload, NodeId fromNodeId);
}
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Announcement message that nodes use to gossip about new arrivals.
*/
public class NewMemberMessage extends ClusterMembershipMessage {
// For serialization
private NewMemberMessage() {
}
/**
* Creates a new gossip message for the specified end-point data.
*
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
public NewMemberMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.NEW_MEMBER, nodeId, ipAddress, tcpPort);
}
}
package org.onlab.onos.store.cluster.messaging;
import java.nio.ByteBuffer;
/**
* Service for encoding &amp; decoding intra-cluster messages.
*/
......@@ -13,7 +11,7 @@ public interface SerializationService {
* @param buffer byte buffer with message(s)
* @return parsed message
*/
ClusterMessage decode(ByteBuffer buffer);
Object decode(byte[] data);
/**
* Encodes the specified message into the given byte buffer.
......@@ -21,6 +19,6 @@ public interface SerializationService {
* @param message message to be encoded
* @param buffer byte buffer to receive the message data
*/
void encode(ClusterMessage message, ByteBuffer buffer);
byte[] encode(Object message);
}
......
package org.onlab.onos.store.cluster.messaging.impl;
import org.onlab.onos.cluster.ControllerNode;
/**
* Contains information that will be published when a cluster membership event occurs.
*/
public class ClusterMembershipEvent {
private final ClusterMembershipEventType type;
private final ControllerNode node;
public ClusterMembershipEvent(ClusterMembershipEventType type, ControllerNode node) {
this.type = type;
this.node = node;
}
public ClusterMembershipEventType type() {
return type;
}
public ControllerNode node() {
return node;
}
}
package org.onlab.onos.store.cluster.messaging.impl;
public enum ClusterMembershipEventType {
NEW_MEMBER,
LEAVING_MEMBER,
UNREACHABLE_MEMBER,
HEART_BEAT,
}
package org.onlab.onos.store.cluster.messaging.impl;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
public final class ClusterMessageSubjects {
private ClusterMessageSubjects() {}
public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
}
package org.onlab.onos.store.cluster.messaging.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.messaging.Endpoint;
import org.onlab.onos.store.messaging.Message;
import org.onlab.onos.store.messaging.MessageHandler;
import org.onlab.onos.store.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Component(immediate = true)
@Service
public class OnosClusterCommunicationManager
implements ClusterCommunicationService, ClusterCommunicationAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
private ControllerNode localNode;
private ClusterNodesDelegate nodesDelegate;
private Map<NodeId, ControllerNode> members = new HashMap<>();
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MessagingService messagingService;
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
for (ControllerNode node : members.values()) {
if (!node.equals(localNode)) {
ok = unicast(message, node.id()) && ok;
}
}
return ok;
}
@Override
public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
boolean ok = true;
for (NodeId nodeId : nodes) {
if (!nodeId.equals(localNode.id())) {
ok = unicast(message, nodeId) && ok;
}
}
return ok;
}
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) {
ControllerNode node = members.get(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
messagingService.sendAsync(nodeEp, message.subject().value(), message);
return true;
} catch (IOException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
}
return false;
}
@Override
public void addSubscriber(MessageSubject subject,
ClusterMessageHandler subscriber) {
messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
}
@Override
public void initialize(ControllerNode localNode,
ClusterNodesDelegate delegate) {
this.localNode = localNode;
this.nodesDelegate = delegate;
this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
}
@Override
public void addNode(ControllerNode node) {
members.put(node.id(), node);
}
@Override
public void removeNode(ControllerNode node) {
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
members.remove(node.id());
}
// Sends a heart beat to all peers.
private class KeepAlive extends TimerTask {
@Override
public void run() {
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
}
}
private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
ControllerNode node = event.node();
if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
log.info("Node {} sent a hearbeat", node.id());
nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
} else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
log.info("Node {} is leaving", node.id());
nodesDelegate.nodeRemoved(node.id());
} else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
log.info("Node {} is unreachable", node.id());
nodesDelegate.nodeVanished(node.id());
}
}
}
private static class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;
public InternalClusterMessageHandler(ClusterMessageHandler handler) {
this.handler = handler;
}
@Override
public void handle(Message message) {
handler.handle((ClusterMessage) message.payload());
}
}
}
/**
* Implementation of link store using distributed p2p synchronization protocol.
*/
package org.onlab.onos.store.link.impl;
\ No newline at end of file
package org.onlab.onos.store.messaging;
/**
* Representation of a TCP/UDP communication end point.
*/
public class Endpoint {
private final int port;
private final String host;
public Endpoint(String host, int port) {
this.host = host;
this.port = port;
}
public String host() {
return host;
}
public int port() {
return port;
}
@Override
public String toString() {
return "Endpoint [port=" + port + ", host=" + host + "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + port;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Endpoint other = (Endpoint) obj;
if (host == null) {
if (other.host != null) {
return false;
}
} else if (!host.equals(other.host)) {
return false;
}
if (port != other.port) {
return false;
}
return true;
}
}
package org.onlab.onos.store.messaging;
import java.io.IOException;
/**
* A unit of communication.
* Has a payload. Also supports a feature to respond back to the sender.
*/
public interface Message {
/**
* Returns the payload of this message.
* @return message payload.
*/
public Object payload();
/**
* Sends a reply back to the sender of this messge.
* @param data payload of the response.
* @throws IOException if there is a communication error.
*/
public void respond(Object data) throws IOException;
}
package org.onlab.onos.store.messaging;
import java.io.IOException;
/**
* Handler for a message.
*/
public interface MessageHandler {
/**
* Handles the message.
* @param message message.
* @throws IOException.
*/
public void handle(Message message) throws IOException;
}
package org.onlab.onos.store.messaging;
import java.io.IOException;
/**
* Interface for low level messaging primitives.
*/
public interface MessagingService {
/**
* Sends a message asynchronously to the specified communication end point.
* The message is specified using the type and payload.
* @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload.
* @throws IOException
*/
public void sendAsync(Endpoint ep, String type, Object payload) throws IOException;
/**
* Sends a message synchronously and waits for a response.
* @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload.
* @return a response future
* @throws IOException
*/
public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException;
/**
* Registers a new message handler for message type.
* @param type message type.
* @param handler message handler
*/
public void registerHandler(String type, MessageHandler handler);
/**
* Unregister current handler, if one exists for message type.
* @param type message type
*/
public void unregisterHandler(String type);
}
package org.onlab.onos.store.messaging;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Response object returned when making synchronous requests.
* Can you used to check is a response is ready and/or wait for a response
* to become available.
*
* @param <T> type of response.
*/
public interface Response<T> {
/**
* Gets the response waiting for a designated timeout period.
* @param timeout timeout period (since request was sent out)
* @param tu unit of time.
* @return response
* @throws TimeoutException if the timeout expires before the response arrives.
*/
public T get(long timeout, TimeUnit tu) throws TimeoutException;
/**
* Gets the response waiting for indefinite timeout period.
* @return response
* @throws InterruptedException if the thread is interrupted before the response arrives.
*/
public T get() throws InterruptedException;
/**
* Checks if the response is ready without blocking.
* @return true if response is ready, false otherwise.
*/
public boolean isReady();
}
package org.onlab.onos.store.messaging.impl;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.onlab.onos.store.messaging.Response;
/**
* An asynchronous response.
* This class provides a base implementation of Response, with methods to retrieve the
* result and query to see if the result is ready. The result can only be retrieved when
* it is ready and the get methods will block if the result is not ready yet.
* @param <T> type of response.
*/
public class AsyncResponse<T> implements Response<T> {
private T value;
private boolean done = false;
private final long start = System.nanoTime();
@Override
public T get(long timeout, TimeUnit tu) throws TimeoutException {
timeout = tu.toNanos(timeout);
boolean interrupted = false;
try {
synchronized (this) {
while (!done) {
try {
long timeRemaining = timeout - (System.nanoTime() - start);
if (timeRemaining <= 0) {
throw new TimeoutException("Operation timed out.");
}
TimeUnit.NANOSECONDS.timedWait(this, timeRemaining);
} catch (InterruptedException e) {
interrupted = true;
}
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
return value;
}
@Override
public T get() throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public boolean isReady() {
return done;
}
/**
* Sets response value and unblocks any thread blocking on the response to become
* available.
* @param data response data.
*/
@SuppressWarnings("unchecked")
public synchronized void setResponse(Object data) {
if (!done) {
done = true;
value = (T) data;
this.notifyAll();
}
}
}
package org.onlab.onos.store.messaging.impl;
import java.io.IOException;
import org.onlab.onos.store.messaging.Message;
import org.onlab.onos.store.messaging.MessageHandler;
/**
* Message handler that echos the message back to the sender.
*/
public class EchoHandler implements MessageHandler {
@Override
public void handle(Message message) throws IOException {
System.out.println("Received: " + message.payload() + ". Echoing it back to the sender.");
message.respond(message.payload());
}
}
package org.onlab.onos.store.messaging.impl;
import java.io.IOException;
import org.onlab.onos.store.messaging.Endpoint;
import org.onlab.onos.store.messaging.Message;
/**
* Internal message representation with additional attributes
* for supporting, synchronous request/reply behavior.
*/
public final class InternalMessage implements Message {
private long id;
private Endpoint sender;
private String type;
private Object payload;
private transient NettyMessagingService messagingService;
public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
// Must be created using the Builder.
private InternalMessage() {}
public long id() {
return id;
}
public String type() {
return type;
}
public Endpoint sender() {
return sender;
}
@Override
public Object payload() {
return payload;
}
@Override
public void respond(Object data) throws IOException {
Builder builder = new Builder(messagingService);
InternalMessage message = builder.withId(this.id)
// FIXME: Sender should be messagingService.localEp.
.withSender(this.sender)
.withPayload(data)
.withType(REPLY_MESSAGE_TYPE)
.build();
messagingService.sendAsync(sender, message);
}
/**
* Builder for InternalMessages.
*/
public static class Builder {
private InternalMessage message;
public Builder(NettyMessagingService messagingService) {
message = new InternalMessage();
message.messagingService = messagingService;
}
public Builder withId(long id) {
message.id = id;
return this;
}
public Builder withType(String type) {
message.type = type;
return this;
}
public Builder withSender(Endpoint sender) {
message.sender = sender;
return this;
}
public Builder withPayload(Object payload) {
message.payload = payload;
return this;
}
public InternalMessage build() {
return message;
}
}
}
package org.onlab.onos.store.messaging.impl;
import org.onlab.onos.store.messaging.Message;
import org.onlab.onos.store.messaging.MessageHandler;
/**
* A MessageHandler that simply logs the information.
*/
public class LoggingHandler implements MessageHandler {
@Override
public void handle(Message message) {
System.out.println("Received: " + message.payload());
}
}
\ No newline at end of file
package org.onlab.onos.store.messaging.impl;
import java.util.Arrays;
import java.util.List;
import static com.google.common.base.Preconditions.checkState;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.messaging.Endpoint;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
/**
* Decode bytes into a InrenalMessage.
*/
public class MessageDecoder extends ByteToMessageDecoder {
private final NettyMessagingService messagingService;
private final SerializationService serializationService;
public MessageDecoder(NettyMessagingService messagingService, SerializationService serializationService) {
this.messagingService = messagingService;
this.serializationService = serializationService;
}
@Override
protected void decode(ChannelHandlerContext context, ByteBuf in,
List<Object> messages) throws Exception {
byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array();
checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
// read message Id.
long id = in.readLong();
// read message type; first read size and then bytes.
String type = new String(in.readBytes(in.readInt()).array());
// read sender host name; first read size and then bytes.
String host = new String(in.readBytes(in.readInt()).array());
// read sender port.
int port = in.readInt();
Endpoint sender = new Endpoint(host, port);
// read message payload; first read size and then bytes.
Object payload = serializationService.decode(in.readBytes(in.readInt()).array());
InternalMessage message = new InternalMessage.Builder(messagingService)
.withId(id)
.withSender(sender)
.withType(type)
.withPayload(payload)
.build();
messages.add(message);
}
}
package org.onlab.onos.store.messaging.impl;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* Encode InternalMessage out into a byte buffer.
*/
public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// onosiscool in ascii
public static final byte[] PREAMBLE = "onosiscool".getBytes();
private final SerializationService serializationService;
public MessageEncoder(SerializationService serializationService) {
this.serializationService = serializationService;
}
@Override
protected void encode(ChannelHandlerContext context, InternalMessage message,
ByteBuf out) throws Exception {
// write preamble
out.writeBytes(PREAMBLE);
// write id
out.writeLong(message.id());
// write type length
out.writeInt(message.type().length());
// write type
out.writeBytes(message.type().getBytes());
// write sender host name size
out.writeInt(message.sender().host().length());
// write sender host name.
out.writeBytes(message.sender().host().getBytes());
// write port
out.writeInt(message.sender().port());
try {
serializationService.encode(message.payload());
} catch (Exception e) {
e.printStackTrace();
}
byte[] payload = serializationService.encode(message.payload());
// write payload length.
out.writeInt(payload.length);
// write payload bytes
out.writeBytes(payload);
}
}
package org.onlab.onos.store.messaging.impl;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.pool.KeyedObjectPool;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.messaging.Endpoint;
import org.onlab.onos.store.messaging.MessageHandler;
import org.onlab.onos.store.messaging.MessagingService;
import org.onlab.onos.store.messaging.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* A Netty based implementation of MessagingService.
*/
@Component(immediate = true)
@Service
public class NettyMessagingService implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private KeyedObjectPool<Endpoint, Channel> channels =
new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
private final int port;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private Cache<Long, AsyncResponse<?>> responseFutures;
private final Endpoint localEp;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SerializationService serializationService;
public NettyMessagingService() {
// TODO: Default port should be configurable.
this(8080);
}
// FIXME: Constructor should not throw exceptions.
public NettyMessagingService(int port) {
this.port = port;
try {
localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
} catch (UnknownHostException e) {
// bailing out.
throw new RuntimeException(e);
}
}
@Activate
public void activate() throws Exception {
responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
// TODO: Once the entry expires, notify blocking threads (if any).
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
startAcceptingConnections();
}
@Deactivate
public void deactivate() throws Exception {
channels.close();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
@Override
public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
InternalMessage message = new InternalMessage.Builder(this)
.withId(RandomUtils.nextLong())
.withSender(localEp)
.withType(type)
.withPayload(payload)
.build();
sendAsync(ep, message);
}
protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
Channel channel = null;
try {
channel = channels.borrowObject(ep);
channel.eventLoop().execute(new WriteTask(channel, message));
} catch (Exception e) {
throw new IOException(e);
} finally {
try {
channels.returnObject(ep, channel);
} catch (Exception e) {
log.warn("Error returning object back to the pool", e);
// ignored.
}
}
}
@Override
public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
throws IOException {
AsyncResponse<T> futureResponse = new AsyncResponse<T>();
Long messageId = RandomUtils.nextLong();
responseFutures.put(messageId, futureResponse);
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
.withSender(localEp)
.withType(type)
.withPayload(payload)
.build();
sendAsync(ep, message);
return futureResponse;
}
@Override
public void registerHandler(String type, MessageHandler handler) {
// TODO: Is this the right semantics for handler registration?
handlers.putIfAbsent(type, handler);
}
public void unregisterHandler(String type) {
handlers.remove(type);
}
private MessageHandler getMessageHandler(String type) {
return handlers.get(type);
}
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new OnosCommunicationChannelInitializer())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
b.bind(port).sync();
}
private class OnosCommunicationChannelFactory
implements KeyedPoolableObjectFactory<Endpoint, Channel> {
@Override
public void activateObject(Endpoint endpoint, Channel channel)
throws Exception {
}
@Override
public void destroyObject(Endpoint ep, Channel channel) throws Exception {
channel.close();
}
@Override
public Channel makeObject(Endpoint ep) throws Exception {
Bootstrap b = new Bootstrap();
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(workerGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new OnosCommunicationChannelInitializer());
// Start the client.
ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
return f.channel();
}
@Override
public void passivateObject(Endpoint ep, Channel channel)
throws Exception {
}
@Override
public boolean validateObject(Endpoint ep, Channel channel) {
return channel.isOpen();
}
}
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new MessageEncoder(serializationService))
.addLast(new MessageDecoder(NettyMessagingService.this, serializationService))
.addLast(new NettyMessagingService.InboundMessageDispatcher());
}
}
private class WriteTask implements Runnable {
private final Object message;
private final Channel channel;
public WriteTask(Channel channel, Object message) {
this.message = message;
this.channel = channel;
}
@Override
public void run() {
channel.writeAndFlush(message);
}
}
private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
String type = message.type();
if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
try {
AsyncResponse<?> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
futureResponse.setResponse(message.payload());
}
log.warn("Received a reply. But was unable to locate the request handle");
} finally {
NettyMessagingService.this.responseFutures.invalidate(message.id());
}
return;
}
MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
handler.handle(message);
}
}
}
package org.onlab.onos.store.messaging.impl;
import java.util.concurrent.TimeUnit;
import org.onlab.onos.store.cluster.impl.MessageSerializer;
import org.onlab.onos.store.messaging.Endpoint;
import org.onlab.onos.store.messaging.Response;
public final class SimpleClient {
private SimpleClient() {}
public static void main(String... args) throws Exception {
NettyMessagingService messaging = new TestNettyMessagingService(9081);
messaging.activate();
messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World");
Response<String> response = messaging.sendAndReceive(new Endpoint("localhost", 8080), "echo", "Hello World");
System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS));
}
public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception {
super(port);
MessageSerializer mgr = new MessageSerializer();
mgr.activate();
this.serializationService = mgr;
}
}
}
package org.onlab.onos.store.messaging.impl;
import org.onlab.onos.store.cluster.impl.MessageSerializer;
public final class SimpleServer {
private SimpleServer() {}
public static void main(String... args) throws Exception {
NettyMessagingService server = new TestNettyMessagingService();
server.activate();
server.registerHandler("simple", new LoggingHandler());
server.registerHandler("echo", new EchoHandler());
}
public static class TestNettyMessagingService extends NettyMessagingService {
protected TestNettyMessagingService() {
MessageSerializer mgr = new MessageSerializer();
mgr.activate();
this.serializationService = mgr;
}
}
}
......@@ -6,6 +6,8 @@ import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager;
import org.onlab.onos.store.messaging.impl.NettyMessagingService;
import org.onlab.packet.IpPrefix;
import java.util.concurrent.CountDownLatch;
......@@ -27,8 +29,8 @@ public class ClusterCommunicationManagerTest {
private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
private ClusterCommunicationManager ccm1;
private ClusterCommunicationManager ccm2;
private OnosClusterCommunicationManager ccm1;
private OnosClusterCommunicationManager ccm2;
private TestDelegate cnd1 = new TestDelegate();
private TestDelegate cnd2 = new TestDelegate();
......@@ -37,20 +39,23 @@ public class ClusterCommunicationManagerTest {
private DefaultControllerNode node2 = new DefaultControllerNode(N2, IP, P2);
@Before
public void setUp() {
public void setUp() throws Exception {
MessageSerializer messageSerializer = new MessageSerializer();
messageSerializer.activate();
ccm1 = new ClusterCommunicationManager();
ccm1.serializationService = messageSerializer;
NettyMessagingService messagingService = new NettyMessagingService();
messagingService.activate();
ccm1 = new OnosClusterCommunicationManager();
// ccm1.serializationService = messageSerializer;
ccm1.activate();
ccm2 = new ClusterCommunicationManager();
ccm2.serializationService = messageSerializer;
ccm2 = new OnosClusterCommunicationManager();
// ccm2.serializationService = messageSerializer;
ccm2.activate();
ccm1.startUp(node1, cnd1);
ccm2.startUp(node2, cnd2);
ccm1.initialize(node1, cnd1);
ccm2.initialize(node2, cnd2);
}
@After
......@@ -71,6 +76,7 @@ public class ClusterCommunicationManagerTest {
}
@Test
@Ignore
public void disconnect() throws Exception {
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
......