Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
Showing
19 changed files
with
457 additions
and
51 deletions
... | @@ -28,10 +28,6 @@ | ... | @@ -28,10 +28,6 @@ |
28 | <version>${project.version}</version> | 28 | <version>${project.version}</version> |
29 | </dependency> | 29 | </dependency> |
30 | <dependency> | 30 | <dependency> |
31 | - <groupId>org.livetribe.slp</groupId> | ||
32 | - <artifactId>livetribe-slp</artifactId> | ||
33 | - </dependency> | ||
34 | - <dependency> | ||
35 | <groupId>org.apache.karaf.shell</groupId> | 31 | <groupId>org.apache.karaf.shell</groupId> |
36 | <artifactId>org.apache.karaf.shell.console</artifactId> | 32 | <artifactId>org.apache.karaf.shell.console</artifactId> |
37 | </dependency> | 33 | </dependency> | ... | ... |
1 | +package org.onlab.onos.cli; | ||
2 | + | ||
3 | +import org.apache.karaf.shell.commands.Argument; | ||
4 | +import org.apache.karaf.shell.commands.Command; | ||
5 | +import org.onlab.onos.cluster.ClusterAdminService; | ||
6 | +import org.onlab.onos.cluster.NodeId; | ||
7 | +import org.onlab.packet.IpPrefix; | ||
8 | + | ||
9 | +/** | ||
10 | + * Adds a new controller cluster node. | ||
11 | + */ | ||
12 | +@Command(scope = "onos", name = "add-node", | ||
13 | + description = "Adds a new controller cluster node") | ||
14 | +public class NodeAddCommand extends AbstractShellCommand { | ||
15 | + | ||
16 | + @Argument(index = 0, name = "nodeId", description = "Node ID", | ||
17 | + required = true, multiValued = false) | ||
18 | + String nodeId = null; | ||
19 | + | ||
20 | + @Argument(index = 1, name = "ip", description = "Node IP address", | ||
21 | + required = true, multiValued = false) | ||
22 | + String ip = null; | ||
23 | + | ||
24 | + @Argument(index = 2, name = "tcpPort", description = "Node TCP listen port", | ||
25 | + required = false, multiValued = false) | ||
26 | + int tcpPort = 9876; | ||
27 | + | ||
28 | + @Override | ||
29 | + protected void execute() { | ||
30 | + ClusterAdminService service = get(ClusterAdminService.class); | ||
31 | + service.addNode(new NodeId(nodeId), IpPrefix.valueOf(ip), tcpPort); | ||
32 | + } | ||
33 | + | ||
34 | +} |
1 | +package org.onlab.onos.cli; | ||
2 | + | ||
3 | +import org.apache.karaf.shell.commands.Argument; | ||
4 | +import org.apache.karaf.shell.commands.Command; | ||
5 | +import org.onlab.onos.cluster.ClusterAdminService; | ||
6 | +import org.onlab.onos.cluster.NodeId; | ||
7 | + | ||
8 | +/** | ||
9 | + * Removes a controller cluster node. | ||
10 | + */ | ||
11 | +@Command(scope = "onos", name = "remove-node", | ||
12 | + description = "Removes a new controller cluster node") | ||
13 | +public class NodeRemoveCommand extends AbstractShellCommand { | ||
14 | + | ||
15 | + @Argument(index = 0, name = "nodeId", description = "Node ID", | ||
16 | + required = true, multiValued = false) | ||
17 | + String nodeId = null; | ||
18 | + | ||
19 | + @Override | ||
20 | + protected void execute() { | ||
21 | + ClusterAdminService service = get(ClusterAdminService.class); | ||
22 | + service.removeNode(new NodeId(nodeId)); | ||
23 | + } | ||
24 | + | ||
25 | +} |
... | @@ -5,6 +5,12 @@ | ... | @@ -5,6 +5,12 @@ |
5 | <action class="org.onlab.onos.cli.NodesListCommand"/> | 5 | <action class="org.onlab.onos.cli.NodesListCommand"/> |
6 | </command> | 6 | </command> |
7 | <command> | 7 | <command> |
8 | + <action class="org.onlab.onos.cli.NodeAddCommand"/> | ||
9 | + </command> | ||
10 | + <command> | ||
11 | + <action class="org.onlab.onos.cli.NodeRemoveCommand"/> | ||
12 | + </command> | ||
13 | + <command> | ||
8 | <action class="org.onlab.onos.cli.MastersListCommand"/> | 14 | <action class="org.onlab.onos.cli.MastersListCommand"/> |
9 | <completers> | 15 | <completers> |
10 | <ref component-id="clusterIdCompleter"/> | 16 | <ref component-id="clusterIdCompleter"/> | ... | ... |
... | @@ -26,6 +26,23 @@ | ... | @@ -26,6 +26,23 @@ |
26 | <artifactId>onos-core-serializers</artifactId> | 26 | <artifactId>onos-core-serializers</artifactId> |
27 | <version>${project.version}</version> | 27 | <version>${project.version}</version> |
28 | </dependency> | 28 | </dependency> |
29 | + | ||
30 | + | ||
31 | + <dependency> | ||
32 | + <groupId>org.onlab.onos</groupId> | ||
33 | + <artifactId>onlab-nio</artifactId> | ||
34 | + <version>${project.version}</version> | ||
35 | + </dependency> | ||
36 | + | ||
37 | + <dependency> | ||
38 | + <groupId>com.fasterxml.jackson.core</groupId> | ||
39 | + <artifactId>jackson-databind</artifactId> | ||
40 | + </dependency> | ||
41 | + <dependency> | ||
42 | + <groupId>com.fasterxml.jackson.core</groupId> | ||
43 | + <artifactId>jackson-annotations</artifactId> | ||
44 | + </dependency> | ||
45 | + | ||
29 | <dependency> | 46 | <dependency> |
30 | <groupId>org.apache.felix</groupId> | 47 | <groupId>org.apache.felix</groupId> |
31 | <artifactId>org.apache.felix.scr.annotations</artifactId> | 48 | <artifactId>org.apache.felix.scr.annotations</artifactId> | ... | ... |
core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterDefinitionStore.java
0 → 100644
1 | +package org.onlab.onos.store.cluster.impl; | ||
2 | + | ||
3 | +import com.fasterxml.jackson.core.JsonEncoding; | ||
4 | +import com.fasterxml.jackson.core.JsonFactory; | ||
5 | +import com.fasterxml.jackson.databind.JsonNode; | ||
6 | +import com.fasterxml.jackson.databind.ObjectMapper; | ||
7 | +import com.fasterxml.jackson.databind.node.ArrayNode; | ||
8 | +import com.fasterxml.jackson.databind.node.ObjectNode; | ||
9 | +import org.onlab.onos.cluster.DefaultControllerNode; | ||
10 | +import org.onlab.onos.cluster.NodeId; | ||
11 | +import org.onlab.packet.IpPrefix; | ||
12 | + | ||
13 | +import java.io.File; | ||
14 | +import java.io.IOException; | ||
15 | +import java.util.HashSet; | ||
16 | +import java.util.Iterator; | ||
17 | +import java.util.Set; | ||
18 | + | ||
19 | +/** | ||
20 | + * Allows for reading and writing cluster definition as a JSON file. | ||
21 | + */ | ||
22 | +public class ClusterDefinitionStore { | ||
23 | + | ||
24 | + private final File file; | ||
25 | + | ||
26 | + /** | ||
27 | + * Creates a reader/writer of the cluster definition file. | ||
28 | + * | ||
29 | + * @param filePath location of the definition file | ||
30 | + */ | ||
31 | + public ClusterDefinitionStore(String filePath) { | ||
32 | + file = new File(filePath); | ||
33 | + } | ||
34 | + | ||
35 | + /** | ||
36 | + * Returns set of the controller nodes, including self. | ||
37 | + * | ||
38 | + * @return set of controller nodes | ||
39 | + */ | ||
40 | + public Set<DefaultControllerNode> read() throws IOException { | ||
41 | + Set<DefaultControllerNode> nodes = new HashSet<>(); | ||
42 | + ObjectMapper mapper = new ObjectMapper(); | ||
43 | + ObjectNode clusterNodeDef = (ObjectNode) mapper.readTree(file); | ||
44 | + Iterator<JsonNode> it = ((ArrayNode) clusterNodeDef.get("nodes")).elements(); | ||
45 | + while (it.hasNext()) { | ||
46 | + ObjectNode nodeDef = (ObjectNode) it.next(); | ||
47 | + nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()), | ||
48 | + IpPrefix.valueOf(nodeDef.get("ip").asText()), | ||
49 | + nodeDef.get("tcpPort").asInt(9876))); | ||
50 | + } | ||
51 | + return nodes; | ||
52 | + } | ||
53 | + | ||
54 | + /** | ||
55 | + * Writes the given set of the controller nodes. | ||
56 | + * | ||
57 | + * @param nodes set of controller nodes | ||
58 | + */ | ||
59 | + public void write(Set<DefaultControllerNode> nodes) throws IOException { | ||
60 | + ObjectMapper mapper = new ObjectMapper(); | ||
61 | + ObjectNode clusterNodeDef = mapper.createObjectNode(); | ||
62 | + ArrayNode nodeDefs = mapper.createArrayNode(); | ||
63 | + clusterNodeDef.set("nodes", nodeDefs); | ||
64 | + for (DefaultControllerNode node : nodes) { | ||
65 | + ObjectNode nodeDef = mapper.createObjectNode(); | ||
66 | + nodeDef.put("id", node.id().toString()) | ||
67 | + .put("ip", node.ip().toString()) | ||
68 | + .put("tcpPort", node.tcpPort()); | ||
69 | + nodeDefs.add(nodeDef); | ||
70 | + } | ||
71 | + mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8), | ||
72 | + clusterNodeDef); | ||
73 | + } | ||
74 | + | ||
75 | +} |
1 | -package org.onlab.onos.ccc; | 1 | +package org.onlab.onos.store.cluster.impl; |
2 | 2 | ||
3 | import com.google.common.collect.ImmutableSet; | 3 | import com.google.common.collect.ImmutableSet; |
4 | import org.apache.felix.scr.annotations.Activate; | 4 | import org.apache.felix.scr.annotations.Activate; |
... | @@ -21,12 +21,18 @@ import org.slf4j.LoggerFactory; | ... | @@ -21,12 +21,18 @@ import org.slf4j.LoggerFactory; |
21 | 21 | ||
22 | import java.io.IOException; | 22 | import java.io.IOException; |
23 | import java.net.InetSocketAddress; | 23 | import java.net.InetSocketAddress; |
24 | +import java.net.Socket; | ||
25 | +import java.net.SocketAddress; | ||
24 | import java.nio.channels.ByteChannel; | 26 | import java.nio.channels.ByteChannel; |
27 | +import java.nio.channels.SelectionKey; | ||
25 | import java.nio.channels.ServerSocketChannel; | 28 | import java.nio.channels.ServerSocketChannel; |
29 | +import java.nio.channels.SocketChannel; | ||
26 | import java.util.ArrayList; | 30 | import java.util.ArrayList; |
27 | import java.util.List; | 31 | import java.util.List; |
28 | import java.util.Map; | 32 | import java.util.Map; |
29 | import java.util.Set; | 33 | import java.util.Set; |
34 | +import java.util.Timer; | ||
35 | +import java.util.TimerTask; | ||
30 | import java.util.concurrent.ConcurrentHashMap; | 36 | import java.util.concurrent.ConcurrentHashMap; |
31 | import java.util.concurrent.ExecutorService; | 37 | import java.util.concurrent.ExecutorService; |
32 | import java.util.concurrent.Executors; | 38 | import java.util.concurrent.Executors; |
... | @@ -45,35 +51,88 @@ public class DistributedClusterStore | ... | @@ -45,35 +51,88 @@ public class DistributedClusterStore |
45 | extends AbstractStore<ClusterEvent, ClusterStoreDelegate> | 51 | extends AbstractStore<ClusterEvent, ClusterStoreDelegate> |
46 | implements ClusterStore { | 52 | implements ClusterStore { |
47 | 53 | ||
54 | + private static final int HELLO_MSG = 1; | ||
55 | + private static final int ECHO_MSG = 2; | ||
56 | + | ||
48 | private final Logger log = LoggerFactory.getLogger(getClass()); | 57 | private final Logger log = LoggerFactory.getLogger(getClass()); |
49 | 58 | ||
59 | + private static final long CONNECTION_CUSTODIAN_DELAY = 1000L; | ||
60 | + private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000; | ||
61 | + | ||
62 | + private static final long START_TIMEOUT = 1000; | ||
50 | private static final long SELECT_TIMEOUT = 50; | 63 | private static final long SELECT_TIMEOUT = 50; |
51 | private static final int WORKERS = 3; | 64 | private static final int WORKERS = 3; |
52 | - private static final int COMM_BUFFER_SIZE = 16 * 1024; | 65 | + private static final int COMM_BUFFER_SIZE = 32 * 1024; |
53 | private static final int COMM_IDLE_TIME = 500; | 66 | private static final int COMM_IDLE_TIME = 500; |
54 | 67 | ||
68 | + private static final boolean SO_NO_DELAY = false; | ||
69 | + private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE; | ||
70 | + private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE; | ||
71 | + | ||
55 | private DefaultControllerNode self; | 72 | private DefaultControllerNode self; |
56 | private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>(); | 73 | private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>(); |
57 | private final Map<NodeId, State> states = new ConcurrentHashMap<>(); | 74 | private final Map<NodeId, State> states = new ConcurrentHashMap<>(); |
58 | 75 | ||
76 | + // Means to track message streams to other nodes. | ||
77 | + private final Map<NodeId, TLVMessageStream> streams = new ConcurrentHashMap<>(); | ||
78 | + private final Map<SocketChannel, DefaultControllerNode> nodesByChannel = new ConcurrentHashMap<>(); | ||
79 | + | ||
80 | + // Executor pools for listening and managing connections to other nodes. | ||
59 | private final ExecutorService listenExecutor = | 81 | private final ExecutorService listenExecutor = |
60 | - Executors.newSingleThreadExecutor(namedThreads("onos-listen")); | 82 | + Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen")); |
61 | private final ExecutorService commExecutors = | 83 | private final ExecutorService commExecutors = |
62 | - Executors.newFixedThreadPool(WORKERS, namedThreads("onos-cluster")); | 84 | + Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster")); |
63 | private final ExecutorService heartbeatExecutor = | 85 | private final ExecutorService heartbeatExecutor = |
64 | - Executors.newSingleThreadExecutor(namedThreads("onos-heartbeat")); | 86 | + Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat")); |
87 | + | ||
88 | + private final Timer timer = new Timer("onos-comm-initiator"); | ||
89 | + private final TimerTask connectionCustodian = new ConnectionCustodian(); | ||
65 | 90 | ||
66 | private ListenLoop listenLoop; | 91 | private ListenLoop listenLoop; |
67 | private List<CommLoop> commLoops = new ArrayList<>(WORKERS); | 92 | private List<CommLoop> commLoops = new ArrayList<>(WORKERS); |
68 | 93 | ||
69 | @Activate | 94 | @Activate |
70 | public void activate() { | 95 | public void activate() { |
71 | - establishIdentity(); | 96 | + loadClusterDefinition(); |
72 | startCommunications(); | 97 | startCommunications(); |
73 | startListening(); | 98 | startListening(); |
99 | + startInitiating(); | ||
74 | log.info("Started"); | 100 | log.info("Started"); |
75 | } | 101 | } |
76 | 102 | ||
103 | + @Deactivate | ||
104 | + public void deactivate() { | ||
105 | + listenLoop.shutdown(); | ||
106 | + for (CommLoop loop : commLoops) { | ||
107 | + loop.shutdown(); | ||
108 | + } | ||
109 | + log.info("Stopped"); | ||
110 | + } | ||
111 | + | ||
112 | + // Loads the cluster definition file | ||
113 | + private void loadClusterDefinition() { | ||
114 | +// ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json"); | ||
115 | +// try { | ||
116 | +// Set<DefaultControllerNode> storedNodes = cds.read(); | ||
117 | +// for (DefaultControllerNode node : storedNodes) { | ||
118 | +// nodes.put(node.id(), node); | ||
119 | +// } | ||
120 | +// } catch (IOException e) { | ||
121 | +// log.error("Unable to read cluster definitions", e); | ||
122 | +// } | ||
123 | + | ||
124 | + // Establishes the controller's own identity. | ||
125 | + IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1")); | ||
126 | + self = nodes.get(new NodeId(ip.toString())); | ||
127 | + | ||
128 | + // As a fall-back, let's make sure we at least know who we are. | ||
129 | + if (self == null) { | ||
130 | + self = new DefaultControllerNode(new NodeId(ip.toString()), ip); | ||
131 | + nodes.put(self.id(), self); | ||
132 | + } | ||
133 | + } | ||
134 | + | ||
135 | + // Kicks off the IO loops. | ||
77 | private void startCommunications() { | 136 | private void startCommunications() { |
78 | for (int i = 0; i < WORKERS; i++) { | 137 | for (int i = 0; i < WORKERS; i++) { |
79 | try { | 138 | try { |
... | @@ -84,6 +143,13 @@ public class DistributedClusterStore | ... | @@ -84,6 +143,13 @@ public class DistributedClusterStore |
84 | log.warn("Unable to start comm IO loop", e); | 143 | log.warn("Unable to start comm IO loop", e); |
85 | } | 144 | } |
86 | } | 145 | } |
146 | + | ||
147 | + // Wait for the IO loops to start | ||
148 | + for (CommLoop loop : commLoops) { | ||
149 | + if (!loop.awaitStart(START_TIMEOUT)) { | ||
150 | + log.warn("Comm loop did not start on-time; moving on..."); | ||
151 | + } | ||
152 | + } | ||
87 | } | 153 | } |
88 | 154 | ||
89 | // Starts listening for connections from peer cluster members. | 155 | // Starts listening for connections from peer cluster members. |
... | @@ -91,25 +157,34 @@ public class DistributedClusterStore | ... | @@ -91,25 +157,34 @@ public class DistributedClusterStore |
91 | try { | 157 | try { |
92 | listenLoop = new ListenLoop(self.ip(), self.tcpPort()); | 158 | listenLoop = new ListenLoop(self.ip(), self.tcpPort()); |
93 | listenExecutor.execute(listenLoop); | 159 | listenExecutor.execute(listenLoop); |
160 | + if (!listenLoop.awaitStart(START_TIMEOUT)) { | ||
161 | + log.warn("Listen loop did not start on-time; moving on..."); | ||
162 | + } | ||
94 | } catch (IOException e) { | 163 | } catch (IOException e) { |
95 | log.error("Unable to listen for cluster connections", e); | 164 | log.error("Unable to listen for cluster connections", e); |
96 | } | 165 | } |
97 | } | 166 | } |
98 | 167 | ||
99 | - // Establishes the controller's own identity. | 168 | + /** |
100 | - private void establishIdentity() { | 169 | + * Initiates open connection request and registers the pending socket |
101 | - // For now rely on env. variable. | 170 | + * channel with the given IO loop. |
102 | - IpPrefix ip = valueOf(System.getenv("ONOS_NIC")); | 171 | + * |
103 | - self = new DefaultControllerNode(new NodeId(ip.toString()), ip); | 172 | + * @param loop loop with which the channel should be registered |
173 | + * @throws java.io.IOException if the socket could not be open or connected | ||
174 | + */ | ||
175 | + private void openConnection(DefaultControllerNode node, CommLoop loop) throws IOException { | ||
176 | + SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort()); | ||
177 | + SocketChannel ch = SocketChannel.open(); | ||
178 | + nodesByChannel.put(ch, node); | ||
179 | + ch.configureBlocking(false); | ||
180 | + ch.connect(sa); | ||
181 | + loop.connectStream(ch); | ||
104 | } | 182 | } |
105 | 183 | ||
106 | - @Deactivate | 184 | + |
107 | - public void deactivate() { | 185 | + // Attempts to connect to any nodes that do not have an associated connection. |
108 | - listenLoop.shutdown(); | 186 | + private void startInitiating() { |
109 | - for (CommLoop loop : commLoops) { | 187 | + timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, CONNECTION_CUSTODIAN_FREQUENCY); |
110 | - loop.shutdown(); | ||
111 | - } | ||
112 | - log.info("Stopped"); | ||
113 | } | 188 | } |
114 | 189 | ||
115 | @Override | 190 | @Override |
... | @@ -144,6 +219,7 @@ public class DistributedClusterStore | ... | @@ -144,6 +219,7 @@ public class DistributedClusterStore |
144 | @Override | 219 | @Override |
145 | public void removeNode(NodeId nodeId) { | 220 | public void removeNode(NodeId nodeId) { |
146 | nodes.remove(nodeId); | 221 | nodes.remove(nodeId); |
222 | + streams.remove(nodeId); | ||
147 | } | 223 | } |
148 | 224 | ||
149 | // Listens and accepts inbound connections from other cluster nodes. | 225 | // Listens and accepts inbound connections from other cluster nodes. |
... | @@ -154,7 +230,15 @@ public class DistributedClusterStore | ... | @@ -154,7 +230,15 @@ public class DistributedClusterStore |
154 | 230 | ||
155 | @Override | 231 | @Override |
156 | protected void acceptConnection(ServerSocketChannel channel) throws IOException { | 232 | protected void acceptConnection(ServerSocketChannel channel) throws IOException { |
233 | + SocketChannel sc = channel.accept(); | ||
234 | + sc.configureBlocking(false); | ||
235 | + | ||
236 | + Socket so = sc.socket(); | ||
237 | + so.setTcpNoDelay(SO_NO_DELAY); | ||
238 | + so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); | ||
239 | + so.setSendBufferSize(SO_SEND_BUFFER_SIZE); | ||
157 | 240 | ||
241 | + findLeastUtilizedLoop().acceptStream(sc); | ||
158 | } | 242 | } |
159 | } | 243 | } |
160 | 244 | ||
... | @@ -170,7 +254,109 @@ public class DistributedClusterStore | ... | @@ -170,7 +254,109 @@ public class DistributedClusterStore |
170 | 254 | ||
171 | @Override | 255 | @Override |
172 | protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) { | 256 | protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) { |
257 | + TLVMessageStream tlvStream = (TLVMessageStream) stream; | ||
258 | + for (TLVMessage message : messages) { | ||
259 | + // TODO: add type-based dispatching here... | ||
260 | + log.info("Got message {}", message.type()); | ||
173 | 261 | ||
262 | + // FIXME: hack to get going | ||
263 | + if (message.type() == HELLO_MSG) { | ||
264 | + processHello(message, tlvStream); | ||
265 | + } | ||
266 | + } | ||
267 | + } | ||
268 | + | ||
269 | + @Override | ||
270 | + public TLVMessageStream acceptStream(SocketChannel channel) { | ||
271 | + TLVMessageStream stream = super.acceptStream(channel); | ||
272 | + try { | ||
273 | + InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress(); | ||
274 | + log.info("Accepted a new connection from node {}", IpPrefix.valueOf(sa.getAddress().getAddress())); | ||
275 | + stream.write(createHello(self)); | ||
276 | + | ||
277 | + } catch (IOException e) { | ||
278 | + log.warn("Unable to accept connection from an unknown end-point", e); | ||
279 | + } | ||
280 | + return stream; | ||
281 | + } | ||
282 | + | ||
283 | + @Override | ||
284 | + public TLVMessageStream connectStream(SocketChannel channel) { | ||
285 | + TLVMessageStream stream = super.connectStream(channel); | ||
286 | + DefaultControllerNode node = nodesByChannel.get(channel); | ||
287 | + if (node != null) { | ||
288 | + log.info("Opened connection to node {}", node.id()); | ||
289 | + nodesByChannel.remove(channel); | ||
290 | + } | ||
291 | + return stream; | ||
292 | + } | ||
293 | + | ||
294 | + @Override | ||
295 | + protected void connect(SelectionKey key) { | ||
296 | + super.connect(key); | ||
297 | + TLVMessageStream stream = (TLVMessageStream) key.attachment(); | ||
298 | + send(stream, createHello(self)); | ||
299 | + } | ||
300 | + } | ||
301 | + | ||
302 | + // FIXME: pure hack for now | ||
303 | + private void processHello(TLVMessage message, TLVMessageStream stream) { | ||
304 | + String data = new String(message.data()); | ||
305 | + log.info("Processing hello with data [{}]", data); | ||
306 | + String[] fields = new String(data).split(":"); | ||
307 | + DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]), | ||
308 | + IpPrefix.valueOf(fields[1]), | ||
309 | + Integer.parseInt(fields[2])); | ||
310 | + stream.setNode(node); | ||
311 | + nodes.put(node.id(), node); | ||
312 | + streams.put(node.id(), stream); | ||
313 | + } | ||
314 | + | ||
315 | + // Sends message to the specified stream. | ||
316 | + private void send(TLVMessageStream stream, TLVMessage message) { | ||
317 | + try { | ||
318 | + stream.write(message); | ||
319 | + } catch (IOException e) { | ||
320 | + log.warn("Unable to send message to {}", stream.node().id()); | ||
321 | + } | ||
322 | + } | ||
323 | + | ||
324 | + private TLVMessage createHello(DefaultControllerNode self) { | ||
325 | + return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes()); | ||
326 | + } | ||
327 | + | ||
328 | + // Sweeps through all controller nodes and attempts to open connection to | ||
329 | + // those that presently do not have one. | ||
330 | + private class ConnectionCustodian extends TimerTask { | ||
331 | + @Override | ||
332 | + public void run() { | ||
333 | + for (DefaultControllerNode node : nodes.values()) { | ||
334 | + if (node != self && !streams.containsKey(node.id())) { | ||
335 | + try { | ||
336 | + openConnection(node, findLeastUtilizedLoop()); | ||
337 | + } catch (IOException e) { | ||
338 | + log.warn("Unable to connect", e); | ||
339 | + } | ||
340 | + } | ||
341 | + } | ||
342 | + } | ||
343 | + } | ||
344 | + | ||
345 | + // Finds the least utilities IO loop. | ||
346 | + private CommLoop findLeastUtilizedLoop() { | ||
347 | + CommLoop leastUtilized = null; | ||
348 | + int minCount = Integer.MAX_VALUE; | ||
349 | + for (CommLoop loop : commLoops) { | ||
350 | + int count = loop.streamCount(); | ||
351 | + if (count == 0) { | ||
352 | + return loop; | ||
353 | + } | ||
354 | + | ||
355 | + if (count < minCount) { | ||
356 | + leastUtilized = loop; | ||
357 | + minCount = count; | ||
358 | + } | ||
174 | } | 359 | } |
360 | + return leastUtilized; | ||
175 | } | 361 | } |
176 | } | 362 | } | ... | ... |
1 | -package org.onlab.onos.ccc; | 1 | +package org.onlab.onos.store.cluster.impl; |
2 | 2 | ||
3 | import org.onlab.nio.AbstractMessage; | 3 | import org.onlab.nio.AbstractMessage; |
4 | 4 | ||
... | @@ -12,17 +12,16 @@ import static com.google.common.base.MoreObjects.toStringHelper; | ... | @@ -12,17 +12,16 @@ import static com.google.common.base.MoreObjects.toStringHelper; |
12 | public class TLVMessage extends AbstractMessage { | 12 | public class TLVMessage extends AbstractMessage { |
13 | 13 | ||
14 | private final int type; | 14 | private final int type; |
15 | - private final Object data; | 15 | + private final byte[] data; |
16 | 16 | ||
17 | /** | 17 | /** |
18 | * Creates an immutable TLV message. | 18 | * Creates an immutable TLV message. |
19 | * | 19 | * |
20 | * @param type message type | 20 | * @param type message type |
21 | - * @param length message length | 21 | + * @param data message data bytes |
22 | - * @param data message data | ||
23 | */ | 22 | */ |
24 | - public TLVMessage(int type, int length, Object data) { | 23 | + public TLVMessage(int type, byte[] data) { |
25 | - this.length = length; | 24 | + this.length = data.length + TLVMessageStream.METADATA_LENGTH; |
26 | this.type = type; | 25 | this.type = type; |
27 | this.data = data; | 26 | this.data = data; |
28 | } | 27 | } |
... | @@ -37,11 +36,11 @@ public class TLVMessage extends AbstractMessage { | ... | @@ -37,11 +36,11 @@ public class TLVMessage extends AbstractMessage { |
37 | } | 36 | } |
38 | 37 | ||
39 | /** | 38 | /** |
40 | - * Returns the data object. | 39 | + * Returns the data bytes. |
41 | * | 40 | * |
42 | * @return message data | 41 | * @return message data |
43 | */ | 42 | */ |
44 | - public Object data() { | 43 | + public byte[] data() { |
45 | return data; | 44 | return data; |
46 | } | 45 | } |
47 | 46 | ... | ... |
1 | -package org.onlab.onos.ccc; | 1 | +package org.onlab.onos.store.cluster.impl; |
2 | 2 | ||
3 | import org.onlab.nio.IOLoop; | 3 | import org.onlab.nio.IOLoop; |
4 | import org.onlab.nio.MessageStream; | 4 | import org.onlab.nio.MessageStream; |
5 | +import org.onlab.onos.cluster.DefaultControllerNode; | ||
5 | 6 | ||
6 | import java.nio.ByteBuffer; | 7 | import java.nio.ByteBuffer; |
7 | import java.nio.channels.ByteChannel; | 8 | import java.nio.channels.ByteChannel; |
... | @@ -13,8 +14,13 @@ import static com.google.common.base.Preconditions.checkState; | ... | @@ -13,8 +14,13 @@ import static com.google.common.base.Preconditions.checkState; |
13 | */ | 14 | */ |
14 | public class TLVMessageStream extends MessageStream<TLVMessage> { | 15 | public class TLVMessageStream extends MessageStream<TLVMessage> { |
15 | 16 | ||
17 | + public static final int METADATA_LENGTH = 16; // 8 + 4 + 4 | ||
18 | + | ||
19 | + private static final int LENGTH_OFFSET = 12; | ||
16 | private static final long MARKER = 0xfeedcafecafefeedL; | 20 | private static final long MARKER = 0xfeedcafecafefeedL; |
17 | 21 | ||
22 | + private DefaultControllerNode node; | ||
23 | + | ||
18 | /** | 24 | /** |
19 | * Creates a message stream associated with the specified IO loop and | 25 | * Creates a message stream associated with the specified IO loop and |
20 | * backed by the given byte channel. | 26 | * backed by the given byte channel. |
... | @@ -29,17 +35,51 @@ public class TLVMessageStream extends MessageStream<TLVMessage> { | ... | @@ -29,17 +35,51 @@ public class TLVMessageStream extends MessageStream<TLVMessage> { |
29 | super(loop, byteChannel, bufferSize, maxIdleMillis); | 35 | super(loop, byteChannel, bufferSize, maxIdleMillis); |
30 | } | 36 | } |
31 | 37 | ||
38 | + /** | ||
39 | + * Returns the node with which this stream is associated. | ||
40 | + * | ||
41 | + * @return controller node | ||
42 | + */ | ||
43 | + DefaultControllerNode node() { | ||
44 | + return node; | ||
45 | + } | ||
46 | + | ||
47 | + /** | ||
48 | + * Sets the node with which this stream is affiliated. | ||
49 | + * | ||
50 | + * @param node controller node | ||
51 | + */ | ||
52 | + void setNode(DefaultControllerNode node) { | ||
53 | + checkState(this.node == null, "Stream is already bound to a node"); | ||
54 | + this.node = node; | ||
55 | + } | ||
56 | + | ||
32 | @Override | 57 | @Override |
33 | protected TLVMessage read(ByteBuffer buffer) { | 58 | protected TLVMessage read(ByteBuffer buffer) { |
59 | + // Do we have enough bytes to read the header? If not, bail. | ||
60 | + if (buffer.remaining() < METADATA_LENGTH) { | ||
61 | + return null; | ||
62 | + } | ||
63 | + | ||
64 | + // Peek at the length and if we have enough to read the entire message | ||
65 | + // go ahead, otherwise bail. | ||
66 | + int length = buffer.getInt(buffer.position() + LENGTH_OFFSET); | ||
67 | + if (buffer.remaining() < length) { | ||
68 | + return null; | ||
69 | + } | ||
70 | + | ||
71 | + // At this point, we have enough data to read a complete message. | ||
34 | long marker = buffer.getLong(); | 72 | long marker = buffer.getLong(); |
35 | checkState(marker == MARKER, "Incorrect message marker"); | 73 | checkState(marker == MARKER, "Incorrect message marker"); |
36 | 74 | ||
37 | int type = buffer.getInt(); | 75 | int type = buffer.getInt(); |
38 | - int length = buffer.getInt(); | 76 | + length = buffer.getInt(); |
39 | 77 | ||
40 | // TODO: add deserialization hook here | 78 | // TODO: add deserialization hook here |
79 | + byte[] data = new byte[length - METADATA_LENGTH]; | ||
80 | + buffer.get(data); | ||
41 | 81 | ||
42 | - return new TLVMessage(type, length, null); | 82 | + return new TLVMessage(type, data); |
43 | } | 83 | } |
44 | 84 | ||
45 | @Override | 85 | @Override |
... | @@ -49,5 +89,7 @@ public class TLVMessageStream extends MessageStream<TLVMessage> { | ... | @@ -49,5 +89,7 @@ public class TLVMessageStream extends MessageStream<TLVMessage> { |
49 | buffer.putInt(message.length()); | 89 | buffer.putInt(message.length()); |
50 | 90 | ||
51 | // TODO: add serialization hook here | 91 | // TODO: add serialization hook here |
92 | + buffer.put(message.data()); | ||
52 | } | 93 | } |
94 | + | ||
53 | } | 95 | } | ... | ... |
... | @@ -20,7 +20,7 @@ import java.util.Set; | ... | @@ -20,7 +20,7 @@ import java.util.Set; |
20 | import static org.slf4j.LoggerFactory.getLogger; | 20 | import static org.slf4j.LoggerFactory.getLogger; |
21 | 21 | ||
22 | /** | 22 | /** |
23 | - * Manages inventory of infrastructure DEVICES using trivial in-memory | 23 | + * Manages inventory of infrastructure devices using trivial in-memory |
24 | * structures implementation. | 24 | * structures implementation. |
25 | */ | 25 | */ |
26 | @Component(immediate = true) | 26 | @Component(immediate = true) | ... | ... |
... | @@ -9,7 +9,6 @@ | ... | @@ -9,7 +9,6 @@ |
9 | <bundle>mvn:org.apache.commons/commons-lang3/3.3.2</bundle> | 9 | <bundle>mvn:org.apache.commons/commons-lang3/3.3.2</bundle> |
10 | <bundle>mvn:com.google.guava/guava/18.0</bundle> | 10 | <bundle>mvn:com.google.guava/guava/18.0</bundle> |
11 | <bundle>mvn:io.netty/netty/3.9.2.Final</bundle> | 11 | <bundle>mvn:io.netty/netty/3.9.2.Final</bundle> |
12 | - <bundle>mvn:org.livetribe.slp/livetribe-slp-osgi/2.2.1</bundle> | ||
13 | 12 | ||
14 | <bundle>mvn:com.hazelcast/hazelcast/3.3</bundle> | 13 | <bundle>mvn:com.hazelcast/hazelcast/3.3</bundle> |
15 | <bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle> | 14 | <bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle> |
... | @@ -49,14 +48,11 @@ | ... | @@ -49,14 +48,11 @@ |
49 | description="ONOS core components"> | 48 | description="ONOS core components"> |
50 | <feature>onos-api</feature> | 49 | <feature>onos-api</feature> |
51 | <bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle> | 50 | <bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle> |
52 | - <bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle> | 51 | + <bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle> |
53 | - <bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle> | ||
54 | - <bundle>mvn:org.onlab.onos/onos-core-hz-cluster/1.0.0-SNAPSHOT</bundle> | ||
55 | - <bundle>mvn:org.onlab.onos/onos-core-hz-net/1.0.0-SNAPSHOT</bundle> | ||
56 | </feature> | 52 | </feature> |
57 | 53 | ||
58 | - <feature name="onos-core-dist" version="1.0.0" | 54 | + <feature name="onos-core-hazelcast" version="1.0.0" |
59 | - description="ONOS core components"> | 55 | + description="ONOS core components built on hazelcast"> |
60 | <feature>onos-api</feature> | 56 | <feature>onos-api</feature> |
61 | <bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle> | 57 | <bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle> |
62 | <bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle> | 58 | <bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle> | ... | ... |
... | @@ -15,7 +15,7 @@ env JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 | ... | @@ -15,7 +15,7 @@ env JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 |
15 | 15 | ||
16 | pre-stop script | 16 | pre-stop script |
17 | /opt/onos/bin/onos halt 2>/opt/onos/var/stderr.log | 17 | /opt/onos/bin/onos halt 2>/opt/onos/var/stderr.log |
18 | - sleep 3 | 18 | + sleep 2 |
19 | end script | 19 | end script |
20 | 20 | ||
21 | script | 21 | script | ... | ... |
... | @@ -8,7 +8,21 @@ | ... | @@ -8,7 +8,21 @@ |
8 | 8 | ||
9 | remote=$ONOS_USER@${1:-$OCI} | 9 | remote=$ONOS_USER@${1:-$OCI} |
10 | 10 | ||
11 | +# Generate a cluster.json from the ON* environment variables | ||
12 | +CDEF_FILE=/tmp/cluster.json | ||
13 | +echo "{ \"nodes\":[" > $CDEF_FILE | ||
14 | +for node in $(env | sort | egrep "OC[2-9]+" | cut -d= -f2); do | ||
15 | + echo " { \"id\": \"$node\", \"ip\": \"$node\", \"tcpPort\": 9876 }," >> $CDEF_FILE | ||
16 | +done | ||
17 | +echo " { \"id\": \"$OC1\", \"ip\": \"$OC1\", \"tcpPort\": 9876 }" >> $CDEF_FILE | ||
18 | +echo "]}" >> $CDEF_FILE | ||
19 | + | ||
11 | ssh $remote " | 20 | ssh $remote " |
12 | sudo perl -pi.bak -e \"s/ <interface>.*</ <interface>${ONOS_NIC:-192.168.56.*}</g\" \ | 21 | sudo perl -pi.bak -e \"s/ <interface>.*</ <interface>${ONOS_NIC:-192.168.56.*}</g\" \ |
13 | $ONOS_INSTALL_DIR/$KARAF_DIST/etc/hazelcast.xml | 22 | $ONOS_INSTALL_DIR/$KARAF_DIST/etc/hazelcast.xml |
14 | -" | ||
... | \ No newline at end of file | ... | \ No newline at end of file |
23 | + | ||
24 | + echo \"onos.ip = \$(ifconfig | grep $ONOS_NIC | cut -d: -f2 | cut -d\\ -f1)\" \ | ||
25 | + >> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties | ||
26 | +" | ||
27 | + | ||
28 | +scp -q $CDEF_FILE $remote:$ONOS_INSTALL_DIR/config/ | ||
... | \ No newline at end of file | ... | \ No newline at end of file | ... | ... |
... | @@ -24,6 +24,7 @@ ssh $remote " | ... | @@ -24,6 +24,7 @@ ssh $remote " |
24 | # Make a link to the log file directory and make a home for auxiliaries | 24 | # Make a link to the log file directory and make a home for auxiliaries |
25 | ln -s $ONOS_INSTALL_DIR/$KARAF_DIST/data/log /opt/onos/log | 25 | ln -s $ONOS_INSTALL_DIR/$KARAF_DIST/data/log /opt/onos/log |
26 | mkdir $ONOS_INSTALL_DIR/var | 26 | mkdir $ONOS_INSTALL_DIR/var |
27 | + mkdir $ONOS_INSTALL_DIR/config | ||
27 | 28 | ||
28 | # Install the upstart configuration file and setup options for debugging | 29 | # Install the upstart configuration file and setup options for debugging |
29 | sudo cp $ONOS_INSTALL_DIR/debian/onos.conf /etc/init/onos.conf | 30 | sudo cp $ONOS_INSTALL_DIR/debian/onos.conf /etc/init/onos.conf | ... | ... |
1 | # Default virtual box ONOS instances 1,2 & ONOS mininet box | 1 | # Default virtual box ONOS instances 1,2 & ONOS mininet box |
2 | . $ONOS_ROOT/tools/test/cells/.reset | 2 | . $ONOS_ROOT/tools/test/cells/.reset |
3 | 3 | ||
4 | +export ONOS_NIC=192.168.56.* | ||
5 | + | ||
4 | export OC1="192.168.56.101" | 6 | export OC1="192.168.56.101" |
5 | export OC2="192.168.56.102" | 7 | export OC2="192.168.56.102" |
6 | 8 | ... | ... |
... | @@ -54,6 +54,15 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> | ... | @@ -54,6 +54,15 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> |
54 | } | 54 | } |
55 | 55 | ||
56 | /** | 56 | /** |
57 | + * Returns the number of message stream in custody of the loop. | ||
58 | + * | ||
59 | + * @return number of message streams | ||
60 | + */ | ||
61 | + public int streamCount() { | ||
62 | + return streams.size(); | ||
63 | + } | ||
64 | + | ||
65 | + /** | ||
57 | * Creates a new message stream backed by the specified socket channel. | 66 | * Creates a new message stream backed by the specified socket channel. |
58 | * | 67 | * |
59 | * @param byteChannel backing byte channel | 68 | * @param byteChannel backing byte channel |
... | @@ -182,9 +191,10 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> | ... | @@ -182,9 +191,10 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> |
182 | * with a pending accept operation. | 191 | * with a pending accept operation. |
183 | * | 192 | * |
184 | * @param channel backing socket channel | 193 | * @param channel backing socket channel |
194 | + * @return newly accepted message stream | ||
185 | */ | 195 | */ |
186 | - public void acceptStream(SocketChannel channel) { | 196 | + public S acceptStream(SocketChannel channel) { |
187 | - createAndAdmit(channel, SelectionKey.OP_READ); | 197 | + return createAndAdmit(channel, SelectionKey.OP_READ); |
188 | } | 198 | } |
189 | 199 | ||
190 | 200 | ||
... | @@ -193,9 +203,10 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> | ... | @@ -193,9 +203,10 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> |
193 | * with a pending connect operation. | 203 | * with a pending connect operation. |
194 | * | 204 | * |
195 | * @param channel backing socket channel | 205 | * @param channel backing socket channel |
206 | + * @return newly connected message stream | ||
196 | */ | 207 | */ |
197 | - public void connectStream(SocketChannel channel) { | 208 | + public S connectStream(SocketChannel channel) { |
198 | - createAndAdmit(channel, SelectionKey.OP_CONNECT); | 209 | + return createAndAdmit(channel, SelectionKey.OP_CONNECT); |
199 | } | 210 | } |
200 | 211 | ||
201 | /** | 212 | /** |
... | @@ -205,12 +216,14 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> | ... | @@ -205,12 +216,14 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> |
205 | * @param channel socket channel | 216 | * @param channel socket channel |
206 | * @param op pending operations mask to be applied to the selection | 217 | * @param op pending operations mask to be applied to the selection |
207 | * key as a set of initial interestedOps | 218 | * key as a set of initial interestedOps |
219 | + * @return newly created message stream | ||
208 | */ | 220 | */ |
209 | - private synchronized void createAndAdmit(SocketChannel channel, int op) { | 221 | + private synchronized S createAndAdmit(SocketChannel channel, int op) { |
210 | S stream = createStream(channel); | 222 | S stream = createStream(channel); |
211 | streams.add(stream); | 223 | streams.add(stream); |
212 | newStreamRequests.add(new NewStreamRequest(stream, channel, op)); | 224 | newStreamRequests.add(new NewStreamRequest(stream, channel, op)); |
213 | selector.wakeup(); | 225 | selector.wakeup(); |
226 | + return stream; | ||
214 | } | 227 | } |
215 | 228 | ||
216 | /** | 229 | /** | ... | ... |
... | @@ -170,7 +170,7 @@ public abstract class MessageStream<M extends Message> { | ... | @@ -170,7 +170,7 @@ public abstract class MessageStream<M extends Message> { |
170 | } | 170 | } |
171 | 171 | ||
172 | /** | 172 | /** |
173 | - * Reads, withouth blocking, a list of messages from the stream. | 173 | + * Reads, without blocking, a list of messages from the stream. |
174 | * The list will be empty if there were not messages pending. | 174 | * The list will be empty if there were not messages pending. |
175 | * | 175 | * |
176 | * @return list of messages or null if backing channel has been closed | 176 | * @return list of messages or null if backing channel has been closed | ... | ... |
-
Please register or login to post a comment