Victor Silva
Committed by Yuta HIGUCHI

MastershipLoadBalancer: listen to RegionEvents

Simplify logic around scheduling a balancing task and listen
to RegionEvents. Region Membership updates and Region Updates
should also trigger a rebalance.

Now, it is possible to queue up another balancing task even
if there is one running. They'll still never run in parallel
and will still run with at least a few seconds in between (30),
but this way we don't ever risk missing an event that makes it
necessary to rebalance - not even if we were rebalancing exactly
when that event fired.

Change-Id: I64e1c6fc5e87f2b1fffbefb54c96303dac55d1d1
...@@ -16,10 +16,6 @@ ...@@ -16,10 +16,6 @@
16 16
17 package org.onosproject.mlb; 17 package org.onosproject.mlb;
18 18
19 -import com.google.common.util.concurrent.ListenableScheduledFuture;
20 -import com.google.common.util.concurrent.ListeningScheduledExecutorService;
21 -import com.google.common.util.concurrent.MoreExecutors;
22 -
23 import org.apache.felix.scr.annotations.Activate; 19 import org.apache.felix.scr.annotations.Activate;
24 import org.apache.felix.scr.annotations.Component; 20 import org.apache.felix.scr.annotations.Component;
25 import org.apache.felix.scr.annotations.Deactivate; 21 import org.apache.felix.scr.annotations.Deactivate;
...@@ -39,15 +35,19 @@ import org.onosproject.mastership.MastershipEvent; ...@@ -39,15 +35,19 @@ import org.onosproject.mastership.MastershipEvent;
39 import org.onosproject.mastership.MastershipListener; 35 import org.onosproject.mastership.MastershipListener;
40 import org.onosproject.mastership.MastershipService; 36 import org.onosproject.mastership.MastershipService;
41 import org.osgi.service.component.ComponentContext; 37 import org.osgi.service.component.ComponentContext;
38 +import org.onosproject.net.region.RegionEvent;
39 +import org.onosproject.net.region.RegionListener;
40 +import org.onosproject.net.region.RegionService;
42 import org.slf4j.Logger; 41 import org.slf4j.Logger;
43 42
44 import java.util.Dictionary; 43 import java.util.Dictionary;
44 +import java.util.concurrent.Executors;
45 import java.util.concurrent.Future; 45 import java.util.concurrent.Future;
46 +import java.util.concurrent.ScheduledExecutorService;
46 import java.util.concurrent.TimeUnit; 47 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicBoolean; 48 import java.util.concurrent.atomic.AtomicBoolean;
48 import java.util.concurrent.atomic.AtomicReference; 49 import java.util.concurrent.atomic.AtomicReference;
49 50
50 -import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
51 import static org.onlab.util.Tools.groupedThreads; 51 import static org.onlab.util.Tools.groupedThreads;
52 import static org.slf4j.LoggerFactory.getLogger; 52 import static org.slf4j.LoggerFactory.getLogger;
53 53
...@@ -62,7 +62,7 @@ public class MastershipLoadBalancer { ...@@ -62,7 +62,7 @@ public class MastershipLoadBalancer {
62 62
63 private final Logger log = getLogger(getClass()); 63 private final Logger log = getLogger(getClass());
64 64
65 - private static final int DEFAULT_SCHEDULE_PERIOD = 5; 65 + private static final int DEFAULT_SCHEDULE_PERIOD = 30;
66 @Property(name = "schedulePeriod", intValue = DEFAULT_SCHEDULE_PERIOD, 66 @Property(name = "schedulePeriod", intValue = DEFAULT_SCHEDULE_PERIOD,
67 label = "Period to schedule balancing the mastership to be shared as evenly as by all online instances.") 67 label = "Period to schedule balancing the mastership to be shared as evenly as by all online instances.")
68 private int schedulePeriod = DEFAULT_SCHEDULE_PERIOD; 68 private int schedulePeriod = DEFAULT_SCHEDULE_PERIOD;
...@@ -85,6 +85,9 @@ public class MastershipLoadBalancer { ...@@ -85,6 +85,9 @@ public class MastershipLoadBalancer {
85 protected LeadershipService leadershipService; 85 protected LeadershipService leadershipService;
86 86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 + protected RegionService regionService;
89 +
90 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected ClusterService clusterService; 91 protected ClusterService clusterService;
89 92
90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
...@@ -92,15 +95,24 @@ public class MastershipLoadBalancer { ...@@ -92,15 +95,24 @@ public class MastershipLoadBalancer {
92 95
93 private InnerLeadershipListener leadershipListener = new InnerLeadershipListener(); 96 private InnerLeadershipListener leadershipListener = new InnerLeadershipListener();
94 97
95 - /* This listener is used to trigger balancing for any mastership event which will include switches changing state 98 + /* This listener is used to trigger balancing for any mastership event
96 - between active and inactive states as well as the same variety of event occurring with ONOS nodes. Must 99 + * which will include switches changing state between active and inactive
97 - use a listenable executor to ensure events are triggered with no frequency greater than once every 30 seconds. 100 + * states as well as the same variety of event occurring with ONOS nodes.
98 */ 101 */
99 private InnerMastershipListener mastershipListener = new InnerMastershipListener(); 102 private InnerMastershipListener mastershipListener = new InnerMastershipListener();
100 103
101 - //Ensures that all executions do not interfere with one another (single thread) 104 + /* Used to trigger balancing on region events where there was either a
102 - private ListeningScheduledExecutorService executorService = MoreExecutors. 105 + * change on the master sets of a given region or a change on the devices
103 - listeningDecorator(newSingleThreadScheduledExecutor(groupedThreads("MastershipLoadBalancer", "%d", log))); 106 + * that belong to a region.
107 + */
108 + private InnerRegionListener regionEventListener = new InnerRegionListener();
109 +
110 + /* Ensures that all executions do not interfere with one another (single
111 + * thread) and that they are apart from each other by at least what is
112 + * defined as the schedulePeriod.
113 + */
114 + private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
115 + groupedThreads("MastershipLoadBalancer", "%d", log));
104 116
105 @Activate 117 @Activate
106 public void activate(ComponentContext context) { 118 public void activate(ComponentContext context) {
...@@ -110,6 +122,7 @@ public class MastershipLoadBalancer { ...@@ -110,6 +122,7 @@ public class MastershipLoadBalancer {
110 localId = clusterService.getLocalNode().id(); 122 localId = clusterService.getLocalNode().id();
111 leadershipService.addListener(leadershipListener); 123 leadershipService.addListener(leadershipListener);
112 leadershipService.runForLeadership(REBALANCE_MASTERSHIP); 124 leadershipService.runForLeadership(REBALANCE_MASTERSHIP);
125 + regionService.addListener(regionEventListener);
113 log.info("Started"); 126 log.info("Started");
114 } 127 }
115 128
...@@ -119,6 +132,7 @@ public class MastershipLoadBalancer { ...@@ -119,6 +132,7 @@ public class MastershipLoadBalancer {
119 mastershipService.removeListener(mastershipListener); 132 mastershipService.removeListener(mastershipListener);
120 leadershipService.withdraw(REBALANCE_MASTERSHIP); 133 leadershipService.withdraw(REBALANCE_MASTERSHIP);
121 leadershipService.removeListener(leadershipListener); 134 leadershipService.removeListener(leadershipListener);
135 + regionService.removeListener(regionEventListener);
122 cancelBalance(); 136 cancelBalance();
123 executorService.shutdown(); 137 executorService.shutdown();
124 log.info("Stopped"); 138 log.info("Stopped");
...@@ -143,23 +157,34 @@ public class MastershipLoadBalancer { ...@@ -143,23 +157,34 @@ public class MastershipLoadBalancer {
143 } 157 }
144 } 158 }
145 159
160 + // Sets flag at execution to indicate there is currently a scheduled
161 + // rebalancing. As soon as it starts running, the flag is set back to
162 + // null and another rebalancing can be queued.
146 private void scheduleBalance() { 163 private void scheduleBalance() {
147 if (isLeader.get() && nextTask.get() == null) { 164 if (isLeader.get() && nextTask.get() == null) {
148 165
149 - ListenableScheduledFuture task = 166 + Future task = executorService.schedule(new BalanceTask(),
150 - executorService.schedule(mastershipAdminService::balanceRoles, 167 + schedulePeriod, TimeUnit.SECONDS);
151 - schedulePeriod, TimeUnit.SECONDS); 168 +
152 - task.addListener(() -> {
153 - log.info("Completed balance roles");
154 - nextTask.set(null);
155 - }, MoreExecutors.directExecutor()
156 - );
157 if (!nextTask.compareAndSet(null, task)) { 169 if (!nextTask.compareAndSet(null, task)) {
158 task.cancel(false); 170 task.cancel(false);
159 } 171 }
160 } 172 }
161 } 173 }
162 174
175 + private class BalanceTask implements Runnable {
176 +
177 + @Override
178 + public void run() {
179 + // nextTask is now running, free the spot so that it is possible
180 + // to queue up another upcoming task.
181 + nextTask.set(null);
182 +
183 + mastershipAdminService.balanceRoles();
184 + log.info("Completed balance roles");
185 + }
186 + }
187 +
163 private void cancelBalance() { 188 private void cancelBalance() {
164 Future task = nextTask.getAndSet(null); 189 Future task = nextTask.getAndSet(null);
165 if (task != null) { 190 if (task != null) {
...@@ -191,7 +216,6 @@ public class MastershipLoadBalancer { ...@@ -191,7 +216,6 @@ public class MastershipLoadBalancer {
191 216
192 @Override 217 @Override
193 public void event(MastershipEvent event) { 218 public void event(MastershipEvent event) {
194 - //Sets flag at execution to indicate there is currently a scheduled rebalancing, reverts upon completion
195 scheduleBalance(); 219 scheduleBalance();
196 } 220 }
197 } 221 }
...@@ -207,4 +231,18 @@ public class MastershipLoadBalancer { ...@@ -207,4 +231,18 @@ public class MastershipLoadBalancer {
207 processLeaderChange(event.subject().leaderNodeId()); 231 processLeaderChange(event.subject().leaderNodeId());
208 } 232 }
209 } 233 }
234 +
235 + private class InnerRegionListener implements RegionListener {
236 + @Override
237 + public void event(RegionEvent event) {
238 + switch (event.type()) {
239 + case REGION_MEMBERSHIP_CHANGED:
240 + case REGION_UPDATED:
241 + scheduleBalance();
242 + break;
243 + default:
244 + break;
245 + }
246 + }
247 + }
210 } 248 }
......