Yuta HIGUCHI

ClusterCommunicationManager bugfixes

Change-Id: I0cb433bf2197c7c745733657607d9e62bb23567d
...@@ -26,6 +26,7 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; ...@@ -26,6 +26,7 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
26 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 26 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
27 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; 27 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
28 import org.onlab.onos.store.cluster.messaging.MessageSubject; 28 import org.onlab.onos.store.cluster.messaging.MessageSubject;
29 +import org.onlab.onos.store.serializers.ClusterMessageSerializer;
29 import org.onlab.onos.store.serializers.KryoPoolUtil; 30 import org.onlab.onos.store.serializers.KryoPoolUtil;
30 import org.onlab.onos.store.serializers.KryoSerializer; 31 import org.onlab.onos.store.serializers.KryoSerializer;
31 import org.onlab.util.KryoPool; 32 import org.onlab.util.KryoPool;
...@@ -50,8 +51,6 @@ public class ClusterCommunicationManager ...@@ -50,8 +51,6 @@ public class ClusterCommunicationManager
50 private ClusterService clusterService; 51 private ClusterService clusterService;
51 52
52 private ClusterNodesDelegate nodesDelegate; 53 private ClusterNodesDelegate nodesDelegate;
53 - // FIXME: `members` should go away and should be using ClusterService
54 - private Map<NodeId, ControllerNode> members = new HashMap<>();
55 private final Timer timer = new Timer("onos-controller-heatbeats"); 54 private final Timer timer = new Timer("onos-controller-heatbeats");
56 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L; 55 public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
57 56
...@@ -59,11 +58,14 @@ public class ClusterCommunicationManager ...@@ -59,11 +58,14 @@ public class ClusterCommunicationManager
59 private MessagingService messagingService; 58 private MessagingService messagingService;
60 59
61 private static final KryoSerializer SERIALIZER = new KryoSerializer() { 60 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
61 + @Override
62 protected void setupKryoPool() { 62 protected void setupKryoPool() {
63 serializerPool = KryoPool.newBuilder() 63 serializerPool = KryoPool.newBuilder()
64 .register(KryoPoolUtil.API) 64 .register(KryoPoolUtil.API)
65 - .register(ClusterMessage.class) 65 + .register(ClusterMessage.class, new ClusterMessageSerializer())
66 .register(ClusterMembershipEvent.class) 66 .register(ClusterMembershipEvent.class)
67 + .register(byte[].class)
68 + .register(MessageSubject.class)
67 .build() 69 .build()
68 .populate(1); 70 .populate(1);
69 } 71 }
...@@ -73,7 +75,15 @@ public class ClusterCommunicationManager ...@@ -73,7 +75,15 @@ public class ClusterCommunicationManager
73 @Activate 75 @Activate
74 public void activate() { 76 public void activate() {
75 localNode = clusterService.getLocalNode(); 77 localNode = clusterService.getLocalNode();
76 - messagingService = new NettyMessagingService(localNode.tcpPort()); 78 + NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
79 + // FIXME: workaround until it becomes a service.
80 + try {
81 + netty.activate();
82 + } catch (Exception e) {
83 + // TODO Auto-generated catch block
84 + log.error("NettyMessagingService#activate", e);
85 + }
86 + messagingService = netty;
77 log.info("Started"); 87 log.info("Started");
78 } 88 }
79 89
...@@ -86,7 +96,7 @@ public class ClusterCommunicationManager ...@@ -86,7 +96,7 @@ public class ClusterCommunicationManager
86 @Override 96 @Override
87 public boolean broadcast(ClusterMessage message) { 97 public boolean broadcast(ClusterMessage message) {
88 boolean ok = true; 98 boolean ok = true;
89 - for (ControllerNode node : members.values()) { 99 + for (ControllerNode node : clusterService.getNodes()) {
90 if (!node.equals(localNode)) { 100 if (!node.equals(localNode)) {
91 ok = unicast(message, node.id()) && ok; 101 ok = unicast(message, node.id()) && ok;
92 } 102 }
...@@ -107,7 +117,7 @@ public class ClusterCommunicationManager ...@@ -107,7 +117,7 @@ public class ClusterCommunicationManager
107 117
108 @Override 118 @Override
109 public boolean unicast(ClusterMessage message, NodeId toNodeId) { 119 public boolean unicast(ClusterMessage message, NodeId toNodeId) {
110 - ControllerNode node = members.get(toNodeId); 120 + ControllerNode node = clusterService.getNode(toNodeId);
111 checkArgument(node != null, "Unknown nodeId: %s", toNodeId); 121 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
112 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort()); 122 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
113 try { 123 try {
...@@ -137,7 +147,7 @@ public class ClusterCommunicationManager ...@@ -137,7 +147,7 @@ public class ClusterCommunicationManager
137 147
138 @Override 148 @Override
139 public void addNode(ControllerNode node) { 149 public void addNode(ControllerNode node) {
140 - members.put(node.id(), node); 150 + //members.put(node.id(), node);
141 } 151 }
142 152
143 @Override 153 @Override
...@@ -146,7 +156,7 @@ public class ClusterCommunicationManager ...@@ -146,7 +156,7 @@ public class ClusterCommunicationManager
146 localNode.id(), 156 localNode.id(),
147 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), 157 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
148 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)))); 158 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
149 - members.remove(node.id()); 159 + //members.remove(node.id());
150 } 160 }
151 161
152 // Sends a heart beat to all peers. 162 // Sends a heart beat to all peers.
......