Aaron Kruglikov
Committed by Gerrit Code Review

Migrating netty messaging into NettyMessagingManager.

Change-Id: I971db195c9dc155cdf76850f0427ef9b9210113c
/*
* Copyright 2014-2015 Open Networking Laboratory
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
package org.onosproject.store.cluster.messaging.impl;
/**
* State transitions a decoder goes through as it is decoding an incoming message.
......
/*
* Copyright 2014-2015 Open Networking Laboratory
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -13,13 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.MoreObjects;
import org.onlab.util.ByteArraySizeHashPrinter;
import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.base.MoreObjects;
/**
* Internal message representation with additional attributes
* for supporting synchronous request/reply behavior.
......
/*
* Copyright 2014-2015 Open Networking Laboratory
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -13,21 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import java.util.List;
import static com.google.common.base.Preconditions.checkState;
......@@ -54,7 +52,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
}
@Override
@java.lang.SuppressWarnings("squid:S128") // suppress switch fall through warning
@SuppressWarnings("squid:S128") // suppress switch fall through warning
protected void decode(
ChannelHandlerContext context,
ByteBuf buffer,
......
/*
* Copyright 2014-2015 Open Networking Laboratory
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -13,22 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.IOException;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import java.io.IOException;
/**
* Encodes an InternalMessage into a byte buffer.
......
......@@ -17,29 +17,114 @@ package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.netty.NettyMessaging;
import org.onlab.netty.InternalMessage;
import org.onlab.netty.MessageDecoder;
import org.onlab.netty.MessageEncoder;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
/**
* Netty-based MessagingService.
*/
@Component(immediate = true, enabled = true)
@Service
public class NettyMessagingManager extends NettyMessaging {
public class NettyMessagingManager implements MessagingService {
private static final short MIN_KS_LENGTH = 6;
private final Logger log = LoggerFactory.getLogger(getClass());
private static final short MIN_KS_LENGTH = 6;
private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
private Endpoint localEp;
private int preamble;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
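// Pending request/reply callbacks keyed by message id; entries evicted after 10 seconds
// complete the waiting future exceptionally with a TimeoutException.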
private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
.removalListener(new RemovalListener<Long, Callback>() {
@Override
public void onRemoval(RemovalNotification<Long, Callback> entry) {
if (entry.wasEvicted()) {
entry.getValue().completeExceptionally(new TimeoutException("Timed out waiting for reply"));
}
}
})
.build();
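// Pool of client-side connections to remote endpoints, keyed by endpoint.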
private final GenericKeyedObjectPool<Endpoint, Connection> channels
= new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
private EventLoopGroup serverGroup;
private EventLoopGroup clientGroup;
private Class<? extends ServerChannel> serverChannelClass;
private Class<? extends Channel> clientChannelClass;
protected static final boolean TLS_DISABLED = false;
protected boolean enableNettyTls = TLS_DISABLED;
protected String ksLocation;
protected String tsLocation;
protected char[] ksPwd;
protected char[] tsPwd;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataService clusterMetadataService;
......@@ -48,14 +133,32 @@ public class NettyMessagingManager extends NettyMessaging {
public void activate() throws Exception {
ControllerNode localNode = clusterMetadataService.getLocalNode();
getTlsParameters();
super.start(clusterMetadataService.getClusterMetadata().getName().hashCode(),
new Endpoint(localNode.ip(), localNode.tcpPort()));
if (started.get()) {
log.warn("Already running at local endpoint: {}", localEp);
return;
}
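// The preamble (a hash of the cluster name) is written by the encoder and checked by the
// decoder, presumably so messages originating from a different cluster are rejected.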
this.preamble = clusterMetadataService.getClusterMetadata().getName().hashCode();
this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
channels.setLifo(true);
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
channels.setMinEvictableIdleTimeMillis(60_000L);
channels.setTimeBetweenEvictionRunsMillis(30_000L);
initEventLoopGroup();
startAcceptingConnections();
started.set(true);
log.info("Started");
}
@Deactivate
public void deactivate() throws Exception {
super.stop();
if (started.get()) {
channels.close();
serverGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
started.set(false);
}
log.info("Stopped");
}
......@@ -86,4 +189,409 @@ public class NettyMessagingManager extends NettyMessaging {
}
}
}
private void initEventLoopGroup() {
// Try epoll first and, if that does not work, fall back to nio.
try {
clientGroup = new EpollEventLoopGroup();
serverGroup = new EpollEventLoopGroup();
serverChannelClass = EpollServerSocketChannel.class;
clientChannelClass = EpollSocketChannel.class;
return;
} catch (Throwable e) {
log.debug("Failed to initialize native (epoll) transport. "
+ "Reason: {}. Proceeding with nio.", e.getMessage());
}
clientGroup = new NioEventLoopGroup();
serverGroup = new NioEventLoopGroup();
serverChannelClass = NioServerSocketChannel.class;
clientChannelClass = NioSocketChannel.class;
}
@Override
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
localEp,
type,
payload);
return sendAsync(ep, message);
}
protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
if (ep.equals(localEp)) {
try {
dispatchLocally(message);
} catch (IOException e) {
return Tools.exceptionalFuture(e);
}
return CompletableFuture.completedFuture(null);
}
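// Remote endpoint: borrow a pooled connection for the destination, send the message,
// and return the connection to the pool.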
CompletableFuture<Void> future = new CompletableFuture<>();
try {
Connection connection = null;
try {
connection = channels.borrowObject(ep);
connection.send(message, future);
} finally {
channels.returnObject(ep, connection);
}
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}
@Override
public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
}
@Override
public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
CompletableFuture<byte[]> response = new CompletableFuture<>();
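// Register the callback under a fresh message id; dispatchLocally() completes it when a
// reply carrying the same id arrives, and the callback cache times it out otherwise.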
Callback callback = new Callback(response, executor);
Long messageId = messageIdGenerator.incrementAndGet();
callbacks.put(messageId, callback);
InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
return sendAsync(ep, message).whenComplete((r, e) -> {
if (e != null) {
callbacks.invalidate(messageId);
}
}).thenCompose(v -> response);
}
@Override
public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
}
@Override
public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
handlers.put(type, message -> executor.execute(() -> {
byte[] responsePayload = handler.apply(message.sender(), message.payload());
if (responsePayload != null) {
InternalMessage response = new InternalMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
responsePayload);
sendAsync(message.sender(), response).whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to respond", error);
}
});
}
}));
}
@Override
public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
handlers.put(type, message -> {
handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
if (error == null) {
InternalMessage response = new InternalMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
result);
sendAsync(message.sender(), response).whenComplete((r, e) -> {
if (e != null) {
log.debug("Failed to respond", e);
}
});
}
});
});
}
@Override
public void unregisterHandler(String type) {
handlers.remove(type);
}
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
b.option(ChannelOption.SO_RCVBUF, 1048576);
b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(serverGroup, clientGroup);
b.channel(serverChannelClass);
if (enableNettyTls) {
b.childHandler(new SslServerCommunicationChannelInitializer());
} else {
b.childHandler(new OnosCommunicationChannelInitializer());
}
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
b.bind(localEp.port()).sync().addListener(future -> {
if (future.isSuccess()) {
log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
} else {
log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
}
});
}
private class OnosCommunicationChannelFactory
implements KeyedPoolableObjectFactory<Endpoint, Connection> {
@Override
public void activateObject(Endpoint endpoint, Connection connection)
throws Exception {
}
@Override
public void destroyObject(Endpoint ep, Connection connection) throws Exception {
log.debug("Closing connection to {}", ep);
//Is this the right way to destroy?
connection.destroy();
}
@Override
public Connection makeObject(Endpoint ep) throws Exception {
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
bootstrap.group(clientGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
bootstrap.channel(clientChannelClass);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
if (enableNettyTls) {
bootstrap.handler(new SslClientCommunicationChannelInitializer());
} else {
bootstrap.handler(new OnosCommunicationChannelInitializer());
}
// Start the client.
CompletableFuture<Channel> retFuture = new CompletableFuture<>();
ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
f.addListener(future -> {
if (future.isSuccess()) {
retFuture.complete(f.channel());
} else {
retFuture.completeExceptionally(future.cause());
}
});
log.debug("Established a new connection to {}", ep);
return new Connection(retFuture);
}
@Override
public void passivateObject(Endpoint ep, Connection connection)
throws Exception {
}
@Override
public boolean validateObject(Endpoint ep, Connection connection) {
return connection.validate();
}
}
private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
KeyStore ts = KeyStore.getInstance("JKS");
ts.load(new FileInputStream(tsLocation), tsPwd);
tmFactory.init(ts);
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(new FileInputStream(ksLocation), ksPwd);
kmf.init(ks, ksPwd);
SSLContext serverContext = SSLContext.getInstance("TLS");
serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
SSLEngine serverSslEngine = serverContext.createSSLEngine();
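// Mutual TLS: require the client to present a certificate trusted by the configured trust store.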
serverSslEngine.setNeedClientAuth(true);
serverSslEngine.setUseClientMode(false);
serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
serverSslEngine.setEnableSessionCreation(true);
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
.addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(preamble))
.addLast("handler", dispatcher);
}
}
private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
KeyStore ts = KeyStore.getInstance("JKS");
ts.load(new FileInputStream(tsLocation), tsPwd);
tmFactory.init(ts);
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(new FileInputStream(ksLocation), ksPwd);
kmf.init(ks, ksPwd);
SSLContext clientContext = SSLContext.getInstance("TLS");
clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
SSLEngine clientSslEngine = clientContext.createSSLEngine();
clientSslEngine.setUseClientMode(true);
clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
clientSslEngine.setEnableSessionCreation(true);
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
.addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(preamble))
.addLast("handler", dispatcher);
}
}
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(preamble))
.addLast("handler", dispatcher);
}
}
@ChannelHandler.Sharable
private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
try {
dispatchLocally(message);
} catch (RejectedExecutionException e) {
log.warn("Unable to dispatch message due to {}", e.getMessage());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}
private void dispatchLocally(InternalMessage message) throws IOException {
String type = message.type();
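// Replies complete the pending callback registered by sendAndReceive(); all other message
// types are dispatched to the handler registered for that type.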
if (REPLY_MESSAGE_TYPE.equals(type)) {
try {
Callback callback =
callbacks.getIfPresent(message.id());
if (callback != null) {
callback.complete(message.payload());
} else {
log.warn("Received a reply for message id:[{}]. "
+ " from {}. But was unable to locate the"
+ " request handle", message.id(), message.sender());
}
} finally {
callbacks.invalidate(message.id());
}
return;
}
Consumer<InternalMessage> handler = handlers.get(type);
if (handler != null) {
handler.accept(message);
} else {
log.debug("No handler registered for {}", type);
}
}
private final class Callback {
private final CompletableFuture<byte[]> future;
private final Executor executor;
public Callback(CompletableFuture<byte[]> future, Executor executor) {
this.future = future;
this.executor = executor;
}
public void complete(byte[] value) {
executor.execute(() -> future.complete(value));
}
public void completeExceptionally(Throwable error) {
executor.execute(() -> future.completeExceptionally(error));
}
}
private final class Connection {
private final CompletableFuture<Channel> internalFuture;
public Connection(CompletableFuture<Channel> internalFuture) {
this.internalFuture = internalFuture;
}
/**
* Sends a message out on its channel and associates the message with a
* completable future used for signaling.
* @param message the message to be sent
* @param future a future that is completed normally or exceptionally if
* message sending succeeds or fails respectively
*/
public void send(Object message, CompletableFuture<Void> future) {
internalFuture.whenComplete((channel, throwable) -> {
if (throwable == null) {
channel.writeAndFlush(message).addListener(channelFuture -> {
if (!channelFuture.isSuccess()) {
future.completeExceptionally(channelFuture.cause());
} else {
future.complete(null);
}
});
} else {
future.completeExceptionally(throwable);
}
});
}
/**
* Destroys a channel by closing its channel (if it exists) and
* cancelling its future.
*/
public void destroy() {
Channel channel = internalFuture.getNow(null);
if (channel != null) {
channel.close();
}
internalFuture.cancel(false);
}
/**
* Determines whether the connection is valid, meaning it has either
* completed with an active channel
* or has not yet completed.
* @return true if the channel has an active connection or has not
* yet completed
*/
public boolean validate() {
if (internalFuture.isCompletedExceptionally()) {
return false;
}
Channel channel = internalFuture.getNow(null);
return channel == null || channel.isActive();
}
}
}
......
package org.onlab.netty;
package org.onosproject.store.cluster.messaging.impl;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
......@@ -9,10 +9,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.util.concurrent.MoreExecutors;
......@@ -24,34 +31,39 @@ import static org.onlab.junit.TestTools.findAvailablePort;
/**
* Unit tests for NettyMessaging.
*/
public class NettyMessagingTest {
public class NettyMessagingManagerTest {
NettyMessaging netty1;
NettyMessaging netty2;
NettyMessagingManager netty1;
NettyMessagingManager netty2;
Endpoint ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5001);
Endpoint ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5002);
Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5003);
private static final String DUMMY_NAME = "node";
private static final String IP_STRING = "127.0.0.1";
Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
@Before
public void setUp() throws Exception {
ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
netty1 = new NettyMessaging();
netty1.start(12, ep1);
netty1 = new NettyMessagingManager();
netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
netty1.activate();
ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
netty2 = new NettyMessaging();
netty2.start(12, ep2);
netty2 = new NettyMessagingManager();
netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
netty2.activate();
}
@After
public void tearDown() throws Exception {
if (netty1 != null) {
netty1.stop();
netty1.deactivate();
}
if (netty2 != null) {
netty2.stop();
netty2.deactivate();
}
}
......@@ -125,4 +137,40 @@ public class NettyMessagingTest {
assertEquals("completion-thread", completionThreadName.get());
assertEquals("handler-thread", handlerThreadName.get());
}
private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
return new ClusterMetadataService() {
@Override
public ClusterMetadata getClusterMetadata() {
return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
name, Sets.newHashSet(), Sets.newHashSet());
}
@Override
public ControllerNode getLocalNode() {
return new ControllerNode() {
@Override
public NodeId id() {
return null;
}
@Override
public IpAddress ip() {
return IpAddress.valueOf(ipAddress);
}
@Override
public int tcpPort() {
return ep.port();
}
};
}
@Override
public void addListener(ClusterMetadataEventListener listener) {}
@Override
public void removeListener(ClusterMetadataEventListener listener) {}
};
}
}
\ No newline at end of file
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2014 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onlab-utils</artifactId>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onlab-netty</artifactId>
<packaging>bundle</packaging>
<description>Network I/O using Netty framework</description>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-misc</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty4.version}</version>
</dependency>
</dependencies>
</project>
/*
* Copyright 2014-2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.util.Tools;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
/**
* Implementation of MessagingService based on the <a href="http://netty.io/">Netty</a> framework.
*/
public class NettyMessaging implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
private Endpoint localEp;
private int preamble;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
.removalListener(new RemovalListener<Long, Callback>() {
@Override
public void onRemoval(RemovalNotification<Long, Callback> entry) {
if (entry.wasEvicted()) {
entry.getValue().completeExceptionally(new TimeoutException("Timed out waiting for reply"));
}
}
})
.build();
private final GenericKeyedObjectPool<Endpoint, Connection> channels
= new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
private EventLoopGroup serverGroup;
private EventLoopGroup clientGroup;
private Class<? extends ServerChannel> serverChannelClass;
private Class<? extends Channel> clientChannelClass;
protected static final boolean TLS_DISABLED = false;
protected boolean enableNettyTls = TLS_DISABLED;
protected String ksLocation;
protected String tsLocation;
protected char[] ksPwd;
protected char[] tsPwd;
@SuppressWarnings("squid:S1181")
// We really need to catch Throwable due to netty native epoll() handling
private void initEventLoopGroup() {
// Try epoll first and, if that does not work, fall back to nio.
try {
clientGroup = new EpollEventLoopGroup();
serverGroup = new EpollEventLoopGroup();
serverChannelClass = EpollServerSocketChannel.class;
clientChannelClass = EpollSocketChannel.class;
return;
} catch (Throwable e) {
log.debug("Failed to initialize native (epoll) transport. "
+ "Reason: {}. Proceeding with nio.", e.getMessage());
}
clientGroup = new NioEventLoopGroup();
serverGroup = new NioEventLoopGroup();
serverChannelClass = NioServerSocketChannel.class;
clientChannelClass = NioSocketChannel.class;
}
public void start(int preamble, Endpoint localEp) throws Exception {
if (started.get()) {
log.warn("Already running at local endpoint: {}", localEp);
return;
}
this.preamble = preamble;
this.localEp = localEp;
channels.setLifo(true);
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
channels.setMinEvictableIdleTimeMillis(60_000L);
channels.setTimeBetweenEvictionRunsMillis(30_000L);
initEventLoopGroup();
startAcceptingConnections();
started.set(true);
}
public void stop() throws Exception {
if (started.get()) {
channels.close();
serverGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
started.set(false);
}
}
@Override
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
localEp,
type,
payload);
return sendAsync(ep, message);
}
protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
if (ep.equals(localEp)) {
try {
dispatchLocally(message);
} catch (IOException e) {
return Tools.exceptionalFuture(e);
}
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> future = new CompletableFuture<>();
try {
Connection connection = null;
try {
connection = channels.borrowObject(ep);
connection.send(message, future);
} finally {
channels.returnObject(ep, connection);
}
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}
@Override
public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
}
@Override
public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
CompletableFuture<byte[]> response = new CompletableFuture<>();
Callback callback = new Callback(response, executor);
Long messageId = messageIdGenerator.incrementAndGet();
callbacks.put(messageId, callback);
InternalMessage message = new InternalMessage(messageId, localEp, type, payload);
return sendAsync(ep, message).whenComplete((r, e) -> {
if (e != null) {
callbacks.invalidate(messageId);
}
}).thenCompose(v -> response);
}
@Override
public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
}
@Override
public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
handlers.put(type, message -> executor.execute(() -> {
byte[] responsePayload = handler.apply(message.sender(), message.payload());
if (responsePayload != null) {
InternalMessage response = new InternalMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
responsePayload);
sendAsync(message.sender(), response).whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to respond", error);
}
});
}
}));
}
@Override
public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
handlers.put(type, message -> {
handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
if (error == null) {
InternalMessage response = new InternalMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
result);
sendAsync(message.sender(), response).whenComplete((r, e) -> {
if (e != null) {
log.debug("Failed to respond", e);
}
});
}
});
});
}
@Override
public void unregisterHandler(String type) {
handlers.remove(type);
}
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
b.option(ChannelOption.SO_RCVBUF, 1048576);
b.option(ChannelOption.TCP_NODELAY, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(serverGroup, clientGroup);
b.channel(serverChannelClass);
if (enableNettyTls) {
b.childHandler(new SslServerCommunicationChannelInitializer());
} else {
b.childHandler(new OnosCommunicationChannelInitializer());
}
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
b.bind(localEp.port()).sync().addListener(future -> {
if (future.isSuccess()) {
log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
} else {
log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
}
});
}
private class OnosCommunicationChannelFactory
implements KeyedPoolableObjectFactory<Endpoint, Connection> {
@Override
public void activateObject(Endpoint endpoint, Connection connection)
throws Exception {
}
@Override
public void destroyObject(Endpoint ep, Connection connection) throws Exception {
log.debug("Closing connection to {}", ep);
//Is this the right way to destroy?
connection.destroy();
}
@Override
public Connection makeObject(Endpoint ep) throws Exception {
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 10 * 32 * 1024);
bootstrap.option(ChannelOption.SO_SNDBUF, 1048576);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
bootstrap.group(clientGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
bootstrap.channel(clientChannelClass);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
if (enableNettyTls) {
bootstrap.handler(new SslClientCommunicationChannelInitializer());
} else {
bootstrap.handler(new OnosCommunicationChannelInitializer());
}
// Start the client.
CompletableFuture<Channel> retFuture = new CompletableFuture<>();
ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
f.addListener(future -> {
if (future.isSuccess()) {
retFuture.complete(f.channel());
} else {
retFuture.completeExceptionally(future.cause());
}
});
log.debug("Established a new connection to {}", ep);
return new Connection(retFuture);
}
@Override
public void passivateObject(Endpoint ep, Connection connection)
throws Exception {
}
@Override
public boolean validateObject(Endpoint ep, Connection connection) {
return connection.validate();
}
}
private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
KeyStore ts = KeyStore.getInstance("JKS");
ts.load(new FileInputStream(tsLocation), tsPwd);
tmFactory.init(ts);
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(new FileInputStream(ksLocation), ksPwd);
kmf.init(ks, ksPwd);
SSLContext serverContext = SSLContext.getInstance("TLS");
serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
SSLEngine serverSslEngine = serverContext.createSSLEngine();
serverSslEngine.setNeedClientAuth(true);
serverSslEngine.setUseClientMode(false);
serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
serverSslEngine.setEnableSessionCreation(true);
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
.addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(preamble))
.addLast("handler", dispatcher);
}
}
private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
KeyStore ts = KeyStore.getInstance("JKS");
ts.load(new FileInputStream(tsLocation), tsPwd);
tmFactory.init(ts);
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
KeyStore ks = KeyStore.getInstance("JKS");
ks.load(new FileInputStream(ksLocation), ksPwd);
kmf.init(ks, ksPwd);
SSLContext clientContext = SSLContext.getInstance("TLS");
clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
SSLEngine clientSslEngine = clientContext.createSSLEngine();
clientSslEngine.setUseClientMode(true);
clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
clientSslEngine.setEnableSessionCreation(true);
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
.addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(preamble))
.addLast("handler", dispatcher);
}
}
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(preamble))
.addLast("handler", dispatcher);
}
}
@ChannelHandler.Sharable
private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
try {
dispatchLocally(message);
} catch (RejectedExecutionException e) {
log.warn("Unable to dispatch message due to {}", e.getMessage());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
log.error("Exception inside channel handling pipeline.", cause);
context.close();
}
}
private void dispatchLocally(InternalMessage message) throws IOException {
String type = message.type();
if (REPLY_MESSAGE_TYPE.equals(type)) {
try {
Callback callback =
callbacks.getIfPresent(message.id());
if (callback != null) {
callback.complete(message.payload());
} else {
log.warn("Received a reply for message id:[{}]. "
+ " from {}. But was unable to locate the"
+ " request handle", message.id(), message.sender());
}
} finally {
callbacks.invalidate(message.id());
}
return;
}
Consumer<InternalMessage> handler = handlers.get(type);
if (handler != null) {
handler.accept(message);
} else {
log.debug("No handler registered for {}", type);
}
}
private final class Callback {
private final CompletableFuture<byte[]> future;
private final Executor executor;
public Callback(CompletableFuture<byte[]> future, Executor executor) {
this.future = future;
this.executor = executor;
}
public void complete(byte[] value) {
executor.execute(() -> future.complete(value));
}
public void completeExceptionally(Throwable error) {
executor.execute(() -> future.completeExceptionally(error));
}
}
private final class Connection {
private final CompletableFuture<Channel> internalFuture;
public Connection(CompletableFuture<Channel> internalFuture) {
this.internalFuture = internalFuture;
}
/**
* Sends a message out on its channel and associates the message with a
* completable future used for signaling.
* @param message the message to be sent
* @param future a future that is completed normally or exceptionally if
* message sending succeeds or fails respectively
*/
public void send(Object message, CompletableFuture<Void> future) {
internalFuture.whenComplete((channel, throwable) -> {
if (throwable == null) {
channel.writeAndFlush(message).addListener(channelFuture -> {
if (!channelFuture.isSuccess()) {
future.completeExceptionally(channelFuture.cause());
} else {
future.complete(null);
}
});
} else {
future.completeExceptionally(throwable);
}
});
}
/**
* Destroys a channel by closing its channel (if it exists) and
* cancelling its future.
*/
public void destroy() {
Channel channel = internalFuture.getNow(null);
if (channel != null) {
channel.close();
}
internalFuture.cancel(false);
}
/**
* Determines whether the connection is valid, meaning it has either
* completed with an active channel
* or has not yet completed.
* @return true if the channel has an active connection or has not
* yet completed
*/
public boolean validate() {
if (internalFuture.isCompletedExceptionally()) {
return false;
}
Channel channel = internalFuture.getNow(null);
return channel == null || channel.isActive();
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Asynchronous messaging APIs implemented using the Netty framework.
*/
package org.onlab.netty;
......@@ -34,7 +34,6 @@
<modules>
<module>junit</module>
<module>misc</module>
<module>netty</module>
<module>nio</module>
<module>yangutils</module>
<module>osgi</module>
......