tom

Added ability to measure round-trip latency and to assure message integrity.

...@@ -31,6 +31,16 @@ ...@@ -31,6 +31,16 @@
31 <groupId>org.livetribe.slp</groupId> 31 <groupId>org.livetribe.slp</groupId>
32 <artifactId>livetribe-slp</artifactId> 32 <artifactId>livetribe-slp</artifactId>
33 </dependency> 33 </dependency>
34 +
35 + <dependency>
36 + <groupId>com.fasterxml.jackson.core</groupId>
37 + <artifactId>jackson-databind</artifactId>
38 + </dependency>
39 + <dependency>
40 + <groupId>com.fasterxml.jackson.core</groupId>
41 + <artifactId>jackson-annotations</artifactId>
42 + </dependency>
43 +
34 <dependency> 44 <dependency>
35 <groupId>org.apache.karaf.shell</groupId> 45 <groupId>org.apache.karaf.shell</groupId>
36 <artifactId>org.apache.karaf.shell.console</artifactId> 46 <artifactId>org.apache.karaf.shell.console</artifactId>
......
1 +package org.onlab.onos.ccc;
2 +
3 +import com.fasterxml.jackson.core.JsonEncoding;
4 +import com.fasterxml.jackson.core.JsonFactory;
5 +import com.fasterxml.jackson.databind.JsonNode;
6 +import com.fasterxml.jackson.databind.ObjectMapper;
7 +import com.fasterxml.jackson.databind.node.ArrayNode;
8 +import com.fasterxml.jackson.databind.node.ObjectNode;
9 +import org.onlab.onos.cluster.DefaultControllerNode;
10 +import org.onlab.onos.cluster.NodeId;
11 +import org.onlab.packet.IpPrefix;
12 +
13 +import java.io.File;
14 +import java.io.IOException;
15 +import java.util.HashSet;
16 +import java.util.Iterator;
17 +import java.util.Set;
18 +
19 +/**
20 + * Allows for reading and writing cluster definition as a JSON file.
21 + */
22 +public class ClusterDefinitionStore {
23 +
24 + private final File file;
25 +
26 + /**
27 + * Creates a reader/writer of the cluster definition file.
28 + *
29 + * @param filePath location of the definition file
30 + */
31 + public ClusterDefinitionStore(String filePath) {
32 + file = new File(filePath);
33 + }
34 +
35 + /**
36 + * Returns set of the controller nodes, including self.
37 + *
38 + * @return set of controller nodes
39 + */
40 + public Set<DefaultControllerNode> read() throws IOException {
41 + Set<DefaultControllerNode> nodes = new HashSet<>();
42 + ObjectMapper mapper = new ObjectMapper();
43 + ObjectNode clusterNodeDef = (ObjectNode) mapper.readTree(file);
44 + Iterator<JsonNode> it = ((ArrayNode) clusterNodeDef.get("nodes")).elements();
45 + while (it.hasNext()) {
46 + ObjectNode nodeDef = (ObjectNode) it.next();
47 + nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
48 + IpPrefix.valueOf(nodeDef.get("ip").asText()),
49 + nodeDef.get("tcpPort").asInt(9876)));
50 + }
51 + return nodes;
52 + }
53 +
54 + /**
55 + * Writes the given set of the controller nodes.
56 + *
57 + * @param nodes set of controller nodes
58 + */
59 + public void write(Set<DefaultControllerNode> nodes) throws IOException {
60 + ObjectMapper mapper = new ObjectMapper();
61 + ObjectNode clusterNodeDef = mapper.createObjectNode();
62 + ArrayNode nodeDefs = mapper.createArrayNode();
63 + clusterNodeDef.set("nodes", nodeDefs);
64 + for (DefaultControllerNode node : nodes) {
65 + ObjectNode nodeDef = mapper.createObjectNode();
66 + nodeDef.put("id", node.id().toString())
67 + .put("ip", node.ip().toString())
68 + .put("tcpPort", node.tcpPort());
69 + nodeDefs.add(nodeDef);
70 + }
71 + mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8),
72 + clusterNodeDef);
73 + }
74 +
75 +}
...@@ -21,12 +21,17 @@ import org.slf4j.LoggerFactory; ...@@ -21,12 +21,17 @@ import org.slf4j.LoggerFactory;
21 21
22 import java.io.IOException; 22 import java.io.IOException;
23 import java.net.InetSocketAddress; 23 import java.net.InetSocketAddress;
24 +import java.net.Socket;
25 +import java.net.SocketAddress;
24 import java.nio.channels.ByteChannel; 26 import java.nio.channels.ByteChannel;
25 import java.nio.channels.ServerSocketChannel; 27 import java.nio.channels.ServerSocketChannel;
28 +import java.nio.channels.SocketChannel;
26 import java.util.ArrayList; 29 import java.util.ArrayList;
27 import java.util.List; 30 import java.util.List;
28 import java.util.Map; 31 import java.util.Map;
29 import java.util.Set; 32 import java.util.Set;
33 +import java.util.Timer;
34 +import java.util.TimerTask;
30 import java.util.concurrent.ConcurrentHashMap; 35 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ExecutorService; 36 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors; 37 import java.util.concurrent.Executors;
...@@ -47,21 +52,36 @@ public class DistributedClusterStore ...@@ -47,21 +52,36 @@ public class DistributedClusterStore
47 52
48 private final Logger log = LoggerFactory.getLogger(getClass()); 53 private final Logger log = LoggerFactory.getLogger(getClass());
49 54
55 + private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
56 + private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
57 +
50 private static final long SELECT_TIMEOUT = 50; 58 private static final long SELECT_TIMEOUT = 50;
51 private static final int WORKERS = 3; 59 private static final int WORKERS = 3;
60 + private static final int INITIATORS = 2;
52 private static final int COMM_BUFFER_SIZE = 16 * 1024; 61 private static final int COMM_BUFFER_SIZE = 16 * 1024;
53 private static final int COMM_IDLE_TIME = 500; 62 private static final int COMM_IDLE_TIME = 500;
54 63
64 + private static final boolean SO_NO_DELAY = false;
65 + private static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
66 + private static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
67 +
55 private DefaultControllerNode self; 68 private DefaultControllerNode self;
56 private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>(); 69 private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
57 private final Map<NodeId, State> states = new ConcurrentHashMap<>(); 70 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
71 + private final Map<NodeId, TLVMessageStream> streams = new ConcurrentHashMap<>();
72 + private final Map<SocketChannel, DefaultControllerNode> nodesByChannel = new ConcurrentHashMap<>();
58 73
59 private final ExecutorService listenExecutor = 74 private final ExecutorService listenExecutor =
60 - Executors.newSingleThreadExecutor(namedThreads("onos-listen")); 75 + Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
61 private final ExecutorService commExecutors = 76 private final ExecutorService commExecutors =
62 - Executors.newFixedThreadPool(WORKERS, namedThreads("onos-cluster")); 77 + Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
63 private final ExecutorService heartbeatExecutor = 78 private final ExecutorService heartbeatExecutor =
64 - Executors.newSingleThreadExecutor(namedThreads("onos-heartbeat")); 79 + Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
80 + private final ExecutorService initiatorExecutors =
81 + Executors.newFixedThreadPool(INITIATORS, namedThreads("onos-comm-initiator"));
82 +
83 + private final Timer timer = new Timer();
84 + private final TimerTask connectionCustodian = new ConnectionCustodian();
65 85
66 private ListenLoop listenLoop; 86 private ListenLoop listenLoop;
67 private List<CommLoop> commLoops = new ArrayList<>(WORKERS); 87 private List<CommLoop> commLoops = new ArrayList<>(WORKERS);
...@@ -71,9 +91,28 @@ public class DistributedClusterStore ...@@ -71,9 +91,28 @@ public class DistributedClusterStore
71 establishIdentity(); 91 establishIdentity();
72 startCommunications(); 92 startCommunications();
73 startListening(); 93 startListening();
94 + startInitiating();
74 log.info("Started"); 95 log.info("Started");
75 } 96 }
76 97
98 + @Deactivate
99 + public void deactivate() {
100 + listenLoop.shutdown();
101 + for (CommLoop loop : commLoops) {
102 + loop.shutdown();
103 + }
104 + log.info("Stopped");
105 + }
106 +
107 +
108 + // Establishes the controller's own identity.
109 + private void establishIdentity() {
110 + IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
111 + self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
112 + nodes.put(self.id(), self);
113 + }
114 +
115 + // Kicks off the IO loops.
77 private void startCommunications() { 116 private void startCommunications() {
78 for (int i = 0; i < WORKERS; i++) { 117 for (int i = 0; i < WORKERS; i++) {
79 try { 118 try {
...@@ -96,20 +135,26 @@ public class DistributedClusterStore ...@@ -96,20 +135,26 @@ public class DistributedClusterStore
96 } 135 }
97 } 136 }
98 137
99 - // Establishes the controller's own identity. 138 + /**
100 - private void establishIdentity() { 139 + * Initiates open connection request and registers the pending socket
101 - // For now rely on env. variable. 140 + * channel with the given IO loop.
102 - IpPrefix ip = valueOf(System.getenv("ONOS_NIC")); 141 + *
103 - self = new DefaultControllerNode(new NodeId(ip.toString()), ip); 142 + * @param loop loop with which the channel should be registered
143 + * @throws java.io.IOException if the socket could not be open or connected
144 + */
145 + private void openConnection(DefaultControllerNode node, CommLoop loop) throws IOException {
146 + SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
147 + SocketChannel ch = SocketChannel.open();
148 + nodesByChannel.put(ch, node);
149 + ch.configureBlocking(false);
150 + loop.connectStream(ch);
151 + ch.connect(sa);
104 } 152 }
105 153
106 - @Deactivate 154 +
107 - public void deactivate() { 155 + // Attempts to connect to any nodes that do not have an associated connection.
108 - listenLoop.shutdown(); 156 + private void startInitiating() {
109 - for (CommLoop loop : commLoops) { 157 + timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, CONNECTION_CUSTODIAN_FREQUENCY);
110 - loop.shutdown();
111 - }
112 - log.info("Stopped");
113 } 158 }
114 159
115 @Override 160 @Override
...@@ -154,7 +199,16 @@ public class DistributedClusterStore ...@@ -154,7 +199,16 @@ public class DistributedClusterStore
154 199
155 @Override 200 @Override
156 protected void acceptConnection(ServerSocketChannel channel) throws IOException { 201 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
202 + SocketChannel sc = channel.accept();
203 + sc.configureBlocking(false);
204 +
205 + Socket so = sc.socket();
206 + so.setTcpNoDelay(SO_NO_DELAY);
207 + so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
208 + so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
157 209
210 + findLeastUtilizedLoop().acceptStream(sc);
211 + log.info("Connected client");
158 } 212 }
159 } 213 }
160 214
...@@ -172,5 +226,64 @@ public class DistributedClusterStore ...@@ -172,5 +226,64 @@ public class DistributedClusterStore
172 protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) { 226 protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
173 227
174 } 228 }
229 +
230 + @Override
231 + public TLVMessageStream acceptStream(SocketChannel channel) {
232 + TLVMessageStream stream = super.acceptStream(channel);
233 + try {
234 + InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
235 + log.info("Accepted a new connection from {}", IpPrefix.valueOf(sa.getAddress().getAddress()));
236 + } catch (IOException e) {
237 + log.warn("Unable to accept connection from an unknown end-point", e);
238 + }
239 + return stream;
240 + }
241 +
242 + @Override
243 + public TLVMessageStream connectStream(SocketChannel channel) {
244 + TLVMessageStream stream = super.connectStream(channel);
245 + DefaultControllerNode node = nodesByChannel.get(channel);
246 + if (node != null) {
247 + log.info("Opened connection to {}", node.id());
248 + streams.put(node.id(), stream);
249 + }
250 + return stream;
251 + }
252 + }
253 +
254 +
255 + // Sweeps through all controller nodes and attempts to open connection to
256 + // those that presently do not have one.
257 + private class ConnectionCustodian extends TimerTask {
258 + @Override
259 + public void run() {
260 + for (DefaultControllerNode node : nodes.values()) {
261 + if (node != self && !streams.containsKey(node.id())) {
262 + try {
263 + openConnection(node, findLeastUtilizedLoop());
264 + } catch (IOException e) {
265 + log.warn("Unable to connect", e);
266 + }
267 + }
268 + }
269 + }
270 + }
271 +
272 + // Finds the least utilities IO loop.
273 + private CommLoop findLeastUtilizedLoop() {
274 + CommLoop leastUtilized = null;
275 + int minCount = Integer.MAX_VALUE;
276 + for (CommLoop loop : commLoops) {
277 + int count = loop.streamCount();
278 + if (count == 0) {
279 + return loop;
280 + }
281 +
282 + if (count < minCount) {
283 + leastUtilized = loop;
284 + minCount = count;
285 + }
286 + }
287 + return leastUtilized;
175 } 288 }
176 } 289 }
......
1 +package org.onlab.onos.cli;
2 +
3 +import org.apache.karaf.shell.commands.Argument;
4 +import org.apache.karaf.shell.commands.Command;
5 +import org.onlab.onos.cluster.ClusterAdminService;
6 +import org.onlab.onos.cluster.NodeId;
7 +import org.onlab.packet.IpPrefix;
8 +
9 +/**
10 + * Lists all controller cluster nodes.
11 + */
12 +@Command(scope = "onos", name = "add-node",
13 + description = "Lists all controller cluster nodes")
14 +public class NodeAddCommand extends AbstractShellCommand {
15 +
16 + @Argument(index = 0, name = "nodeId", description = "Node ID",
17 + required = true, multiValued = false)
18 + String nodeId = null;
19 +
20 + @Argument(index = 1, name = "ip", description = "Node IP address",
21 + required = true, multiValued = false)
22 + String ip = null;
23 +
24 + @Argument(index = 2, name = "tcpPort", description = "TCP port",
25 + required = false, multiValued = false)
26 + int tcpPort = 9876;
27 +
28 + @Override
29 + protected void execute() {
30 + ClusterAdminService service = get(ClusterAdminService.class);
31 + service.addNode(new NodeId(nodeId), IpPrefix.valueOf(ip), tcpPort);
32 + }
33 +
34 +}
...@@ -5,6 +5,9 @@ ...@@ -5,6 +5,9 @@
5 <action class="org.onlab.onos.cli.NodesListCommand"/> 5 <action class="org.onlab.onos.cli.NodesListCommand"/>
6 </command> 6 </command>
7 <command> 7 <command>
8 + <action class="org.onlab.onos.cli.NodeAddCommand"/>
9 + </command>
10 + <command>
8 <action class="org.onlab.onos.cli.MastersListCommand"/> 11 <action class="org.onlab.onos.cli.MastersListCommand"/>
9 <completers> 12 <completers>
10 <ref component-id="clusterIdCompleter"/> 13 <ref component-id="clusterIdCompleter"/>
......
...@@ -11,4 +11,5 @@ remote=$ONOS_USER@${1:-$OCI} ...@@ -11,4 +11,5 @@ remote=$ONOS_USER@${1:-$OCI}
11 ssh $remote " 11 ssh $remote "
12 sudo perl -pi.bak -e \"s/ <interface>.*</ <interface>${ONOS_NIC:-192.168.56.*}</g\" \ 12 sudo perl -pi.bak -e \"s/ <interface>.*</ <interface>${ONOS_NIC:-192.168.56.*}</g\" \
13 $ONOS_INSTALL_DIR/$KARAF_DIST/etc/hazelcast.xml 13 $ONOS_INSTALL_DIR/$KARAF_DIST/etc/hazelcast.xml
14 + echo \"onos.ip=\$(ifconfig | grep $ONOS_NIC | cut -d: -f2 | cut -d\\ -f1)\" >> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties
14 " 15 "
...\ No newline at end of file ...\ No newline at end of file
......
1 # Default virtual box ONOS instances 1,2 & ONOS mininet box 1 # Default virtual box ONOS instances 1,2 & ONOS mininet box
2 . $ONOS_ROOT/tools/test/cells/.reset 2 . $ONOS_ROOT/tools/test/cells/.reset
3 3
4 +export ONOS_NIC=192.168.56.*
5 +
4 export OC1="192.168.56.101" 6 export OC1="192.168.56.101"
5 export OC2="192.168.56.102" 7 export OC2="192.168.56.102"
6 8
......
1 +# Default virtual box ONOS instances 1,2 & ONOS mininet box
2 +
3 +export ONOS_NIC=192.168.56.*
4 +
5 +export OC1="192.168.56.101"
6 +export OC2="192.168.56.102"
7 +
8 +export OCN="192.168.56.105"
9 +
10 +
...@@ -54,6 +54,15 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> ...@@ -54,6 +54,15 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
54 } 54 }
55 55
56 /** 56 /**
57 + * Returns the number of streams in custody of the IO loop.
58 + *
59 + * @return number of message streams using this loop
60 + */
61 + public int streamCount() {
62 + return streams.size();
63 + }
64 +
65 + /**
57 * Creates a new message stream backed by the specified socket channel. 66 * Creates a new message stream backed by the specified socket channel.
58 * 67 *
59 * @param byteChannel backing byte channel 68 * @param byteChannel backing byte channel
...@@ -182,9 +191,10 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> ...@@ -182,9 +191,10 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
182 * with a pending accept operation. 191 * with a pending accept operation.
183 * 192 *
184 * @param channel backing socket channel 193 * @param channel backing socket channel
194 + * @return newly accepted message stream
185 */ 195 */
186 - public void acceptStream(SocketChannel channel) { 196 + public S acceptStream(SocketChannel channel) {
187 - createAndAdmit(channel, SelectionKey.OP_READ); 197 + return createAndAdmit(channel, SelectionKey.OP_READ);
188 } 198 }
189 199
190 200
...@@ -193,9 +203,10 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> ...@@ -193,9 +203,10 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
193 * with a pending connect operation. 203 * with a pending connect operation.
194 * 204 *
195 * @param channel backing socket channel 205 * @param channel backing socket channel
206 + * @return newly connected message stream
196 */ 207 */
197 - public void connectStream(SocketChannel channel) { 208 + public S connectStream(SocketChannel channel) {
198 - createAndAdmit(channel, SelectionKey.OP_CONNECT); 209 + return createAndAdmit(channel, SelectionKey.OP_CONNECT);
199 } 210 }
200 211
201 /** 212 /**
...@@ -205,12 +216,14 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> ...@@ -205,12 +216,14 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
205 * @param channel socket channel 216 * @param channel socket channel
206 * @param op pending operations mask to be applied to the selection 217 * @param op pending operations mask to be applied to the selection
207 * key as a set of initial interestedOps 218 * key as a set of initial interestedOps
219 + * @return newly created message stream
208 */ 220 */
209 - private synchronized void createAndAdmit(SocketChannel channel, int op) { 221 + private synchronized S createAndAdmit(SocketChannel channel, int op) {
210 S stream = createStream(channel); 222 S stream = createStream(channel);
211 streams.add(stream); 223 streams.add(stream);
212 newStreamRequests.add(new NewStreamRequest(stream, channel, op)); 224 newStreamRequests.add(new NewStreamRequest(stream, channel, op));
213 selector.wakeup(); 225 selector.wakeup();
226 + return stream;
214 } 227 }
215 228
216 /** 229 /**
......