pankaj

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 21 changed files with 61 additions and 879 deletions
...@@ -33,6 +33,12 @@ ...@@ -33,6 +33,12 @@
33 <artifactId>onlab-nio</artifactId> 33 <artifactId>onlab-nio</artifactId>
34 <version>${project.version}</version> 34 <version>${project.version}</version>
35 </dependency> 35 </dependency>
36 +
37 + <dependency>
38 + <groupId>org.onlab.onos</groupId>
39 + <artifactId>onlab-netty</artifactId>
40 + <version>${project.version}</version>
41 + </dependency>
36 42
37 <dependency> 43 <dependency>
38 <groupId>com.fasterxml.jackson.core</groupId> 44 <groupId>com.fasterxml.jackson.core</groupId>
...@@ -51,15 +57,6 @@ ...@@ -51,15 +57,6 @@
51 <groupId>de.javakaffee</groupId> 57 <groupId>de.javakaffee</groupId>
52 <artifactId>kryo-serializers</artifactId> 58 <artifactId>kryo-serializers</artifactId>
53 </dependency> 59 </dependency>
54 - <dependency>
55 - <groupId>io.netty</groupId>
56 - <artifactId>netty-all</artifactId>
57 - </dependency>
58 - <dependency>
59 - <groupId>commons-pool</groupId>
60 - <artifactId>commons-pool</artifactId>
61 - <version>1.6</version>
62 - </dependency>
63 </dependencies> 60 </dependencies>
64 61
65 <build> 62 <build>
......
...@@ -23,10 +23,10 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; ...@@ -23,10 +23,10 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
23 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 23 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
24 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; 24 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
25 import org.onlab.onos.store.cluster.messaging.MessageSubject; 25 import org.onlab.onos.store.cluster.messaging.MessageSubject;
26 -import org.onlab.onos.store.messaging.Endpoint; 26 +import org.onlab.netty.Endpoint;
27 -import org.onlab.onos.store.messaging.Message; 27 +import org.onlab.netty.Message;
28 -import org.onlab.onos.store.messaging.MessageHandler; 28 +import org.onlab.netty.MessageHandler;
29 -import org.onlab.onos.store.messaging.MessagingService; 29 +import org.onlab.netty.MessagingService;
30 import org.slf4j.Logger; 30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory; 31 import org.slf4j.LoggerFactory;
32 32
......
1 -package org.onlab.onos.store.messaging;
2 -
3 -/**
4 - * Representation of a TCP/UDP communication end point.
5 - */
6 -public class Endpoint {
7 -
8 - private final int port;
9 - private final String host;
10 -
11 - public Endpoint(String host, int port) {
12 - this.host = host;
13 - this.port = port;
14 - }
15 -
16 - public String host() {
17 - return host;
18 - }
19 -
20 - public int port() {
21 - return port;
22 - }
23 -
24 - @Override
25 - public String toString() {
26 - return "Endpoint [port=" + port + ", host=" + host + "]";
27 - }
28 -
29 - @Override
30 - public int hashCode() {
31 - final int prime = 31;
32 - int result = 1;
33 - result = prime * result + ((host == null) ? 0 : host.hashCode());
34 - result = prime * result + port;
35 - return result;
36 - }
37 -
38 - @Override
39 - public boolean equals(Object obj) {
40 - if (this == obj) {
41 - return true;
42 - }
43 - if (obj == null) {
44 - return false;
45 - }
46 - if (getClass() != obj.getClass()) {
47 - return false;
48 - }
49 - Endpoint other = (Endpoint) obj;
50 - if (host == null) {
51 - if (other.host != null) {
52 - return false;
53 - }
54 - } else if (!host.equals(other.host)) {
55 - return false;
56 - }
57 - if (port != other.port) {
58 - return false;
59 - }
60 - return true;
61 - }
62 -}
1 -package org.onlab.onos.store.messaging;
2 -
3 -import java.io.IOException;
4 -
5 -/**
6 - * A unit of communication.
7 - * Has a payload. Also supports a feature to respond back to the sender.
8 - */
9 -public interface Message {
10 -
11 - /**
12 - * Returns the payload of this message.
13 - * @return message payload.
14 - */
15 - public Object payload();
16 -
17 - /**
18 - * Sends a reply back to the sender of this messge.
19 - * @param data payload of the response.
20 - * @throws IOException if there is a communication error.
21 - */
22 - public void respond(Object data) throws IOException;
23 -}
1 -package org.onlab.onos.store.messaging;
2 -
3 -import java.io.IOException;
4 -
5 -/**
6 - * Handler for a message.
7 - */
8 -public interface MessageHandler {
9 -
10 - /**
11 - * Handles the message.
12 - * @param message message.
13 - * @throws IOException.
14 - */
15 - public void handle(Message message) throws IOException;
16 -}
1 -package org.onlab.onos.store.messaging;
2 -
3 -import java.io.IOException;
4 -
5 -/**
6 - * Interface for low level messaging primitives.
7 - */
8 -public interface MessagingService {
9 - /**
10 - * Sends a message asynchronously to the specified communication end point.
11 - * The message is specified using the type and payload.
12 - * @param ep end point to send the message to.
13 - * @param type type of message.
14 - * @param payload message payload.
15 - * @throws IOException
16 - */
17 - public void sendAsync(Endpoint ep, String type, Object payload) throws IOException;
18 -
19 - /**
20 - * Sends a message synchronously and waits for a response.
21 - * @param ep end point to send the message to.
22 - * @param type type of message.
23 - * @param payload message payload.
24 - * @return a response future
25 - * @throws IOException
26 - */
27 - public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException;
28 -
29 - /**
30 - * Registers a new message handler for message type.
31 - * @param type message type.
32 - * @param handler message handler
33 - */
34 - public void registerHandler(String type, MessageHandler handler);
35 -
36 - /**
37 - * Unregister current handler, if one exists for message type.
38 - * @param type message type
39 - */
40 - public void unregisterHandler(String type);
41 -}
1 -package org.onlab.onos.store.messaging;
2 -
3 -import java.util.concurrent.TimeUnit;
4 -import java.util.concurrent.TimeoutException;
5 -
6 -/**
7 - * Response object returned when making synchronous requests.
8 - * Can you used to check is a response is ready and/or wait for a response
9 - * to become available.
10 - *
11 - * @param <T> type of response.
12 - */
13 -public interface Response<T> {
14 -
15 - /**
16 - * Gets the response waiting for a designated timeout period.
17 - * @param timeout timeout period (since request was sent out)
18 - * @param tu unit of time.
19 - * @return response
20 - * @throws TimeoutException if the timeout expires before the response arrives.
21 - */
22 - public T get(long timeout, TimeUnit tu) throws TimeoutException;
23 -
24 - /**
25 - * Gets the response waiting for indefinite timeout period.
26 - * @return response
27 - * @throws InterruptedException if the thread is interrupted before the response arrives.
28 - */
29 - public T get() throws InterruptedException;
30 -
31 - /**
32 - * Checks if the response is ready without blocking.
33 - * @return true if response is ready, false otherwise.
34 - */
35 - public boolean isReady();
36 -}
1 -package org.onlab.onos.store.messaging.impl;
2 -
3 -import java.util.concurrent.TimeUnit;
4 -import java.util.concurrent.TimeoutException;
5 -
6 -import org.onlab.onos.store.messaging.Response;
7 -
8 -/**
9 - * An asynchronous response.
10 - * This class provides a base implementation of Response, with methods to retrieve the
11 - * result and query to see if the result is ready. The result can only be retrieved when
12 - * it is ready and the get methods will block if the result is not ready yet.
13 - * @param <T> type of response.
14 - */
15 -public class AsyncResponse<T> implements Response<T> {
16 -
17 - private T value;
18 - private boolean done = false;
19 - private final long start = System.nanoTime();
20 -
21 - @Override
22 - public T get(long timeout, TimeUnit tu) throws TimeoutException {
23 - timeout = tu.toNanos(timeout);
24 - boolean interrupted = false;
25 - try {
26 - synchronized (this) {
27 - while (!done) {
28 - try {
29 - long timeRemaining = timeout - (System.nanoTime() - start);
30 - if (timeRemaining <= 0) {
31 - throw new TimeoutException("Operation timed out.");
32 - }
33 - TimeUnit.NANOSECONDS.timedWait(this, timeRemaining);
34 - } catch (InterruptedException e) {
35 - interrupted = true;
36 - }
37 - }
38 - }
39 - } finally {
40 - if (interrupted) {
41 - Thread.currentThread().interrupt();
42 - }
43 - }
44 - return value;
45 - }
46 -
47 - @Override
48 - public T get() throws InterruptedException {
49 - throw new UnsupportedOperationException();
50 - }
51 -
52 - @Override
53 - public boolean isReady() {
54 - return done;
55 - }
56 -
57 - /**
58 - * Sets response value and unblocks any thread blocking on the response to become
59 - * available.
60 - * @param data response data.
61 - */
62 - @SuppressWarnings("unchecked")
63 - public synchronized void setResponse(Object data) {
64 - if (!done) {
65 - done = true;
66 - value = (T) data;
67 - this.notifyAll();
68 - }
69 - }
70 -}
1 -package org.onlab.onos.store.messaging.impl;
2 -
3 -import java.io.IOException;
4 -
5 -import org.onlab.onos.store.messaging.Message;
6 -import org.onlab.onos.store.messaging.MessageHandler;
7 -
8 -/**
9 - * Message handler that echos the message back to the sender.
10 - */
11 -public class EchoHandler implements MessageHandler {
12 -
13 - @Override
14 - public void handle(Message message) throws IOException {
15 - System.out.println("Received: " + message.payload() + ". Echoing it back to the sender.");
16 - message.respond(message.payload());
17 - }
18 -}
1 -package org.onlab.onos.store.messaging.impl;
2 -
3 -import java.io.IOException;
4 -
5 -import org.onlab.onos.store.messaging.Endpoint;
6 -import org.onlab.onos.store.messaging.Message;
7 -
8 -/**
9 - * Internal message representation with additional attributes
10 - * for supporting, synchronous request/reply behavior.
11 - */
12 -public final class InternalMessage implements Message {
13 -
14 - private long id;
15 - private Endpoint sender;
16 - private String type;
17 - private Object payload;
18 - private transient NettyMessagingService messagingService;
19 - public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
20 -
21 - // Must be created using the Builder.
22 - private InternalMessage() {}
23 -
24 - public long id() {
25 - return id;
26 - }
27 -
28 - public String type() {
29 - return type;
30 - }
31 -
32 - public Endpoint sender() {
33 - return sender;
34 - }
35 -
36 - @Override
37 - public Object payload() {
38 - return payload;
39 - }
40 -
41 - @Override
42 - public void respond(Object data) throws IOException {
43 - Builder builder = new Builder(messagingService);
44 - InternalMessage message = builder.withId(this.id)
45 - // FIXME: Sender should be messagingService.localEp.
46 - .withSender(this.sender)
47 - .withPayload(data)
48 - .withType(REPLY_MESSAGE_TYPE)
49 - .build();
50 - messagingService.sendAsync(sender, message);
51 - }
52 -
53 -
54 - /**
55 - * Builder for InternalMessages.
56 - */
57 - public static class Builder {
58 - private InternalMessage message;
59 -
60 - public Builder(NettyMessagingService messagingService) {
61 - message = new InternalMessage();
62 - message.messagingService = messagingService;
63 - }
64 -
65 - public Builder withId(long id) {
66 - message.id = id;
67 - return this;
68 - }
69 -
70 - public Builder withType(String type) {
71 - message.type = type;
72 - return this;
73 - }
74 -
75 - public Builder withSender(Endpoint sender) {
76 - message.sender = sender;
77 - return this;
78 - }
79 - public Builder withPayload(Object payload) {
80 - message.payload = payload;
81 - return this;
82 - }
83 -
84 - public InternalMessage build() {
85 - return message;
86 - }
87 - }
88 -}
1 -package org.onlab.onos.store.messaging.impl;
2 -
3 -import org.onlab.util.KryoPool;
4 -import org.slf4j.Logger;
5 -import org.slf4j.LoggerFactory;
6 -
7 -import java.util.ArrayList;
8 -import java.util.HashMap;
9 -
10 -/**
11 - * Kryo Serializer.
12 - */
13 -public class KryoSerializer implements Serializer {
14 -
15 - private final Logger log = LoggerFactory.getLogger(getClass());
16 -
17 - private KryoPool serializerPool;
18 -
19 - public KryoSerializer() {
20 - setupKryoPool();
21 - }
22 -
23 - /**
24 - * Sets up the common serialzers pool.
25 - */
26 - protected void setupKryoPool() {
27 - // FIXME Slice out types used in common to separate pool/namespace.
28 - serializerPool = KryoPool.newBuilder()
29 - .register(ArrayList.class,
30 - HashMap.class,
31 - ArrayList.class
32 - )
33 - .build()
34 - .populate(1);
35 - }
36 -
37 -
38 - @Override
39 - public Object decode(byte[] data) {
40 - return serializerPool.deserialize(data);
41 - }
42 -
43 - @Override
44 - public byte[] encode(Object payload) {
45 - return serializerPool.serialize(payload);
46 - }
47 -}
1 -package org.onlab.onos.store.messaging.impl;
2 -
3 -import org.onlab.onos.store.messaging.Message;
4 -import org.onlab.onos.store.messaging.MessageHandler;
5 -
6 -/**
7 - * A MessageHandler that simply logs the information.
8 - */
9 -public class LoggingHandler implements MessageHandler {
10 -
11 - @Override
12 - public void handle(Message message) {
13 - System.out.println("Received: " + message.payload());
14 - }
15 -}
...\ No newline at end of file ...\ No newline at end of file
1 -package org.onlab.onos.store.messaging.impl;
2 -
3 -import java.util.Arrays;
4 -import java.util.List;
5 -
6 -import static com.google.common.base.Preconditions.checkState;
7 -
8 -import org.onlab.onos.store.messaging.Endpoint;
9 -
10 -import io.netty.buffer.ByteBuf;
11 -import io.netty.channel.ChannelHandlerContext;
12 -import io.netty.handler.codec.ByteToMessageDecoder;
13 -
14 -/**
15 - * Decode bytes into a InrenalMessage.
16 - */
17 -public class MessageDecoder extends ByteToMessageDecoder {
18 -
19 - private final NettyMessagingService messagingService;
20 - private final Serializer serializer;
21 -
22 - public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
23 - this.messagingService = messagingService;
24 - this.serializer = serializer;
25 - }
26 -
27 - @Override
28 - protected void decode(ChannelHandlerContext context, ByteBuf in,
29 - List<Object> messages) throws Exception {
30 -
31 - byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array();
32 - checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
33 -
34 - // read message Id.
35 - long id = in.readLong();
36 -
37 - // read message type; first read size and then bytes.
38 - String type = new String(in.readBytes(in.readInt()).array());
39 -
40 - // read sender host name; first read size and then bytes.
41 - String host = new String(in.readBytes(in.readInt()).array());
42 -
43 - // read sender port.
44 - int port = in.readInt();
45 -
46 - Endpoint sender = new Endpoint(host, port);
47 -
48 - // read message payload; first read size and then bytes.
49 - Object payload = serializer.decode(in.readBytes(in.readInt()).array());
50 -
51 - InternalMessage message = new InternalMessage.Builder(messagingService)
52 - .withId(id)
53 - .withSender(sender)
54 - .withType(type)
55 - .withPayload(payload)
56 - .build();
57 -
58 - messages.add(message);
59 - }
60 -}
1 -package org.onlab.onos.store.messaging.impl;
2 -
3 -import io.netty.buffer.ByteBuf;
4 -import io.netty.channel.ChannelHandlerContext;
5 -import io.netty.handler.codec.MessageToByteEncoder;
6 -
7 -/**
8 - * Encode InternalMessage out into a byte buffer.
9 - */
10 -public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
11 -
12 - // onosiscool in ascii
13 - public static final byte[] PREAMBLE = "onosiscool".getBytes();
14 -
15 - private final Serializer serializer;
16 -
17 - public MessageEncoder(Serializer serializer) {
18 - this.serializer = serializer;
19 - }
20 -
21 - @Override
22 - protected void encode(ChannelHandlerContext context, InternalMessage message,
23 - ByteBuf out) throws Exception {
24 -
25 - // write preamble
26 - out.writeBytes(PREAMBLE);
27 -
28 - // write id
29 - out.writeLong(message.id());
30 -
31 - // write type length
32 - out.writeInt(message.type().length());
33 -
34 - // write type
35 - out.writeBytes(message.type().getBytes());
36 -
37 - // write sender host name size
38 - out.writeInt(message.sender().host().length());
39 -
40 - // write sender host name.
41 - out.writeBytes(message.sender().host().getBytes());
42 -
43 - // write port
44 - out.writeInt(message.sender().port());
45 -
46 - try {
47 - serializer.encode(message.payload());
48 - } catch (Exception e) {
49 - e.printStackTrace();
50 - }
51 -
52 - byte[] payload = serializer.encode(message.payload());
53 -
54 - // write payload length.
55 - out.writeInt(payload.length);
56 -
57 - // write payload bytes
58 - out.writeBytes(payload);
59 - }
60 -}
1 -package org.onlab.onos.store.messaging.impl;
2 -
3 -import java.io.IOException;
4 -import java.net.UnknownHostException;
5 -import java.util.concurrent.ConcurrentHashMap;
6 -import java.util.concurrent.ConcurrentMap;
7 -import java.util.concurrent.TimeUnit;
8 -
9 -import io.netty.bootstrap.Bootstrap;
10 -import io.netty.bootstrap.ServerBootstrap;
11 -import io.netty.buffer.PooledByteBufAllocator;
12 -import io.netty.channel.Channel;
13 -import io.netty.channel.ChannelFuture;
14 -import io.netty.channel.ChannelHandlerContext;
15 -import io.netty.channel.ChannelInitializer;
16 -import io.netty.channel.ChannelOption;
17 -import io.netty.channel.EventLoopGroup;
18 -import io.netty.channel.SimpleChannelInboundHandler;
19 -import io.netty.channel.nio.NioEventLoopGroup;
20 -import io.netty.channel.socket.SocketChannel;
21 -import io.netty.channel.socket.nio.NioServerSocketChannel;
22 -import io.netty.channel.socket.nio.NioSocketChannel;
23 -
24 -import org.apache.commons.lang.math.RandomUtils;
25 -import org.apache.commons.pool.KeyedObjectPool;
26 -import org.apache.commons.pool.KeyedPoolableObjectFactory;
27 -import org.apache.commons.pool.impl.GenericKeyedObjectPool;
28 -import org.apache.felix.scr.annotations.Activate;
29 -import org.apache.felix.scr.annotations.Component;
30 -import org.apache.felix.scr.annotations.Deactivate;
31 -import org.apache.felix.scr.annotations.Reference;
32 -import org.apache.felix.scr.annotations.ReferenceCardinality;
33 -import org.apache.felix.scr.annotations.Service;
34 -import org.onlab.onos.store.messaging.Endpoint;
35 -import org.onlab.onos.store.messaging.MessageHandler;
36 -import org.onlab.onos.store.messaging.MessagingService;
37 -import org.onlab.onos.store.messaging.Response;
38 -import org.slf4j.Logger;
39 -import org.slf4j.LoggerFactory;
40 -
41 -import com.google.common.cache.Cache;
42 -import com.google.common.cache.CacheBuilder;
43 -
44 -/**
45 - * A Netty based implementation of MessagingService.
46 - */
47 -@Component(immediate = true)
48 -@Service
49 -public class NettyMessagingService implements MessagingService {
50 -
51 - private final Logger log = LoggerFactory.getLogger(getClass());
52 -
53 - private KeyedObjectPool<Endpoint, Channel> channels =
54 - new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
55 - private final int port;
56 - private final EventLoopGroup bossGroup = new NioEventLoopGroup();
57 - private final EventLoopGroup workerGroup = new NioEventLoopGroup();
58 - private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
59 - private Cache<Long, AsyncResponse<?>> responseFutures;
60 - private final Endpoint localEp;
61 -
62 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
63 - protected Serializer serializer;
64 -
65 - public NettyMessagingService() {
66 - // TODO: Default port should be configurable.
67 - this(8080);
68 - }
69 -
70 - // FIXME: Constructor should not throw exceptions.
71 - public NettyMessagingService(int port) {
72 - this.port = port;
73 - try {
74 - localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
75 - } catch (UnknownHostException e) {
76 - // bailing out.
77 - throw new RuntimeException(e);
78 - }
79 - }
80 -
81 - @Activate
82 - public void activate() throws Exception {
83 - responseFutures = CacheBuilder.newBuilder()
84 - .maximumSize(100000)
85 - .weakValues()
86 - // TODO: Once the entry expires, notify blocking threads (if any).
87 - .expireAfterWrite(10, TimeUnit.MINUTES)
88 - .build();
89 - startAcceptingConnections();
90 - }
91 -
92 - @Deactivate
93 - public void deactivate() throws Exception {
94 - channels.close();
95 - bossGroup.shutdownGracefully();
96 - workerGroup.shutdownGracefully();
97 - }
98 -
99 - @Override
100 - public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
101 - InternalMessage message = new InternalMessage.Builder(this)
102 - .withId(RandomUtils.nextLong())
103 - .withSender(localEp)
104 - .withType(type)
105 - .withPayload(payload)
106 - .build();
107 - sendAsync(ep, message);
108 - }
109 -
110 - protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
111 - Channel channel = null;
112 - try {
113 - channel = channels.borrowObject(ep);
114 - channel.eventLoop().execute(new WriteTask(channel, message));
115 - } catch (Exception e) {
116 - throw new IOException(e);
117 - } finally {
118 - try {
119 - channels.returnObject(ep, channel);
120 - } catch (Exception e) {
121 - log.warn("Error returning object back to the pool", e);
122 - // ignored.
123 - }
124 - }
125 - }
126 -
127 - @Override
128 - public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
129 - throws IOException {
130 - AsyncResponse<T> futureResponse = new AsyncResponse<T>();
131 - Long messageId = RandomUtils.nextLong();
132 - responseFutures.put(messageId, futureResponse);
133 - InternalMessage message = new InternalMessage.Builder(this)
134 - .withId(messageId)
135 - .withSender(localEp)
136 - .withType(type)
137 - .withPayload(payload)
138 - .build();
139 - sendAsync(ep, message);
140 - return futureResponse;
141 - }
142 -
143 - @Override
144 - public void registerHandler(String type, MessageHandler handler) {
145 - // TODO: Is this the right semantics for handler registration?
146 - handlers.putIfAbsent(type, handler);
147 - }
148 -
149 - public void unregisterHandler(String type) {
150 - handlers.remove(type);
151 - }
152 -
153 - private MessageHandler getMessageHandler(String type) {
154 - return handlers.get(type);
155 - }
156 -
157 - private void startAcceptingConnections() throws InterruptedException {
158 - ServerBootstrap b = new ServerBootstrap();
159 - b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
160 - b.group(bossGroup, workerGroup)
161 - .channel(NioServerSocketChannel.class)
162 - .childHandler(new OnosCommunicationChannelInitializer())
163 - .option(ChannelOption.SO_BACKLOG, 128)
164 - .childOption(ChannelOption.SO_KEEPALIVE, true);
165 -
166 - // Bind and start to accept incoming connections.
167 - b.bind(port).sync();
168 - }
169 -
170 - private class OnosCommunicationChannelFactory
171 - implements KeyedPoolableObjectFactory<Endpoint, Channel> {
172 -
173 - @Override
174 - public void activateObject(Endpoint endpoint, Channel channel)
175 - throws Exception {
176 - }
177 -
178 - @Override
179 - public void destroyObject(Endpoint ep, Channel channel) throws Exception {
180 - channel.close();
181 - }
182 -
183 - @Override
184 - public Channel makeObject(Endpoint ep) throws Exception {
185 - Bootstrap b = new Bootstrap();
186 - b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
187 - b.group(workerGroup);
188 - // TODO: Make this faster:
189 - // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
190 - b.channel(NioSocketChannel.class);
191 - b.option(ChannelOption.SO_KEEPALIVE, true);
192 - b.handler(new OnosCommunicationChannelInitializer());
193 -
194 - // Start the client.
195 - ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
196 - return f.channel();
197 - }
198 -
199 - @Override
200 - public void passivateObject(Endpoint ep, Channel channel)
201 - throws Exception {
202 - }
203 -
204 - @Override
205 - public boolean validateObject(Endpoint ep, Channel channel) {
206 - return channel.isOpen();
207 - }
208 - }
209 -
210 - private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
211 -
212 - @Override
213 - protected void initChannel(SocketChannel channel) throws Exception {
214 - channel.pipeline()
215 - .addLast(new MessageEncoder(serializer))
216 - .addLast(new MessageDecoder(NettyMessagingService.this, serializer))
217 - .addLast(new NettyMessagingService.InboundMessageDispatcher());
218 - }
219 - }
220 -
221 - private class WriteTask implements Runnable {
222 -
223 - private final Object message;
224 - private final Channel channel;
225 -
226 - public WriteTask(Channel channel, Object message) {
227 - this.message = message;
228 - this.channel = channel;
229 - }
230 -
231 - @Override
232 - public void run() {
233 - channel.writeAndFlush(message);
234 - }
235 - }
236 -
237 - private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
238 -
239 - @Override
240 - protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
241 - String type = message.type();
242 - if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
243 - try {
244 - AsyncResponse<?> futureResponse =
245 - NettyMessagingService.this.responseFutures.getIfPresent(message.id());
246 - if (futureResponse != null) {
247 - futureResponse.setResponse(message.payload());
248 - }
249 - log.warn("Received a reply. But was unable to locate the request handle");
250 - } finally {
251 - NettyMessagingService.this.responseFutures.invalidate(message.id());
252 - }
253 - return;
254 - }
255 - MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
256 - handler.handle(message);
257 - }
258 - }
259 -}
1 -package org.onlab.onos.store.messaging.impl;
2 -
3 -/**
4 - * Interface for encoding/decoding message payloads.
5 - */
6 -public interface Serializer {
7 -
8 - /**
9 - * Decodes the specified byte array to a POJO.
10 - *
11 - * @param data byte array.
12 - * @return POJO
13 - */
14 - Object decode(byte[] data);
15 -
16 - /**
17 - * Encodes the specified POJO into a byte array.
18 - *
19 - * @param data POJO to be encoded
20 - * @return byte array.
21 - */
22 - byte[] encode(Object message);
23 -
24 -}
1 -package org.onlab.onos.store.messaging.impl;
2 -
3 -import java.util.concurrent.TimeUnit;
4 -
5 -import org.onlab.onos.store.messaging.Endpoint;
6 -import org.onlab.onos.store.messaging.Response;
7 -
8 -public final class SimpleClient {
9 - private SimpleClient() {}
10 -
11 - public static void main(String... args) throws Exception {
12 - NettyMessagingService messaging = new TestNettyMessagingService(9081);
13 - messaging.activate();
14 -
15 - messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World");
16 - Response<String> response = messaging.sendAndReceive(new Endpoint("localhost", 8080), "echo", "Hello World");
17 - System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS));
18 - }
19 -
20 - public static class TestNettyMessagingService extends NettyMessagingService {
21 - public TestNettyMessagingService(int port) throws Exception {
22 - super(port);
23 - Serializer serializer = new KryoSerializer();
24 - this.serializer = serializer;
25 - }
26 - }
27 -}
1 -package org.onlab.onos.store.messaging.impl;
2 -
3 -public final class SimpleServer {
4 - private SimpleServer() {}
5 -
6 - public static void main(String... args) throws Exception {
7 - NettyMessagingService server = new TestNettyMessagingService();
8 - server.activate();
9 - server.registerHandler("simple", new LoggingHandler());
10 - server.registerHandler("echo", new EchoHandler());
11 - }
12 -
13 - public static class TestNettyMessagingService extends NettyMessagingService {
14 - protected TestNettyMessagingService() {
15 - Serializer serializer = new KryoSerializer();
16 - this.serializer = serializer;
17 - }
18 - }
19 -}
...@@ -7,7 +7,7 @@ import org.junit.Test; ...@@ -7,7 +7,7 @@ import org.junit.Test;
7 import org.onlab.onos.cluster.DefaultControllerNode; 7 import org.onlab.onos.cluster.DefaultControllerNode;
8 import org.onlab.onos.cluster.NodeId; 8 import org.onlab.onos.cluster.NodeId;
9 import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager; 9 import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager;
10 -import org.onlab.onos.store.messaging.impl.NettyMessagingService; 10 +import org.onlab.netty.NettyMessagingService;
11 import org.onlab.packet.IpPrefix; 11 import org.onlab.packet.IpPrefix;
12 12
13 import java.util.concurrent.CountDownLatch; 13 import java.util.concurrent.CountDownLatch;
......
1 +<?xml version="1.0" encoding="UTF-8"?>
2 +<project xmlns="http://maven.apache.org/POM/4.0.0"
3 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
5 + <modelVersion>4.0.0</modelVersion>
6 +
7 + <parent>
8 + <groupId>org.onlab.onos</groupId>
9 + <artifactId>onlab-utils</artifactId>
10 + <version>1.0.0-SNAPSHOT</version>
11 + <relativePath>../pom.xml</relativePath>
12 + </parent>
13 +
14 + <artifactId>onlab-netty</artifactId>
15 + <packaging>bundle</packaging>
16 +
17 + <description>Network I/O using Netty framework</description>
18 +
19 + <dependencies>
20 + <dependency>
21 + <groupId>com.google.guava</groupId>
22 + <artifactId>guava-testlib</artifactId>
23 + <scope>test</scope>
24 + </dependency>
25 + <dependency>
26 + <groupId>org.onlab.onos</groupId>
27 + <artifactId>onlab-misc</artifactId>
28 + </dependency>
29 + <dependency>
30 + <groupId>org.onlab.onos</groupId>
31 + <artifactId>onlab-junit</artifactId>
32 + <scope>test</scope>
33 + </dependency>
34 + <dependency>
35 + <groupId>de.javakaffee</groupId>
36 + <artifactId>kryo-serializers</artifactId>
37 + </dependency>
38 + <dependency>
39 + <groupId>io.netty</groupId>
40 + <artifactId>netty-all</artifactId>
41 + </dependency>
42 + <dependency>
43 + <groupId>commons-pool</groupId>
44 + <artifactId>commons-pool</artifactId>
45 + <version>1.6</version>
46 + </dependency>
47 + </dependencies>
48 +
49 +</project>
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
19 <modules> 19 <modules>
20 <module>junit</module> 20 <module>junit</module>
21 <module>misc</module> 21 <module>misc</module>
22 + <module>netty</module>
22 <module>nio</module> 23 <module>nio</module>
23 <module>osgi</module> 24 <module>osgi</module>
24 <module>rest</module> 25 <module>rest</module>
......