tom

Starting to add membership management messages.

...@@ -12,11 +12,13 @@ import org.apache.felix.scr.annotations.Service; ...@@ -12,11 +12,13 @@ import org.apache.felix.scr.annotations.Service;
12 import org.onlab.onos.cluster.DefaultControllerNode; 12 import org.onlab.onos.cluster.DefaultControllerNode;
13 import org.onlab.onos.cluster.NodeId; 13 import org.onlab.onos.cluster.NodeId;
14 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; 14 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
15 +import org.onlab.onos.store.cluster.messaging.ClusterMembershipMessage;
15 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 16 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
16 -import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
17 import org.onlab.onos.store.cluster.messaging.HelloMessage; 17 import org.onlab.onos.store.cluster.messaging.HelloMessage;
18 +import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
18 import org.onlab.onos.store.cluster.messaging.MessageSubject; 19 import org.onlab.onos.store.cluster.messaging.MessageSubject;
19 import org.onlab.onos.store.cluster.messaging.MessageSubscriber; 20 import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
21 +import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
20 import org.onlab.onos.store.cluster.messaging.SerializationService; 22 import org.onlab.onos.store.cluster.messaging.SerializationService;
21 import org.onlab.packet.IpPrefix; 23 import org.onlab.packet.IpPrefix;
22 import org.slf4j.Logger; 24 import org.slf4j.Logger;
...@@ -84,16 +86,20 @@ public class ClusterCommunicationManager ...@@ -84,16 +86,20 @@ public class ClusterCommunicationManager
84 86
85 private final Timer timer = new Timer("onos-comm-initiator"); 87 private final Timer timer = new Timer("onos-comm-initiator");
86 private final TimerTask connectionCustodian = new ConnectionCustodian(); 88 private final TimerTask connectionCustodian = new ConnectionCustodian();
87 - private GoodbyeSubscriber goodbyeSubscriber = new GoodbyeSubscriber(); 89 + private MembershipSubscriber membershipSubscriber = new MembershipSubscriber();
88 90
89 @Activate 91 @Activate
90 public void activate() { 92 public void activate() {
91 - addSubscriber(MessageSubject.GOODBYE, goodbyeSubscriber); 93 + addSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
94 + addSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
92 log.info("Activated but waiting for delegate"); 95 log.info("Activated but waiting for delegate");
93 } 96 }
94 97
95 @Deactivate 98 @Deactivate
96 public void deactivate() { 99 public void deactivate() {
100 + removeSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
101 + removeSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
102 +
97 connectionCustodian.cancel(); 103 connectionCustodian.cancel();
98 if (connectionListener != null) { 104 if (connectionListener != null) {
99 connectionListener.shutdown(); 105 connectionListener.shutdown();
...@@ -154,7 +160,7 @@ public class ClusterCommunicationManager ...@@ -154,7 +160,7 @@ public class ClusterCommunicationManager
154 160
155 @Override 161 @Override
156 public void removeNode(DefaultControllerNode node) { 162 public void removeNode(DefaultControllerNode node) {
157 - send(new GoodbyeMessage(node.id())); 163 + send(new LeavingMemberMessage(node.id()));
158 nodes.remove(node); 164 nodes.remove(node);
159 ClusterMessageStream stream = streams.remove(node.id()); 165 ClusterMessageStream stream = streams.remove(node.id());
160 if (stream != null) { 166 if (stream != null) {
...@@ -177,7 +183,7 @@ public class ClusterCommunicationManager ...@@ -177,7 +183,7 @@ public class ClusterCommunicationManager
177 @Override 183 @Override
178 public void clearAllNodesAndStreams() { 184 public void clearAllNodesAndStreams() {
179 nodes.clear(); 185 nodes.clear();
180 - send(new GoodbyeMessage(localNode.id())); 186 + send(new LeavingMemberMessage(localNode.id()));
181 for (ClusterMessageStream stream : streams.values()) { 187 for (ClusterMessageStream stream : streams.values()) {
182 stream.close(); 188 stream.close();
183 } 189 }
...@@ -187,7 +193,7 @@ public class ClusterCommunicationManager ...@@ -187,7 +193,7 @@ public class ClusterCommunicationManager
187 /** 193 /**
188 * Dispatches the specified message to all subscribers to its subject. 194 * Dispatches the specified message to all subscribers to its subject.
189 * 195 *
190 - * @param message message to dispatch 196 + * @param message message to dispatch
191 * @param fromNodeId node from which the message was received 197 * @param fromNodeId node from which the message was received
192 */ 198 */
193 void dispatch(ClusterMessage message, NodeId fromNodeId) { 199 void dispatch(ClusterMessage message, NodeId fromNodeId) {
...@@ -200,7 +206,7 @@ public class ClusterCommunicationManager ...@@ -200,7 +206,7 @@ public class ClusterCommunicationManager
200 } 206 }
201 207
202 /** 208 /**
203 - * Removes the stream associated with the specified node. 209 + * Adds the stream associated with the specified node.
204 * 210 *
205 * @param nodeId newly detected cluster node id 211 * @param nodeId newly detected cluster node id
206 * @param ip node IP listen address 212 * @param ip node IP listen address
...@@ -212,6 +218,7 @@ public class ClusterCommunicationManager ...@@ -212,6 +218,7 @@ public class ClusterCommunicationManager
212 DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort); 218 DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
213 stream.setNode(node); 219 stream.setNode(node);
214 streams.put(node.id(), stream); 220 streams.put(node.id(), stream);
221 + send(new NewMemberMessage(node.id(), node.ip(), node.tcpPort()));
215 return node; 222 return node;
216 } 223 }
217 224
...@@ -329,11 +336,19 @@ public class ClusterCommunicationManager ...@@ -329,11 +336,19 @@ public class ClusterCommunicationManager
329 } 336 }
330 } 337 }
331 338
332 - private class GoodbyeSubscriber implements MessageSubscriber { 339 + private class MembershipSubscriber implements MessageSubscriber {
333 @Override 340 @Override
334 public void receive(ClusterMessage message, NodeId fromNodeId) { 341 public void receive(ClusterMessage message, NodeId fromNodeId) {
335 - log.info("Received goodbye message from {}", fromNodeId); 342 + MessageSubject subject = message.subject();
336 - nodesDelegate.nodeRemoved(fromNodeId); 343 + ClusterMembershipMessage cmm = (ClusterMembershipMessage) message;
344 + if (message.subject() == MessageSubject.NEW_MEMBER) {
345 + log.info("Node {} arrived", cmm.nodeId());
346 + nodesDelegate.nodeDetected(cmm.nodeId(), cmm.ipAddress(), cmm.tcpPort());
347 +
348 + } else if (subject == MessageSubject.LEAVING_MEMBER) {
349 + log.info("Node {} is leaving", cmm.nodeId());
350 + nodesDelegate.nodeRemoved(cmm.nodeId());
351 + }
337 } 352 }
338 } 353 }
339 } 354 }
......
...@@ -128,10 +128,11 @@ public class DistributedClusterStore ...@@ -128,10 +128,11 @@ public class DistributedClusterStore
128 @Override 128 @Override
129 public void removeNode(NodeId nodeId) { 129 public void removeNode(NodeId nodeId) {
130 if (nodeId.equals(localNode.id())) { 130 if (nodeId.equals(localNode.id())) {
131 - // FIXME: this is still broken
132 // We are being ejected from the cluster, so remove all other nodes. 131 // We are being ejected from the cluster, so remove all other nodes.
133 communicationAdminService.clearAllNodesAndStreams(); 132 communicationAdminService.clearAllNodesAndStreams();
134 nodes.clear(); 133 nodes.clear();
134 + nodes.put(localNode.id(), localNode);
135 +
135 } else { 136 } else {
136 // Remove the other node. 137 // Remove the other node.
137 DefaultControllerNode node = nodes.remove(nodeId); 138 DefaultControllerNode node = nodes.remove(nodeId);
...@@ -152,6 +153,7 @@ public class DistributedClusterStore ...@@ -152,6 +153,7 @@ public class DistributedClusterStore
152 states.put(nodeId, State.ACTIVE); 153 states.put(nodeId, State.ACTIVE);
153 return node; 154 return node;
154 } 155 }
156 +
155 @Override 157 @Override
156 public void nodeVanished(NodeId nodeId) { 158 public void nodeVanished(NodeId nodeId) {
157 states.put(nodeId, State.INACTIVE); 159 states.put(nodeId, State.INACTIVE);
......
...@@ -23,9 +23,10 @@ import org.onlab.onos.net.PortNumber; ...@@ -23,9 +23,10 @@ import org.onlab.onos.net.PortNumber;
23 import org.onlab.onos.net.provider.ProviderId; 23 import org.onlab.onos.net.provider.ProviderId;
24 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 24 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
25 import org.onlab.onos.store.cluster.messaging.EchoMessage; 25 import org.onlab.onos.store.cluster.messaging.EchoMessage;
26 -import org.onlab.onos.store.cluster.messaging.GoodbyeMessage; 26 +import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
27 import org.onlab.onos.store.cluster.messaging.HelloMessage; 27 import org.onlab.onos.store.cluster.messaging.HelloMessage;
28 import org.onlab.onos.store.cluster.messaging.MessageSubject; 28 import org.onlab.onos.store.cluster.messaging.MessageSubject;
29 +import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
29 import org.onlab.onos.store.cluster.messaging.SerializationService; 30 import org.onlab.onos.store.cluster.messaging.SerializationService;
30 import org.onlab.onos.store.serializers.ConnectPointSerializer; 31 import org.onlab.onos.store.serializers.ConnectPointSerializer;
31 import org.onlab.onos.store.serializers.DefaultLinkSerializer; 32 import org.onlab.onos.store.serializers.DefaultLinkSerializer;
...@@ -97,7 +98,8 @@ public class MessageSerializer implements SerializationService { ...@@ -97,7 +98,8 @@ public class MessageSerializer implements SerializationService {
97 98
98 MessageSubject.class, 99 MessageSubject.class,
99 HelloMessage.class, 100 HelloMessage.class,
100 - GoodbyeMessage.class, 101 + NewMemberMessage.class,
102 + LeavingMemberMessage.class,
101 EchoMessage.class 103 EchoMessage.class
102 ) 104 )
103 .register(IpPrefix.class, new IpPrefixSerializer()) 105 .register(IpPrefix.class, new IpPrefixSerializer())
......
1 +package org.onlab.onos.store.cluster.messaging;
2 +
3 +import org.onlab.onos.cluster.NodeId;
4 +import org.onlab.packet.IpPrefix;
5 +
6 +/**
7 + * Base for cluster membership messages.
8 + */
9 +public abstract class ClusterMembershipMessage extends ClusterMessage {
10 +
11 + private NodeId nodeId;
12 + private IpPrefix ipAddress;
13 + private int tcpPort;
14 +
15 + // For serialization
16 + protected ClusterMembershipMessage() {
17 + super(MessageSubject.HELLO);
18 + nodeId = null;
19 + ipAddress = null;
20 + tcpPort = 0;
21 + }
22 +
23 + /**
24 + * Creates a new membership message for the specified end-point data.
25 + *
26 + * @param subject message subject
27 + * @param nodeId sending node identification
28 + * @param ipAddress sending node IP address
29 + * @param tcpPort sending node TCP port
30 + */
31 + protected ClusterMembershipMessage(MessageSubject subject, NodeId nodeId,
32 + IpPrefix ipAddress, int tcpPort) {
33 + super(subject);
34 + this.nodeId = nodeId;
35 + this.ipAddress = ipAddress;
36 + this.tcpPort = tcpPort;
37 + }
38 +
39 + /**
40 + * Returns the sending node identifer.
41 + *
42 + * @return node identifier
43 + */
44 + public NodeId nodeId() {
45 + return nodeId;
46 + }
47 +
48 + /**
49 + * Returns the sending node IP address.
50 + *
51 + * @return node IP address
52 + */
53 + public IpPrefix ipAddress() {
54 + return ipAddress;
55 + }
56 +
57 + /**
58 + * Returns the sending node TCP listen port.
59 + *
60 + * @return TCP listen port
61 + */
62 + public int tcpPort() {
63 + return tcpPort;
64 + }
65 +
66 +}
...@@ -6,18 +6,10 @@ import org.onlab.packet.IpPrefix; ...@@ -6,18 +6,10 @@ import org.onlab.packet.IpPrefix;
6 /** 6 /**
7 * Hello message that nodes use to greet each other. 7 * Hello message that nodes use to greet each other.
8 */ 8 */
9 -public class HelloMessage extends ClusterMessage { 9 +public class HelloMessage extends ClusterMembershipMessage {
10 -
11 - private NodeId nodeId;
12 - private IpPrefix ipAddress;
13 - private int tcpPort;
14 10
15 // For serialization 11 // For serialization
16 private HelloMessage() { 12 private HelloMessage() {
17 - super(MessageSubject.HELLO);
18 - nodeId = null;
19 - ipAddress = null;
20 - tcpPort = 0;
21 } 13 }
22 14
23 /** 15 /**
...@@ -28,37 +20,7 @@ public class HelloMessage extends ClusterMessage { ...@@ -28,37 +20,7 @@ public class HelloMessage extends ClusterMessage {
28 * @param tcpPort sending node TCP port 20 * @param tcpPort sending node TCP port
29 */ 21 */
30 public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) { 22 public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
31 - super(MessageSubject.HELLO); 23 + super(MessageSubject.HELLO, nodeId, ipAddress, tcpPort);
32 - this.nodeId = nodeId;
33 - this.ipAddress = ipAddress;
34 - this.tcpPort = tcpPort;
35 - }
36 -
37 - /**
38 - * Returns the sending node identifer.
39 - *
40 - * @return node identifier
41 - */
42 - public NodeId nodeId() {
43 - return nodeId;
44 - }
45 -
46 - /**
47 - * Returns the sending node IP address.
48 - *
49 - * @return node IP address
50 - */
51 - public IpPrefix ipAddress() {
52 - return ipAddress;
53 - }
54 -
55 - /**
56 - * Returns the sending node TCP listen port.
57 - *
58 - * @return TCP listen port
59 - */
60 - public int tcpPort() {
61 - return tcpPort;
62 } 24 }
63 25
64 } 26 }
......
...@@ -3,16 +3,13 @@ package org.onlab.onos.store.cluster.messaging; ...@@ -3,16 +3,13 @@ package org.onlab.onos.store.cluster.messaging;
3 import org.onlab.onos.cluster.NodeId; 3 import org.onlab.onos.cluster.NodeId;
4 4
5 /** 5 /**
6 - * Goodbye message that nodes use to leave the cluster for good. 6 + * Announcement message that nodes use to gossip about team departures.
7 */ 7 */
8 -public class GoodbyeMessage extends ClusterMessage { 8 +public class LeavingMemberMessage extends ClusterMembershipMessage {
9 -
10 - private NodeId nodeId;
11 9
12 // For serialization 10 // For serialization
13 - private GoodbyeMessage() { 11 + private LeavingMemberMessage() {
14 - super(MessageSubject.GOODBYE); 12 + super();
15 - nodeId = null;
16 } 13 }
17 14
18 /** 15 /**
...@@ -20,18 +17,8 @@ public class GoodbyeMessage extends ClusterMessage { ...@@ -20,18 +17,8 @@ public class GoodbyeMessage extends ClusterMessage {
20 * 17 *
21 * @param nodeId sending node identification 18 * @param nodeId sending node identification
22 */ 19 */
23 - public GoodbyeMessage(NodeId nodeId) { 20 + public LeavingMemberMessage(NodeId nodeId) {
24 - super(MessageSubject.HELLO); 21 + super(MessageSubject.LEAVING_MEMBER, nodeId, null, 0);
25 - this.nodeId = nodeId;
26 - }
27 -
28 - /**
29 - * Returns the sending node identifer.
30 - *
31 - * @return node identifier
32 - */
33 - public NodeId nodeId() {
34 - return nodeId;
35 } 22 }
36 23
37 } 24 }
......
...@@ -8,8 +8,11 @@ public enum MessageSubject { ...@@ -8,8 +8,11 @@ public enum MessageSubject {
8 /** Represents a first greeting message. */ 8 /** Represents a first greeting message. */
9 HELLO, 9 HELLO,
10 10
11 - /** Signifies node's intent to leave the cluster. */ 11 + /** Signifies announcement about new member. */
12 - GOODBYE, 12 + NEW_MEMBER,
13 +
14 + /** Signifies announcement about leaving member. */
15 + LEAVING_MEMBER,
13 16
14 /** Signifies a heart-beat message. */ 17 /** Signifies a heart-beat message. */
15 ECHO 18 ECHO
......
1 +package org.onlab.onos.store.cluster.messaging;
2 +
3 +import org.onlab.onos.cluster.NodeId;
4 +import org.onlab.packet.IpPrefix;
5 +
6 +/**
7 + * Announcement message that nodes use to gossip about new arrivals.
8 + */
9 +public class NewMemberMessage extends ClusterMembershipMessage {
10 +
11 + // For serialization
12 + private NewMemberMessage() {
13 + }
14 +
15 + /**
16 + * Creates a new gossip message for the specified end-point data.
17 + *
18 + * @param nodeId sending node identification
19 + * @param ipAddress sending node IP address
20 + * @param tcpPort sending node TCP port
21 + */
22 + public NewMemberMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
23 + super(MessageSubject.NEW_MEMBER, nodeId, ipAddress, tcpPort);
24 + }
25 +
26 +}