sangyun-han
Committed by Gerrit Code Review

[Goldeneye][ONOS-4038] Support configurable heartbeat on DistributedClusterStore

- Add readComponentConfiguration method for @Modified
- Apply updated Tools
- Add unit test code
- Add checkNotNull validation for NodeId in addNode

Change-Id: If8b7d4c00f2c72d29c0abb6407530d76bc3f6d80
...@@ -21,12 +21,15 @@ import com.google.common.collect.Maps; ...@@ -21,12 +21,15 @@ import com.google.common.collect.Maps;
21 import org.apache.felix.scr.annotations.Activate; 21 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component; 22 import org.apache.felix.scr.annotations.Component;
23 import org.apache.felix.scr.annotations.Deactivate; 23 import org.apache.felix.scr.annotations.Deactivate;
24 +import org.apache.felix.scr.annotations.Modified;
25 +import org.apache.felix.scr.annotations.Property;
24 import org.apache.felix.scr.annotations.Reference; 26 import org.apache.felix.scr.annotations.Reference;
25 import org.apache.felix.scr.annotations.ReferenceCardinality; 27 import org.apache.felix.scr.annotations.ReferenceCardinality;
26 import org.apache.felix.scr.annotations.Service; 28 import org.apache.felix.scr.annotations.Service;
27 import org.joda.time.DateTime; 29 import org.joda.time.DateTime;
28 import org.onlab.packet.IpAddress; 30 import org.onlab.packet.IpAddress;
29 import org.onlab.util.KryoNamespace; 31 import org.onlab.util.KryoNamespace;
32 +import org.onlab.util.Tools;
30 import org.onosproject.cluster.ClusterEvent; 33 import org.onosproject.cluster.ClusterEvent;
31 import org.onosproject.cluster.ClusterMetadataService; 34 import org.onosproject.cluster.ClusterMetadataService;
32 import org.onosproject.cluster.ClusterStore; 35 import org.onosproject.cluster.ClusterStore;
...@@ -40,8 +43,10 @@ import org.onosproject.store.cluster.messaging.Endpoint; ...@@ -40,8 +43,10 @@ import org.onosproject.store.cluster.messaging.Endpoint;
40 import org.onosproject.store.cluster.messaging.MessagingService; 43 import org.onosproject.store.cluster.messaging.MessagingService;
41 import org.onosproject.store.serializers.KryoNamespaces; 44 import org.onosproject.store.serializers.KryoNamespaces;
42 import org.onosproject.store.serializers.KryoSerializer; 45 import org.onosproject.store.serializers.KryoSerializer;
46 +import org.osgi.service.component.ComponentContext;
43 import org.slf4j.Logger; 47 import org.slf4j.Logger;
44 48
49 +import java.util.Dictionary;
45 import java.util.Map; 50 import java.util.Map;
46 import java.util.Objects; 51 import java.util.Objects;
47 import java.util.Set; 52 import java.util.Set;
...@@ -52,6 +57,7 @@ import java.util.concurrent.TimeUnit; ...@@ -52,6 +57,7 @@ import java.util.concurrent.TimeUnit;
52 import java.util.function.BiConsumer; 57 import java.util.function.BiConsumer;
53 import java.util.stream.Collectors; 58 import java.util.stream.Collectors;
54 59
60 +import static com.google.common.base.Preconditions.checkArgument;
55 import static com.google.common.base.Preconditions.checkNotNull; 61 import static com.google.common.base.Preconditions.checkNotNull;
56 import static org.onlab.util.Tools.groupedThreads; 62 import static org.onlab.util.Tools.groupedThreads;
57 import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED; 63 import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
...@@ -73,9 +79,15 @@ public class DistributedClusterStore ...@@ -73,9 +79,15 @@ public class DistributedClusterStore
73 79
74 public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat"; 80 public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
75 81
76 - // TODO: make these configurable. 82 + private static final int DEFAULT_HEARTBEAT_INTERVAL = 100;
77 - private static final int HEARTBEAT_INTERVAL_MS = 100; 83 + @Property(name = "heartbeatInterval", intValue = DEFAULT_HEARTBEAT_INTERVAL,
78 - private static final int PHI_FAILURE_THRESHOLD = 10; 84 + label = "Interval time to send heartbeat to other controller nodes (millisecond)")
85 + private int heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
86 +
87 + private static final int DEFAULT_PHI_FAILURE_THRESHOLD = 10;
88 + @Property(name = "phiFailureThreshold", intValue = DEFAULT_PHI_FAILURE_THRESHOLD,
89 + label = "the value of Phi threshold to detect accrual failure")
90 + private int phiFailureThreshold = DEFAULT_PHI_FAILURE_THRESHOLD;
79 91
80 private static final KryoSerializer SERIALIZER = new KryoSerializer() { 92 private static final KryoSerializer SERIALIZER = new KryoSerializer() {
81 @Override 93 @Override
...@@ -119,7 +131,7 @@ public class DistributedClusterStore ...@@ -119,7 +131,7 @@ public class DistributedClusterStore
119 failureDetector = new PhiAccrualFailureDetector(); 131 failureDetector = new PhiAccrualFailureDetector();
120 132
121 heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0, 133 heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
122 - HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS); 134 + heartbeatInterval, TimeUnit.MILLISECONDS);
123 135
124 log.info("Started"); 136 log.info("Started");
125 } 137 }
...@@ -133,6 +145,12 @@ public class DistributedClusterStore ...@@ -133,6 +145,12 @@ public class DistributedClusterStore
133 log.info("Stopped"); 145 log.info("Stopped");
134 } 146 }
135 147
148 + @Modified
149 + public void modified(ComponentContext context) {
150 + readComponentConfiguration(context);
151 + restartHeartbeatSender();
152 + }
153 +
136 @Override 154 @Override
137 public void setDelegate(ClusterStoreDelegate delegate) { 155 public void setDelegate(ClusterStoreDelegate delegate) {
138 checkNotNull(delegate, "Delegate cannot be null"); 156 checkNotNull(delegate, "Delegate cannot be null");
...@@ -178,6 +196,7 @@ public class DistributedClusterStore ...@@ -178,6 +196,7 @@ public class DistributedClusterStore
178 196
179 @Override 197 @Override
180 public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) { 198 public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
199 + checkNotNull(nodeId, INSTANCE_ID_NULL);
181 ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort); 200 ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
182 addNode(node); 201 addNode(node);
183 return node; 202 return node;
...@@ -220,7 +239,7 @@ public class DistributedClusterStore ...@@ -220,7 +239,7 @@ public class DistributedClusterStore
220 heartbeatToPeer(hbMessagePayload, node); 239 heartbeatToPeer(hbMessagePayload, node);
221 State currentState = nodeStates.get(node.id()); 240 State currentState = nodeStates.get(node.id());
222 double phi = failureDetector.phi(node.id()); 241 double phi = failureDetector.phi(node.id());
223 - if (phi >= PHI_FAILURE_THRESHOLD) { 242 + if (phi >= phiFailureThreshold) {
224 if (currentState.isActive()) { 243 if (currentState.isActive()) {
225 updateState(node.id(), State.INACTIVE); 244 updateState(node.id(), State.INACTIVE);
226 } 245 }
...@@ -291,4 +310,98 @@ public class DistributedClusterStore ...@@ -291,4 +310,98 @@ public class DistributedClusterStore
291 return nodeStateLastUpdatedTimes.get(nodeId); 310 return nodeStateLastUpdatedTimes.get(nodeId);
292 } 311 }
293 312
313 + /**
314 + * Extracts properties from the component configuration context.
315 + *
316 + * @param context the component context
317 + */
318 + private void readComponentConfiguration(ComponentContext context) {
319 + Dictionary<?, ?> properties = context.getProperties();
320 +
321 + Integer newHeartbeatInterval = Tools.getIntegerProperty(properties,
322 + "heartbeatInterval");
323 + if (newHeartbeatInterval == null) {
324 + setHeartbeatInterval(DEFAULT_HEARTBEAT_INTERVAL);
325 + log.info("Heartbeat interval time is not configured, default value is {}",
326 + DEFAULT_HEARTBEAT_INTERVAL);
327 + } else {
328 + setHeartbeatInterval(newHeartbeatInterval);
329 + log.info("Configured. Heartbeat interval time is configured to {}",
330 + heartbeatInterval);
331 + }
332 +
333 + Integer newPhiFailureThreshold = Tools.getIntegerProperty(properties,
334 + "phiFailureThreshold");
335 + if (newPhiFailureThreshold == null) {
336 + setPhiFailureThreshold(DEFAULT_PHI_FAILURE_THRESHOLD);
337 + log.info("Phi failure threshold is not configured, default value is {}",
338 + DEFAULT_PHI_FAILURE_THRESHOLD);
339 + } else {
340 + setPhiFailureThreshold(newPhiFailureThreshold);
341 + log.info("Configured. Phi failure threshold is configured to {}",
342 + phiFailureThreshold);
343 + }
344 + }
345 +
346 + /**
347 + * Sets the heartbeat interval, i.e. the delay between the termination of one
348 + * heartbeat execution and the commencement of the next.
349 + *
350 + * @param interval delay in milliseconds; non-positive values fall back to the default
351 + */
352 + private void setHeartbeatInterval(int interval) {
353 + try {
354 + checkArgument(interval > 0, "Interval must be greater than zero");
355 + heartbeatInterval = interval;
356 + } catch (IllegalArgumentException e) {
357 + log.warn(e.getMessage());
358 + heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
359 + }
360 + }
361 +
362 + /**
363 + * Sets Phi failure threshold.
364 + * Phi is based on a paper titled: "The φ Accrual Failure Detector" by Hayashibara, et al.
365 + *
366 + * @param threshold phi value at or above which an active node is marked inactive
367 + */
368 + private void setPhiFailureThreshold(int threshold) {
369 + phiFailureThreshold = threshold;
370 + }
371 +
372 + /**
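373 + * Restarts the heartbeat sender: schedules heartbeats on a new executor using the
374 + * current heartbeat interval, then shuts down the previous executor.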
375 + */
376 + private void restartHeartbeatSender() {
377 + try {
378 + ScheduledExecutorService prevSender = heartBeatSender;
379 + heartBeatSender = Executors.newSingleThreadScheduledExecutor(
380 + groupedThreads("onos/cluster/membership", "heartbeat-sender-%d"));
381 + heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
382 + heartbeatInterval, TimeUnit.MILLISECONDS);
383 + prevSender.shutdown();
384 + } catch (Exception e) {
385 + log.warn(e.getMessage());
386 + }
387 + }
388 +
389 + /**
390 + * Gets current heartbeat interval.
391 + *
392 + * @return heartbeat interval in milliseconds
393 + */
394 + private int getHeartbeatInterval() {
395 + return heartbeatInterval;
396 + }
397 +
398 + /**
399 + * Gets current Phi failure threshold for Accrual Failure Detector.
400 + *
401 + * @return phiFailureThreshold
402 + */
403 + private int getPhiFailureThreshold() {
404 + return phiFailureThreshold;
405 + }
406 +
294 } 407 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -33,15 +33,20 @@ import com.google.common.collect.Maps; ...@@ -33,15 +33,20 @@ import com.google.common.collect.Maps;
33 public class PhiAccrualFailureDetector { 33 public class PhiAccrualFailureDetector {
34 private final Map<NodeId, History> states = Maps.newConcurrentMap(); 34 private final Map<NodeId, History> states = Maps.newConcurrentMap();
35 35
36 - // TODO: make these configurable. 36 + // Default values
37 - private static final int WINDOW_SIZE = 250; 37 + private static final int DEFAULT_WINDOW_SIZE = 250;
38 - private static final int MIN_SAMPLES = 25; 38 + private static final int DEFAULT_MIN_SAMPLES = 25;
39 - private static final double PHI_FACTOR = 1.0 / Math.log(10.0); 39 + private static final double DEFAULT_PHI_FACTOR = 1.0 / Math.log(10.0);
40 40
41 // If a node does not have any heartbeats, this is the phi 41 // If a node does not have any heartbeats, this is the phi
42 // value to report. Indicates the node is inactive (from the 42 // value to report. Indicates the node is inactive (from the
43 // detector's perspective). 43 // detector's perspective).
44 - private static final double BOOTSTRAP_PHI_VALUE = 100.0; 44 + private static final double DEFAULT_BOOTSTRAP_PHI_VALUE = 100.0;
45 +
46 +
47 + private int minSamples = DEFAULT_MIN_SAMPLES;
48 + private double phiFactor = DEFAULT_PHI_FACTOR;
49 + private double bootstrapPhiValue = DEFAULT_BOOTSTRAP_PHI_VALUE;
45 50
46 /** 51 /**
47 * Report a new heart beat for the specified node id. 52 * Report a new heart beat for the specified node id.
...@@ -70,6 +75,8 @@ public class PhiAccrualFailureDetector { ...@@ -70,6 +75,8 @@ public class PhiAccrualFailureDetector {
70 } 75 }
71 } 76 }
72 77
78 +
79 +
73 /** 80 /**
74 * Compute phi for the specified node id. 81 * Compute phi for the specified node id.
75 * @param nodeId node id 82 * @param nodeId node id
...@@ -78,13 +85,13 @@ public class PhiAccrualFailureDetector { ...@@ -78,13 +85,13 @@ public class PhiAccrualFailureDetector {
78 public double phi(NodeId nodeId) { 85 public double phi(NodeId nodeId) {
79 checkNotNull(nodeId, "NodeId must not be null"); 86 checkNotNull(nodeId, "NodeId must not be null");
80 if (!states.containsKey(nodeId)) { 87 if (!states.containsKey(nodeId)) {
81 - return BOOTSTRAP_PHI_VALUE; 88 + return bootstrapPhiValue;
82 } 89 }
83 History nodeState = states.get(nodeId); 90 History nodeState = states.get(nodeId);
84 synchronized (nodeState) { 91 synchronized (nodeState) {
85 long latestHeartbeat = nodeState.latestHeartbeatTime(); 92 long latestHeartbeat = nodeState.latestHeartbeatTime();
86 DescriptiveStatistics samples = nodeState.samples(); 93 DescriptiveStatistics samples = nodeState.samples();
87 - if (latestHeartbeat == -1 || samples.getN() < MIN_SAMPLES) { 94 + if (latestHeartbeat == -1 || samples.getN() < minSamples) {
88 return 0.0; 95 return 0.0;
89 } 96 }
90 return computePhi(samples, latestHeartbeat, System.currentTimeMillis()); 97 return computePhi(samples, latestHeartbeat, System.currentTimeMillis());
...@@ -95,13 +102,27 @@ public class PhiAccrualFailureDetector { ...@@ -95,13 +102,27 @@ public class PhiAccrualFailureDetector {
95 long size = samples.getN(); 102 long size = samples.getN();
96 long t = tNow - tLast; 103 long t = tNow - tLast;
97 return (size > 0) 104 return (size > 0)
98 - ? PHI_FACTOR * t / samples.getMean() 105 + ? phiFactor * t / samples.getMean()
99 - : BOOTSTRAP_PHI_VALUE; 106 + : bootstrapPhiValue;
107 + }
108 +
109 +
110 + private void setMinSamples(int samples) {
111 + minSamples = samples;
112 + }
113 +
114 + private void setPhiFactor(double factor) {
115 + phiFactor = factor;
100 } 116 }
101 117
118 + private void setBootstrapPhiValue(double phiValue) {
119 + bootstrapPhiValue = phiValue;
120 + }
121 +
122 +
102 private static class History { 123 private static class History {
103 DescriptiveStatistics samples = 124 DescriptiveStatistics samples =
104 - new DescriptiveStatistics(WINDOW_SIZE); 125 + new DescriptiveStatistics(DEFAULT_WINDOW_SIZE);
105 long lastHeartbeatTime = -1; 126 long lastHeartbeatTime = -1;
106 127
107 public DescriptiveStatistics samples() { 128 public DescriptiveStatistics samples() {
......
1 +package org.onosproject.store.cluster.impl;
2 +
3 +import org.junit.After;
4 +import org.junit.Before;
5 +
6 +import static org.junit.Assert.*;
7 +
8 +/**
9 + * Unit test for DistributedClusterStore.
10 + */
11 +public class DistributedClusterStoreTest {
12 + DistributedClusterStore distributedClusterStore;
13 +
14 + @Before
15 + public void setUp() throws Exception {
16 + distributedClusterStore = new DistributedClusterStore();
17 + distributedClusterStore.activate();
18 + }
19 +
20 + @After
21 + public void tearDown() throws Exception {
22 + distributedClusterStore.deactivate();
23 + }
24 +}
...\ No newline at end of file ...\ No newline at end of file