Madan Jampani

ONOS-1983: Migrating all copycat Raft protocol specific communication to use ONO…

…S cluster communication primitives

Change-Id: I3f07266e50106b1adc13f722c647686c2b42ef7f
...@@ -164,7 +164,7 @@ public interface ClusterCommunicationService { ...@@ -164,7 +164,7 @@ public interface ClusterCommunicationService {
164 * 164 *
165 * @param subject message subject 165 * @param subject message subject
166 * @param decoder decoder for resurrecting incoming message 166 * @param decoder decoder for resurrecting incoming message
167 - * @param handler handler function that process the incoming message and produces a reply 167 + * @param handler handler function that processes the incoming message and produces a reply
168 * @param encoder encoder for serializing reply 168 * @param encoder encoder for serializing reply
169 * @param executor executor to run this handler on 169 * @param executor executor to run this handler on
170 * @param <M> incoming message type 170 * @param <M> incoming message type
...@@ -180,6 +180,21 @@ public interface ClusterCommunicationService { ...@@ -180,6 +180,21 @@ public interface ClusterCommunicationService {
180 * Adds a new subscriber for the specified message subject. 180 * Adds a new subscriber for the specified message subject.
181 * 181 *
182 * @param subject message subject 182 * @param subject message subject
183 + * @param decoder decoder for resurrecting incoming message
184 + * @param handler handler function that processes the incoming message and produces a reply
185 + * @param encoder encoder for serializing reply
186 + * @param <M> incoming message type
187 + * @param <R> reply message type
188 + */
189 + <M, R> void addSubscriber(MessageSubject subject,
190 + Function<byte[], M> decoder,
191 + Function<M, CompletableFuture<R>> handler,
192 + Function<R, byte[]> encoder);
193 +
194 + /**
195 + * Adds a new subscriber for the specified message subject.
196 + *
197 + * @param subject message subject
183 * @param decoder decoder to resurrecting incoming message 198 * @param decoder decoder to resurrecting incoming message
184 * @param handler handler for handling message 199 * @param handler handler for handling message
185 * @param executor executor to run this handler on 200 * @param executor executor to run this handler on
......
...@@ -62,6 +62,13 @@ public interface MessagingService { ...@@ -62,6 +62,13 @@ public interface MessagingService {
62 void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor); 62 void registerHandler(String type, Function<byte[], byte[]> handler, Executor executor);
63 63
64 /** 64 /**
65 + * Registers a new message handler for message type.
66 + * @param type message type.
67 + * @param handler message handler
68 + */
69 + void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler);
70 +
71 + /**
65 * Unregister current handler, if one exists for message type. 72 * Unregister current handler, if one exists for message type.
66 * @param type message type 73 * @param type message type
67 */ 74 */
......
...@@ -21,6 +21,7 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -21,6 +21,7 @@ import org.apache.felix.scr.annotations.Deactivate;
21 import org.apache.felix.scr.annotations.Reference; 21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality; 22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 import org.apache.felix.scr.annotations.Service; 23 import org.apache.felix.scr.annotations.Service;
24 +import org.onlab.util.Tools;
24 import org.onosproject.cluster.ClusterService; 25 import org.onosproject.cluster.ClusterService;
25 import org.onosproject.cluster.ControllerNode; 26 import org.onosproject.cluster.ControllerNode;
26 import org.onosproject.cluster.NodeId; 27 import org.onosproject.cluster.NodeId;
...@@ -182,11 +183,15 @@ public class ClusterCommunicationManager ...@@ -182,11 +183,15 @@ public class ClusterCommunicationManager
182 Function<M, byte[]> encoder, 183 Function<M, byte[]> encoder,
183 Function<byte[], R> decoder, 184 Function<byte[], R> decoder,
184 NodeId toNodeId) { 185 NodeId toNodeId) {
186 + try {
185 ClusterMessage envelope = new ClusterMessage( 187 ClusterMessage envelope = new ClusterMessage(
186 clusterService.getLocalNode().id(), 188 clusterService.getLocalNode().id(),
187 subject, 189 subject,
188 encoder.apply(message)); 190 encoder.apply(message));
189 return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder); 191 return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
192 + } catch (Exception e) {
193 + return Tools.exceptionalFuture(e);
194 + }
190 } 195 }
191 196
192 private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) { 197 private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
...@@ -223,7 +228,6 @@ public class ClusterCommunicationManager ...@@ -223,7 +228,6 @@ public class ClusterCommunicationManager
223 messagingService.unregisterHandler(subject.value()); 228 messagingService.unregisterHandler(subject.value());
224 } 229 }
225 230
226 -
227 @Override 231 @Override
228 public <M, R> void addSubscriber(MessageSubject subject, 232 public <M, R> void addSubscriber(MessageSubject subject,
229 Function<byte[], M> decoder, 233 Function<byte[], M> decoder,
...@@ -231,8 +235,26 @@ public class ClusterCommunicationManager ...@@ -231,8 +235,26 @@ public class ClusterCommunicationManager
231 Function<R, byte[]> encoder, 235 Function<R, byte[]> encoder,
232 Executor executor) { 236 Executor executor) {
233 messagingService.registerHandler(subject.value(), 237 messagingService.registerHandler(subject.value(),
234 - new InternalMessageResponder<>(decoder, encoder, handler), 238 + new InternalMessageResponder<M, R>(decoder, encoder, m -> {
235 - executor); 239 + CompletableFuture<R> responseFuture = new CompletableFuture<>();
240 + executor.execute(() -> {
241 + try {
242 + responseFuture.complete(handler.apply(m));
243 + } catch (Exception e) {
244 + responseFuture.completeExceptionally(e);
245 + }
246 + });
247 + return responseFuture;
248 + }));
249 + }
250 +
251 + @Override
252 + public <M, R> void addSubscriber(MessageSubject subject,
253 + Function<byte[], M> decoder,
254 + Function<M, CompletableFuture<R>> handler,
255 + Function<R, byte[]> encoder) {
256 + messagingService.registerHandler(subject.value(),
257 + new InternalMessageResponder<>(decoder, encoder, handler));
236 } 258 }
237 259
238 @Override 260 @Override
...@@ -260,23 +282,22 @@ public class ClusterCommunicationManager ...@@ -260,23 +282,22 @@ public class ClusterCommunicationManager
260 } 282 }
261 } 283 }
262 284
263 - private class InternalMessageResponder<M, R> implements Function<byte[], byte[]> { 285 + private class InternalMessageResponder<M, R> implements Function<byte[], CompletableFuture<byte[]>> {
264 private final Function<byte[], M> decoder; 286 private final Function<byte[], M> decoder;
265 private final Function<R, byte[]> encoder; 287 private final Function<R, byte[]> encoder;
266 - private final Function<M, R> handler; 288 + private final Function<M, CompletableFuture<R>> handler;
267 289
268 public InternalMessageResponder(Function<byte[], M> decoder, 290 public InternalMessageResponder(Function<byte[], M> decoder,
269 Function<R, byte[]> encoder, 291 Function<R, byte[]> encoder,
270 - Function<M, R> handler) { 292 + Function<M, CompletableFuture<R>> handler) {
271 this.decoder = decoder; 293 this.decoder = decoder;
272 this.encoder = encoder; 294 this.encoder = encoder;
273 this.handler = handler; 295 this.handler = handler;
274 } 296 }
275 297
276 @Override 298 @Override
277 - public byte[] apply(byte[] bytes) { 299 + public CompletableFuture<byte[]> apply(byte[] bytes) {
278 - R reply = handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())); 300 + return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
279 - return encoder.apply(reply);
280 } 301 }
281 } 302 }
282 303
......
1 +package org.onosproject.store.consistent.impl;
2 +
3 +import java.net.URI;
4 +import java.nio.ByteBuffer;
5 +import java.util.concurrent.CompletableFuture;
6 +
7 +import org.onlab.util.Tools;
8 +import org.onosproject.cluster.ClusterService;
9 +import org.onosproject.cluster.ControllerNode;
10 +import org.onosproject.cluster.NodeId;
11 +import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
12 +import org.onosproject.store.cluster.messaging.MessageSubject;
13 +
14 +import net.kuujo.copycat.protocol.AbstractProtocol;
15 +import net.kuujo.copycat.protocol.ProtocolClient;
16 +import net.kuujo.copycat.protocol.ProtocolHandler;
17 +import net.kuujo.copycat.protocol.ProtocolServer;
18 +import net.kuujo.copycat.util.Configurable;
19 +
20 +/**
21 + * Protocol for Copycat communication that employs
22 + * {@code ClusterCommunicationService}.
23 + */
24 +public class CopycatCommunicationProtocol extends AbstractProtocol {
25 +
26 + private static final MessageSubject COPYCAT_MESSAGE_SUBJECT =
27 + new MessageSubject("onos-copycat-message");
28 +
29 + protected ClusterService clusterService;
30 + protected ClusterCommunicationService clusterCommunicator;
31 +
32 + public CopycatCommunicationProtocol(ClusterService clusterService,
33 + ClusterCommunicationService clusterCommunicator) {
34 + this.clusterService = clusterService;
35 + this.clusterCommunicator = clusterCommunicator;
36 + }
37 +
38 + @Override
39 + public Configurable copy() {
40 + return this;
41 + }
42 +
43 + @Override
44 + public ProtocolClient createClient(URI uri) {
45 + NodeId nodeId = uriToNodeId(uri);
46 + if (nodeId == null) {
47 + throw new IllegalStateException("Unknown peer " + uri);
48 + }
49 + return new Client(nodeId);
50 + }
51 +
52 + @Override
53 + public ProtocolServer createServer(URI uri) {
54 + return new Server();
55 + }
56 +
57 + private class Server implements ProtocolServer {
58 +
59 + @Override
60 + public void handler(ProtocolHandler handler) {
61 + if (handler == null) {
62 + clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
63 + } else {
64 + clusterCommunicator.addSubscriber(COPYCAT_MESSAGE_SUBJECT,
65 + ByteBuffer::wrap,
66 + handler,
67 + Tools::byteBuffertoArray);
68 + // FIXME: Tools::byteBuffertoArray involves a array copy.
69 + }
70 + }
71 +
72 + @Override
73 + public CompletableFuture<Void> listen() {
74 + return CompletableFuture.completedFuture(null);
75 + }
76 +
77 + @Override
78 + public CompletableFuture<Void> close() {
79 + clusterCommunicator.removeSubscriber(COPYCAT_MESSAGE_SUBJECT);
80 + return CompletableFuture.completedFuture(null);
81 + }
82 + }
83 +
84 + private class Client implements ProtocolClient {
85 + private final NodeId peer;
86 +
87 + public Client(NodeId peer) {
88 + this.peer = peer;
89 + }
90 +
91 + @Override
92 + public CompletableFuture<ByteBuffer> write(ByteBuffer request) {
93 + return clusterCommunicator.sendAndReceive(request,
94 + COPYCAT_MESSAGE_SUBJECT,
95 + Tools::byteBuffertoArray,
96 + ByteBuffer::wrap,
97 + peer);
98 + }
99 +
100 + @Override
101 + public CompletableFuture<Void> connect() {
102 + return CompletableFuture.completedFuture(null);
103 + }
104 +
105 + @Override
106 + public CompletableFuture<Void> close() {
107 + return CompletableFuture.completedFuture(null);
108 + }
109 + }
110 +
111 + private NodeId uriToNodeId(URI uri) {
112 + return clusterService.getNodes()
113 + .stream()
114 + .filter(node -> uri.getHost().equals(node.ip().toString()))
115 + .map(ControllerNode::id)
116 + .findAny()
117 + .orElse(null);
118 + }
119 +}
...@@ -107,7 +107,7 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -107,7 +107,7 @@ public class DatabaseManager implements StorageService, StorageAdminService {
107 protected ClusterCommunicationService clusterCommunicator; 107 protected ClusterCommunicationService clusterCommunicator;
108 108
109 protected String nodeToUri(NodeInfo node) { 109 protected String nodeToUri(NodeInfo node) {
110 - return String.format("tcp://%s:%d", node.getIp(), COPYCAT_TCP_PORT); 110 + return String.format("onos://%s:%d", node.getIp(), node.getTcpPort());
111 } 111 }
112 112
113 @Activate 113 @Activate
...@@ -136,9 +136,10 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -136,9 +136,10 @@ public class DatabaseManager implements StorageService, StorageAdminService {
136 .toArray(String[]::new); 136 .toArray(String[]::new);
137 137
138 String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode())); 138 String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode()));
139 + Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator);
139 140
140 ClusterConfig clusterConfig = new ClusterConfig() 141 ClusterConfig clusterConfig = new ClusterConfig()
141 - .withProtocol(newNettyProtocol()) 142 + .withProtocol(protocol)
142 .withElectionTimeout(electionTimeoutMillis(activeNodeUris)) 143 .withElectionTimeout(electionTimeoutMillis(activeNodeUris))
143 .withHeartbeatInterval(heartbeatTimeoutMillis(activeNodeUris)) 144 .withHeartbeatInterval(heartbeatTimeoutMillis(activeNodeUris))
144 .withMembers(activeNodeUris) 145 .withMembers(activeNodeUris)
...@@ -232,6 +233,7 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -232,6 +233,7 @@ public class DatabaseManager implements StorageService, StorageAdminService {
232 .collect(Collectors.toList()); 233 .collect(Collectors.toList());
233 } 234 }
234 235
236 + @SuppressWarnings("unused")
235 private Protocol newNettyProtocol() { 237 private Protocol newNettyProtocol() {
236 return new NettyTcpProtocol() 238 return new NettyTcpProtocol()
237 .withSsl(false) 239 .withSsl(false)
......
...@@ -785,6 +785,12 @@ public class EventuallyConsistentMapImplTest { ...@@ -785,6 +785,12 @@ public class EventuallyConsistentMapImplTest {
785 } 785 }
786 786
787 @Override 787 @Override
788 + public <M, R> void addSubscriber(MessageSubject subject,
789 + Function<byte[], M> decoder, Function<M, CompletableFuture<R>> handler,
790 + Function<R, byte[]> encoder) {
791 + }
792 +
793 + @Override
788 public <M> void addSubscriber(MessageSubject subject, 794 public <M> void addSubscriber(MessageSubject subject,
789 Function<byte[], M> decoder, Consumer<M> handler, 795 Function<byte[], M> decoder, Consumer<M> handler,
790 Executor executor) { 796 Executor executor) {
......
...@@ -22,7 +22,7 @@ def get_OC_vars(): ...@@ -22,7 +22,7 @@ def get_OC_vars():
22 vars.append(var) 22 vars.append(var)
23 return sorted(vars, key=alphanum_key) 23 return sorted(vars, key=alphanum_key)
24 24
25 -def get_nodes(vars, port=7238): 25 +def get_nodes(vars, port=9876):
26 node = lambda k: { 'id': k, 'ip': k, 'tcpPort': port } 26 node = lambda k: { 'id': k, 'ip': k, 'tcpPort': port }
27 return [ node(environ[v]) for v in vars ] 27 return [ node(environ[v]) for v in vars ]
28 28
......
...@@ -25,6 +25,7 @@ import java.io.File; ...@@ -25,6 +25,7 @@ import java.io.File;
25 import java.io.FileInputStream; 25 import java.io.FileInputStream;
26 import java.io.IOException; 26 import java.io.IOException;
27 import java.io.InputStreamReader; 27 import java.io.InputStreamReader;
28 +import java.nio.ByteBuffer;
28 import java.nio.charset.StandardCharsets; 29 import java.nio.charset.StandardCharsets;
29 import java.nio.file.FileVisitResult; 30 import java.nio.file.FileVisitResult;
30 import java.nio.file.Files; 31 import java.nio.file.Files;
...@@ -34,9 +35,11 @@ import java.nio.file.SimpleFileVisitor; ...@@ -34,9 +35,11 @@ import java.nio.file.SimpleFileVisitor;
34 import java.nio.file.StandardCopyOption; 35 import java.nio.file.StandardCopyOption;
35 import java.nio.file.attribute.BasicFileAttributes; 36 import java.nio.file.attribute.BasicFileAttributes;
36 import java.util.ArrayList; 37 import java.util.ArrayList;
38 +import java.util.Arrays;
37 import java.util.Collection; 39 import java.util.Collection;
38 import java.util.Dictionary; 40 import java.util.Dictionary;
39 import java.util.List; 41 import java.util.List;
42 +import java.util.concurrent.CompletableFuture;
40 import java.util.concurrent.ExecutionException; 43 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.Future; 44 import java.util.concurrent.Future;
42 import java.util.concurrent.ThreadFactory; 45 import java.util.concurrent.ThreadFactory;
...@@ -388,6 +391,37 @@ public abstract class Tools { ...@@ -388,6 +391,37 @@ public abstract class Tools {
388 } 391 }
389 } 392 }
390 393
394 + /**
395 + * Returns a future that is completed exceptionally.
396 + * @param t exception
397 + * @param <T> future value type
398 + * @return future
399 + */
400 + public static <T> CompletableFuture<T> exceptionalFuture(Throwable t) {
401 + CompletableFuture<T> future = new CompletableFuture<>();
402 + future.completeExceptionally(t);
403 + return future;
404 + }
405 +
406 + /**
407 + * Returns the contents of {@code ByteBuffer} as byte array.
408 + * <p>
409 + * WARNING: There is a performance cost due to array copy
410 + * when using this method.
411 + * @param buffer byte buffer
412 + * @return byte array containing the byte buffer contents
413 + */
414 + public static byte[] byteBuffertoArray(ByteBuffer buffer) {
415 + int length = buffer.remaining();
416 + if (buffer.hasArray()) {
417 + int offset = buffer.arrayOffset() + buffer.position();
418 + return Arrays.copyOfRange(buffer.array(), offset, offset + length);
419 + }
420 + byte[] bytes = new byte[length];
421 + buffer.duplicate().get(bytes);
422 + return bytes;
423 + }
424 +
391 // Auxiliary path visitor for recursive directory structure copying. 425 // Auxiliary path visitor for recursive directory structure copying.
392 private static class DirectoryCopier extends SimpleFileVisitor<Path> { 426 private static class DirectoryCopier extends SimpleFileVisitor<Path> {
393 private Path src; 427 private Path src;
......
...@@ -203,6 +203,25 @@ public class NettyMessaging implements MessagingService { ...@@ -203,6 +203,25 @@ public class NettyMessaging implements MessagingService {
203 } 203 }
204 204
205 @Override 205 @Override
206 + public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
207 + handlers.put(type, message -> {
208 + handler.apply(message.payload()).whenComplete((result, error) -> {
209 + if (error == null) {
210 + InternalMessage response = new InternalMessage(message.id(),
211 + localEp,
212 + REPLY_MESSAGE_TYPE,
213 + result);
214 + try {
215 + sendAsync(message.sender(), response);
216 + } catch (IOException e) {
217 + log.debug("Failed to respond", e);
218 + }
219 + }
220 + });
221 + });
222 + }
223 +
224 + @Override
206 public void unregisterHandler(String type) { 225 public void unregisterHandler(String type) {
207 handlers.remove(type); 226 handlers.remove(type);
208 } 227 }
......
...@@ -212,6 +212,23 @@ public class IOLoopMessaging implements MessagingService { ...@@ -212,6 +212,23 @@ public class IOLoopMessaging implements MessagingService {
212 } 212 }
213 213
214 @Override 214 @Override
215 + public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
216 + handlers.put(type, message -> handler.apply(message.payload()).whenComplete((result, error) -> {
217 + if (error == null) {
218 + DefaultMessage response = new DefaultMessage(message.id(),
219 + localEp,
220 + REPLY_MESSAGE_TYPE,
221 + result);
222 + try {
223 + sendAsync(message.sender(), response);
224 + } catch (IOException e) {
225 + log.debug("Failed to respond", e);
226 + }
227 + }
228 + }));
229 + }
230 +
231 + @Override
215 public void unregisterHandler(String type) { 232 public void unregisterHandler(String type) {
216 handlers.remove(type); 233 handlers.remove(type);
217 } 234 }
......