tom

Added ability to properly register/deregister new connections and have the node …

…status properly reflected.
...@@ -233,7 +233,7 @@ public class IOLoopTestClient { ...@@ -233,7 +233,7 @@ public class IOLoopTestClient {
233 } 233 }
234 234
235 @Override 235 @Override
236 - protected void connect(SelectionKey key) { 236 + protected void connect(SelectionKey key) throws IOException {
237 super.connect(key); 237 super.connect(key);
238 TestMessageStream b = (TestMessageStream) key.attachment(); 238 TestMessageStream b = (TestMessageStream) key.attachment();
239 Worker w = ((CustomIOLoop) b.loop()).worker; 239 Worker w = ((CustomIOLoop) b.loop()).worker;
......
...@@ -30,6 +30,7 @@ import java.nio.channels.SocketChannel; ...@@ -30,6 +30,7 @@ import java.nio.channels.SocketChannel;
30 import java.util.ArrayList; 30 import java.util.ArrayList;
31 import java.util.List; 31 import java.util.List;
32 import java.util.Map; 32 import java.util.Map;
33 +import java.util.Objects;
33 import java.util.Set; 34 import java.util.Set;
34 import java.util.Timer; 35 import java.util.Timer;
35 import java.util.TimerTask; 36 import java.util.TimerTask;
...@@ -129,6 +130,7 @@ public class DistributedClusterStore ...@@ -129,6 +130,7 @@ public class DistributedClusterStore
129 if (self == null) { 130 if (self == null) {
130 self = new DefaultControllerNode(new NodeId(ip.toString()), ip); 131 self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
131 nodes.put(self.id(), self); 132 nodes.put(self.id(), self);
133 + states.put(self.id(), State.ACTIVE);
132 } 134 }
133 } 135 }
134 136
...@@ -219,7 +221,10 @@ public class DistributedClusterStore ...@@ -219,7 +221,10 @@ public class DistributedClusterStore
219 @Override 221 @Override
220 public void removeNode(NodeId nodeId) { 222 public void removeNode(NodeId nodeId) {
221 nodes.remove(nodeId); 223 nodes.remove(nodeId);
222 - streams.remove(nodeId); 224 + TLVMessageStream stream = streams.remove(nodeId);
225 + if (stream != null) {
226 + stream.close();
227 + }
223 } 228 }
224 229
225 // Listens and accepts inbound connections from other cluster nodes. 230 // Listens and accepts inbound connections from other cluster nodes.
...@@ -256,12 +261,13 @@ public class DistributedClusterStore ...@@ -256,12 +261,13 @@ public class DistributedClusterStore
256 protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) { 261 protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
257 TLVMessageStream tlvStream = (TLVMessageStream) stream; 262 TLVMessageStream tlvStream = (TLVMessageStream) stream;
258 for (TLVMessage message : messages) { 263 for (TLVMessage message : messages) {
259 - // TODO: add type-based dispatching here... 264 + // TODO: add type-based dispatching here... this is just a hack to get going
260 - log.info("Got message {}", message.type());
261 -
262 - // FIXME: hack to get going
263 if (message.type() == HELLO_MSG) { 265 if (message.type() == HELLO_MSG) {
264 processHello(message, tlvStream); 266 processHello(message, tlvStream);
267 + } else if (message.type() == ECHO_MSG) {
268 + processEcho(message, tlvStream);
269 + } else {
270 + log.info("Deal with other messages");
265 } 271 }
266 } 272 }
267 } 273 }
...@@ -271,7 +277,7 @@ public class DistributedClusterStore ...@@ -271,7 +277,7 @@ public class DistributedClusterStore
271 TLVMessageStream stream = super.acceptStream(channel); 277 TLVMessageStream stream = super.acceptStream(channel);
272 try { 278 try {
273 InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress(); 279 InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
274 - log.info("Accepted a new connection from node {}", IpPrefix.valueOf(sa.getAddress().getAddress())); 280 + log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
275 stream.write(createHello(self)); 281 stream.write(createHello(self));
276 282
277 } catch (IOException e) { 283 } catch (IOException e) {
...@@ -285,31 +291,55 @@ public class DistributedClusterStore ...@@ -285,31 +291,55 @@ public class DistributedClusterStore
285 TLVMessageStream stream = super.connectStream(channel); 291 TLVMessageStream stream = super.connectStream(channel);
286 DefaultControllerNode node = nodesByChannel.get(channel); 292 DefaultControllerNode node = nodesByChannel.get(channel);
287 if (node != null) { 293 if (node != null) {
288 - log.info("Opened connection to node {}", node.id()); 294 + log.debug("Opened connection to node {}", node.id());
289 nodesByChannel.remove(channel); 295 nodesByChannel.remove(channel);
290 } 296 }
291 return stream; 297 return stream;
292 } 298 }
293 299
294 @Override 300 @Override
295 - protected void connect(SelectionKey key) { 301 + protected void connect(SelectionKey key) throws IOException {
302 + try {
296 super.connect(key); 303 super.connect(key);
297 TLVMessageStream stream = (TLVMessageStream) key.attachment(); 304 TLVMessageStream stream = (TLVMessageStream) key.attachment();
298 send(stream, createHello(self)); 305 send(stream, createHello(self));
306 + } catch (IOException e) {
307 + if (!Objects.equals(e.getMessage(), "Connection refused")) {
308 + throw e;
309 + }
299 } 310 }
300 } 311 }
301 312
302 - // FIXME: pure hack for now 313 + @Override
314 + protected void removeStream(MessageStream<TLVMessage> stream) {
315 + DefaultControllerNode node = ((TLVMessageStream) stream).node();
316 + if (node != null) {
317 + log.info("Closed connection to node {}", node.id());
318 + states.put(node.id(), State.INACTIVE);
319 + streams.remove(node.id());
320 + }
321 + super.removeStream(stream);
322 + }
323 + }
324 +
325 + // Processes a HELLO message from a peer controller node.
303 private void processHello(TLVMessage message, TLVMessageStream stream) { 326 private void processHello(TLVMessage message, TLVMessageStream stream) {
327 + // FIXME: pure hack for now
304 String data = new String(message.data()); 328 String data = new String(message.data());
305 - log.info("Processing hello with data [{}]", data); 329 + String[] fields = data.split(":");
306 - String[] fields = new String(data).split(":");
307 DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]), 330 DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]),
308 - IpPrefix.valueOf(fields[1]), 331 + valueOf(fields[1]),
309 Integer.parseInt(fields[2])); 332 Integer.parseInt(fields[2]));
310 stream.setNode(node); 333 stream.setNode(node);
311 nodes.put(node.id(), node); 334 nodes.put(node.id(), node);
312 streams.put(node.id(), stream); 335 streams.put(node.id(), stream);
336 + states.put(node.id(), State.ACTIVE);
337 + }
338 +
339 + // Processes an ECHO message from a peer controller node.
340 + private void processEcho(TLVMessage message, TLVMessageStream tlvStream) {
341 + // TODO: implement heart-beat refresh
342 + log.info("Dealing with echoes...");
313 } 343 }
314 344
315 // Sends message to the specified stream. 345 // Sends message to the specified stream.
...@@ -321,6 +351,7 @@ public class DistributedClusterStore ...@@ -321,6 +351,7 @@ public class DistributedClusterStore
321 } 351 }
322 } 352 }
323 353
354 + // Creates a hello message to be sent to a peer controller node.
324 private TLVMessage createHello(DefaultControllerNode self) { 355 private TLVMessage createHello(DefaultControllerNode self) {
325 return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes()); 356 return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes());
326 } 357 }
...@@ -335,7 +366,7 @@ public class DistributedClusterStore ...@@ -335,7 +366,7 @@ public class DistributedClusterStore
335 try { 366 try {
336 openConnection(node, findLeastUtilizedLoop()); 367 openConnection(node, findLeastUtilizedLoop());
337 } catch (IOException e) { 368 } catch (IOException e) {
338 - log.warn("Unable to connect", e); 369 + log.debug("Unable to connect", e);
339 } 370 }
340 } 371 }
341 } 372 }
......
...@@ -93,14 +93,9 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> ...@@ -93,14 +93,9 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
93 * 93 *
94 * @param key selection key holding the pending connect operation. 94 * @param key selection key holding the pending connect operation.
95 */ 95 */
96 - protected void connect(SelectionKey key) { 96 + protected void connect(SelectionKey key) throws IOException {
97 - try {
98 SocketChannel ch = (SocketChannel) key.channel(); 97 SocketChannel ch = (SocketChannel) key.channel();
99 ch.finishConnect(); 98 ch.finishConnect();
100 - } catch (IOException | IllegalStateException e) {
101 - log.warn("Unable to complete connection", e);
102 - }
103 -
104 if (key.isValid()) { 99 if (key.isValid()) {
105 key.interestOps(SelectionKey.OP_READ); 100 key.interestOps(SelectionKey.OP_READ);
106 } 101 }
...@@ -124,7 +119,11 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> ...@@ -124,7 +119,11 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
124 119
125 // If there is a pending connect operation, complete it. 120 // If there is a pending connect operation, complete it.
126 if (key.isConnectable()) { 121 if (key.isConnectable()) {
122 + try {
127 connect(key); 123 connect(key);
124 + } catch (IOException | IllegalStateException e) {
125 + log.warn("Unable to complete connection", e);
126 + }
128 } 127 }
129 128
130 // If there is a read operation, slurp as much data as possible. 129 // If there is a read operation, slurp as much data as possible.
......
...@@ -10,6 +10,7 @@ import java.nio.channels.ByteChannel; ...@@ -10,6 +10,7 @@ import java.nio.channels.ByteChannel;
10 import java.nio.channels.SelectionKey; 10 import java.nio.channels.SelectionKey;
11 import java.util.ArrayList; 11 import java.util.ArrayList;
12 import java.util.List; 12 import java.util.List;
13 +import java.util.Objects;
13 14
14 import static com.google.common.base.Preconditions.checkArgument; 15 import static com.google.common.base.Preconditions.checkArgument;
15 import static com.google.common.base.Preconditions.checkNotNull; 16 import static com.google.common.base.Preconditions.checkNotNull;
...@@ -262,7 +263,7 @@ public abstract class MessageStream<M extends Message> { ...@@ -262,7 +263,7 @@ public abstract class MessageStream<M extends Message> {
262 try { 263 try {
263 channel.write(outbound); 264 channel.write(outbound);
264 } catch (IOException e) { 265 } catch (IOException e) {
265 - if (!closed && !e.getMessage().equals("Broken pipe")) { 266 + if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
266 log.warn("Unable to write data", e); 267 log.warn("Unable to write data", e);
267 ioError = e; 268 ioError = e;
268 } 269 }
......
...@@ -230,7 +230,7 @@ public class IOLoopTestClient { ...@@ -230,7 +230,7 @@ public class IOLoopTestClient {
230 } 230 }
231 231
232 @Override 232 @Override
233 - protected void connect(SelectionKey key) { 233 + protected void connect(SelectionKey key) throws IOException {
234 super.connect(key); 234 super.connect(key);
235 TestMessageStream b = (TestMessageStream) key.attachment(); 235 TestMessageStream b = (TestMessageStream) key.attachment();
236 Worker w = ((CustomIOLoop) b.loop()).worker; 236 Worker w = ((CustomIOLoop) b.loop()).worker;
......