Madan Jampani

Fixes for ONOS-4192

 - Do not include known peers in heartbeat message and instead each node relies on cluster metadata to add/remove nodes from inventory
 - Ignore heartbeats from nodes that are removed

Change-Id: Ia20ce84ad88aa6c723a21af1fe7f6899821181ed
...@@ -18,6 +18,7 @@ package org.onosproject.store.cluster.impl; ...@@ -18,6 +18,7 @@ package org.onosproject.store.cluster.impl;
18 import com.google.common.base.MoreObjects; 18 import com.google.common.base.MoreObjects;
19 import com.google.common.collect.ImmutableSet; 19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.collect.Maps; 20 import com.google.common.collect.Maps;
21 +
21 import org.apache.felix.scr.annotations.Activate; 22 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component; 23 import org.apache.felix.scr.annotations.Component;
23 import org.apache.felix.scr.annotations.Deactivate; 24 import org.apache.felix.scr.annotations.Deactivate;
...@@ -234,7 +235,7 @@ public class DistributedClusterStore ...@@ -234,7 +235,7 @@ public class DistributedClusterStore
234 .filter(node -> !(node.id().equals(localNode.id()))) 235 .filter(node -> !(node.id().equals(localNode.id())))
235 .collect(Collectors.toSet()); 236 .collect(Collectors.toSet());
236 State state = nodeStates.get(localNode.id()); 237 State state = nodeStates.get(localNode.id());
237 - byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, state, peers)); 238 + byte[] hbMessagePayload = SERIALIZER.encode(new HeartbeatMessage(localNode, state));
238 peers.forEach((node) -> { 239 peers.forEach((node) -> {
239 heartbeatToPeer(hbMessagePayload, node); 240 heartbeatToPeer(hbMessagePayload, node);
240 State currentState = nodeStates.get(node.id()); 241 State currentState = nodeStates.get(node.id());
...@@ -277,32 +278,25 @@ public class DistributedClusterStore ...@@ -277,32 +278,25 @@ public class DistributedClusterStore
277 @Override 278 @Override
278 public void accept(Endpoint sender, byte[] message) { 279 public void accept(Endpoint sender, byte[] message) {
279 HeartbeatMessage hb = SERIALIZER.decode(message); 280 HeartbeatMessage hb = SERIALIZER.decode(message);
280 - failureDetector.report(hb.source().id()); 281 + if (clusterMetadataService.getClusterMetadata().getNodes().contains(hb.source())) {
281 - updateState(hb.source().id(), hb.state); 282 + failureDetector.report(hb.source().id());
282 - hb.knownPeers().forEach(node -> { 283 + updateState(hb.source().id(), hb.state);
283 - allNodes.put(node.id(), node); 284 + }
284 - });
285 } 285 }
286 } 286 }
287 287
288 private static class HeartbeatMessage { 288 private static class HeartbeatMessage {
289 private ControllerNode source; 289 private ControllerNode source;
290 private State state; 290 private State state;
291 - private Set<ControllerNode> knownPeers;
292 291
293 - public HeartbeatMessage(ControllerNode source, State state, Set<ControllerNode> members) { 292 + public HeartbeatMessage(ControllerNode source, State state) {
294 this.source = source; 293 this.source = source;
295 this.state = state != null ? state : State.ACTIVE; 294 this.state = state != null ? state : State.ACTIVE;
296 - this.knownPeers = ImmutableSet.copyOf(members);
297 } 295 }
298 296
299 public ControllerNode source() { 297 public ControllerNode source() {
300 return source; 298 return source;
301 } 299 }
302 -
303 - public Set<ControllerNode> knownPeers() {
304 - return knownPeers;
305 - }
306 } 300 }
307 301
308 @Override 302 @Override
...@@ -371,7 +365,6 @@ public class DistributedClusterStore ...@@ -371,7 +365,6 @@ public class DistributedClusterStore
371 365
372 /** 366 /**
373 * Restarts heartbeatSender executor. 367 * Restarts heartbeatSender executor.
374 - *
375 */ 368 */
376 private void restartHeartbeatSender() { 369 private void restartHeartbeatSender() {
377 try { 370 try {
...@@ -385,23 +378,4 @@ public class DistributedClusterStore ...@@ -385,23 +378,4 @@ public class DistributedClusterStore
385 log.warn(e.getMessage()); 378 log.warn(e.getMessage());
386 } 379 }
387 } 380 }
388 -
389 - /**
390 - * Gets current heartbeat interval.
391 - *
392 - * @return heartbeatInterval
393 - */
394 - private int getHeartbeatInterval() {
395 - return heartbeatInterval;
396 - }
397 -
398 - /**
399 - * Gets current Phi failure threshold for Accrual Failure Detector.
400 - *
401 - * @return phiFailureThreshold
402 - */
403 - private int getPhiFailureThreshold() {
404 - return phiFailureThreshold;
405 - }
406 -
407 } 381 }
...\ No newline at end of file ...\ No newline at end of file
......