Ayaka Koshibe
Committed by Gerrit Code Review

Prevent link event type aliasing and proper switching between flicker = on/off.

Also scale number of threads with number of available cores.

Change-Id: I438d92ab9c3df5c478f451c353135a9a64f5183e
...@@ -49,7 +49,6 @@ import org.osgi.service.component.ComponentContext; ...@@ -49,7 +49,6 @@ import org.osgi.service.component.ComponentContext;
49 import org.slf4j.Logger; 49 import org.slf4j.Logger;
50 50
51 import java.util.Dictionary; 51 import java.util.Dictionary;
52 -import java.util.Iterator;
53 import java.util.List; 52 import java.util.List;
54 import java.util.Set; 53 import java.util.Set;
55 import java.util.concurrent.ConcurrentMap; 54 import java.util.concurrent.ConcurrentMap;
...@@ -65,7 +64,6 @@ import java.net.URISyntaxException; ...@@ -65,7 +64,6 @@ import java.net.URISyntaxException;
65 import static com.google.common.base.Strings.isNullOrEmpty; 64 import static com.google.common.base.Strings.isNullOrEmpty;
66 import static org.onlab.util.Tools.groupedThreads; 65 import static org.onlab.util.Tools.groupedThreads;
67 import static org.onlab.util.Tools.toHex; 66 import static org.onlab.util.Tools.toHex;
68 -import static org.onosproject.net.MastershipRole.MASTER;
69 import static org.onosproject.net.Link.Type.DIRECT; 67 import static org.onosproject.net.Link.Type.DIRECT;
70 import static org.slf4j.LoggerFactory.getLogger; 68 import static org.slf4j.LoggerFactory.getLogger;
71 69
...@@ -83,11 +81,12 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -83,11 +81,12 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
83 // default topology file location and name. 81 // default topology file location and name.
84 private static final String CFG_PATH = "/opt/onos/apache-karaf-3.0.2/etc/linkGraph.cfg"; 82 private static final String CFG_PATH = "/opt/onos/apache-karaf-3.0.2/etc/linkGraph.cfg";
85 // default number of workers. Eventually make this tunable 83 // default number of workers. Eventually make this tunable
86 - private static final int THREADS = 8; 84 + private static final int THREADS = (int) Math.max(1, Runtime.getRuntime().availableProcessors() * 0.8);
87 85
88 - private static final int CHECK_DURATION = 10; 86 + private static final int CHECK_DURATION = 10; // sec
89 private static final int DEFAULT_RATE = 0; // usec 87 private static final int DEFAULT_RATE = 0; // usec
90 private static final int REFRESH_RATE = 3; // sec 88 private static final int REFRESH_RATE = 3; // sec
89 + // Fake device used for non-flickering thread in deviceMap
91 private static final DeviceId DEFAULT = DeviceId.deviceId("null:ffffffffffffffff"); 90 private static final DeviceId DEFAULT = DeviceId.deviceId("null:ffffffffffffffff");
92 91
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 92 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
...@@ -109,14 +108,14 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -109,14 +108,14 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
109 108
110 private final InternalLinkProvider linkProvider = new InternalLinkProvider(); 109 private final InternalLinkProvider linkProvider = new InternalLinkProvider();
111 110
112 - // True for device with Driver, false otherwise. 111 + // Mapping between device and drivers that advertise links from device
113 private final ConcurrentMap<DeviceId, Set<LinkDriver>> driverMap = Maps 112 private final ConcurrentMap<DeviceId, Set<LinkDriver>> driverMap = Maps
114 .newConcurrentMap(); 113 .newConcurrentMap();
115 114
116 // Link descriptions 115 // Link descriptions
117 private final List<LinkDescription> linkDescrs = Lists.newArrayList(); 116 private final List<LinkDescription> linkDescrs = Lists.newArrayList();
118 117
119 - // Thread to description map 118 + // Thread to description map for dividing links amongst threads in flicker mode
120 private final List<List<LinkDescription>> linkTasks = Lists.newArrayList(); 119 private final List<List<LinkDescription>> linkTasks = Lists.newArrayList();
121 120
122 private ScheduledExecutorService linkDriver = 121 private ScheduledExecutorService linkDriver =
...@@ -131,7 +130,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -131,7 +130,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
131 private String cfgFile = CFG_PATH; 130 private String cfgFile = CFG_PATH;
132 131
133 // flag checked to create a LinkDriver, if rate is non-zero. 132 // flag checked to create a LinkDriver, if rate is non-zero.
134 - private boolean flicker = false; 133 + private volatile boolean flicker = false;
135 134
136 public NullLinkProvider() { 135 public NullLinkProvider() {
137 super(new ProviderId("null", "org.onosproject.provider.nil")); 136 super(new ProviderId("null", "org.onosproject.provider.nil"));
...@@ -142,28 +141,6 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -142,28 +141,6 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
142 cfgService.registerProperties(getClass()); 141 cfgService.registerProperties(getClass());
143 providerService = providerRegistry.register(this); 142 providerService = providerRegistry.register(this);
144 modified(context); 143 modified(context);
145 -
146 - if (flicker) {
147 - for (int i = 0; i < linkTasks.size(); i++) {
148 - List<LinkDescription> links = linkTasks.get(i);
149 - LinkDriver driver = new LinkDriver(links);
150 - links.forEach(v -> {
151 - DeviceId d = v.src().deviceId();
152 - Set<LinkDriver> s = driverMap.getOrDefault(d, Sets.newConcurrentHashSet());
153 - s.add(driver);
154 - driverMap.put(d, s);
155 - });
156 - try {
157 - linkDriver.schedule(driver, eventRate, TimeUnit.MICROSECONDS);
158 - } catch (Exception e) {
159 - log.warn(e.getMessage());
160 - }
161 - }
162 - } else {
163 - LinkDriver driver = new LinkDriver(linkDescrs);
164 - driverMap.put(DEFAULT, Sets.newHashSet(driver));
165 - linkDriver.schedule(driver, 3, TimeUnit.SECONDS);
166 - }
167 log.info("started"); 144 log.info("started");
168 } 145 }
169 146
...@@ -195,7 +172,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -195,7 +172,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
195 String newPath; 172 String newPath;
196 try { 173 try {
197 String s = (String) properties.get("eventRate"); 174 String s = (String) properties.get("eventRate");
198 - newRate = isNullOrEmpty(s) ? eventRate : Integer.parseInt(s.trim()); 175 + newRate = isNullOrEmpty(s) ? DEFAULT_RATE : Integer.parseInt(s.trim());
199 s = (String) properties.get("cfgFile"); 176 s = (String) properties.get("cfgFile");
200 newPath = s.trim(); 177 newPath = s.trim();
201 } catch (NumberFormatException | ClassCastException e) { 178 } catch (NumberFormatException | ClassCastException e) {
...@@ -209,17 +186,45 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -209,17 +186,45 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
209 cfgFile = newPath; 186 cfgFile = newPath;
210 } 187 }
211 readGraph(cfgFile, nodeService.getLocalNode().id()); 188 readGraph(cfgFile, nodeService.getLocalNode().id());
212 - 189 + if (newRate != eventRate) {
213 - // test mode configuration 190 + if (eventRate < 0) {
214 - if (eventRate != newRate && newRate > 0) { 191 + log.warn("Invalid rate, ignoring and using default");
192 + eventRate = DEFAULT_RATE;
193 + } else {
215 eventRate = newRate; 194 eventRate = newRate;
195 + }
196 + }
197 + if (eventRate > 0) {
198 + if (!flicker) { // previously not flickering
216 flicker = true; 199 flicker = true;
217 allocateLinks(); 200 allocateLinks();
218 - } else if (newRate == 0) { 201 + driverMap.remove(DEFAULT);
202 + for (int i = 0; i < linkTasks.size(); i++) {
203 + List<LinkDescription> links = linkTasks.get(i);
204 + LinkDriver driver = new LinkDriver(links);
205 + links.forEach(v -> {
206 + DeviceId sd = v.src().deviceId();
207 + DeviceId dd = v.src().deviceId();
208 + driverMap.computeIfAbsent(sd, k -> Sets.newConcurrentHashSet()).add(driver);
209 + driverMap.computeIfAbsent(dd, k -> Sets.newConcurrentHashSet()).add(driver);
210 + });
211 + try {
212 + linkDriver.schedule(driver, eventRate, TimeUnit.MICROSECONDS);
213 + } catch (Exception e) {
214 + log.warn(e.getMessage());
215 + }
216 + }
217 + }
218 + } else {
219 + if (flicker) {
220 + driverMap.forEach((dev, lds) -> lds.forEach(l -> l.deviceRemoved(dev)));
221 + driverMap.clear();
222 + linkTasks.clear();
223 + }
219 flicker = false; 224 flicker = false;
220 - // reconfigure driver - dumb but should work. 225 + LinkDriver driver = new LinkDriver(linkDescrs);
221 - driverMap.getOrDefault(DEFAULT, Sets.newHashSet()).forEach( 226 + driverMap.put(DEFAULT, Sets.newHashSet(driver));
222 - v -> v.setTasks(linkDescrs)); 227 + linkDriver.schedule(driver, REFRESH_RATE, TimeUnit.SECONDS);
223 } 228 }
224 229
225 log.info("Using settings: eventRate={}, topofile={}", eventRate, cfgFile); 230 log.info("Using settings: eventRate={}, topofile={}", eventRate, cfgFile);
...@@ -374,11 +379,9 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -374,11 +379,9 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
374 // TODO: wait for all devices to stop core from balking 379 // TODO: wait for all devices to stop core from balking
375 break; 380 break;
376 case DEVICE_REMOVED: 381 case DEVICE_REMOVED:
377 - if (MASTER.equals(roleService.getLocalRole(dev.id()))) {
378 for (LinkDriver d : driverMap.get(dev.id())) { 382 for (LinkDriver d : driverMap.get(dev.id())) {
379 d.deviceRemoved(dev.id()); 383 d.deviceRemoved(dev.id());
380 } 384 }
381 - }
382 providerService.linksVanished(dev.id()); 385 providerService.linksVanished(dev.id());
383 break; 386 break;
384 default: 387 default:
...@@ -393,10 +396,11 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -393,10 +396,11 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
393 */ 396 */
394 private class LinkDriver implements Runnable { 397 private class LinkDriver implements Runnable {
395 // List to actually work off of 398 // List to actually work off of
396 - List<LinkDescription> tasks = Lists.newArrayList(); 399 + List<LinkDescription> tasks = Lists.newCopyOnWriteArrayList();
397 float effLoad = 0; 400 float effLoad = 0;
398 Long counter = 0L; 401 Long counter = 0L;
399 int next = 0; 402 int next = 0;
403 + boolean up = true;
400 404
401 long startTime; 405 long startTime;
402 406
...@@ -416,15 +420,16 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -416,15 +420,16 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
416 420
417 private void flicker() { 421 private void flicker() {
418 if ((!linkDriver.isShutdown() || !tasks.isEmpty())) { 422 if ((!linkDriver.isShutdown() || !tasks.isEmpty())) {
419 - log.info("next: {}, count: {}", next, counter); 423 + log.trace("next: {}, count: {}", next, counter);
420 - if (counter <= CHECK_DURATION * 1000000 / eventRate) { 424 + if (counter <= CHECK_DURATION * 1_000_000 / eventRate) {
421 - if ((counter % 2) == 0) { 425 + if (up) {
422 - providerService.linkVanished(tasks.get(next++));
423 - } else {
424 providerService.linkDetected(tasks.get(next++)); 426 providerService.linkDetected(tasks.get(next++));
427 + } else {
428 + providerService.linkVanished(tasks.get(next++));
425 } 429 }
426 - if (next == tasks.size()) { 430 + if (next >= tasks.size()) {
427 next = 0; 431 next = 0;
432 + up = !up;
428 } 433 }
429 counter++; 434 counter++;
430 } else { 435 } else {
...@@ -442,7 +447,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -442,7 +447,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
442 447
443 private void refresh() { 448 private void refresh() {
444 if (!linkDriver.isShutdown() || !tasks.isEmpty()) { 449 if (!linkDriver.isShutdown() || !tasks.isEmpty()) {
445 - log.info("iter {} refresh_links", counter); 450 + log.trace("iter {} refresh_links", counter);
446 451
447 for (LinkDescription desc : tasks) { 452 for (LinkDescription desc : tasks) {
448 providerService.linkDetected(desc); 453 providerService.linkDetected(desc);
...@@ -454,23 +459,20 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -454,23 +459,20 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
454 } 459 }
455 460
456 public void deviceRemoved(DeviceId did) { 461 public void deviceRemoved(DeviceId did) {
457 - synchronized (tasks) { 462 + List<LinkDescription> rm = Lists.newArrayList();
458 - Iterator<LinkDescription> it = tasks.iterator(); 463 + for (LinkDescription ld : tasks) {
459 - while (it.hasNext()) {
460 - LinkDescription ld = it.next();
461 if (did.equals(ld.dst().deviceId()) 464 if (did.equals(ld.dst().deviceId())
462 || (did.equals(ld.src().deviceId()))) { 465 || (did.equals(ld.src().deviceId()))) {
463 - it.remove(); 466 + rm.add(ld);
464 - }
465 } 467 }
466 } 468 }
469 + tasks.removeAll(rm);
467 } 470 }
468 471
469 public void setTasks(List<LinkDescription> links) { 472 public void setTasks(List<LinkDescription> links) {
470 HashMultimap<ConnectPoint, ConnectPoint> nm = HashMultimap.create(); 473 HashMultimap<ConnectPoint, ConnectPoint> nm = HashMultimap.create();
471 links.forEach(v -> nm.put(v.src(), v.dst())); 474 links.forEach(v -> nm.put(v.src(), v.dst()));
472 // remove and send linkVanished for stale links. 475 // remove and send linkVanished for stale links.
473 - synchronized (this) {
474 for (LinkDescription l : tasks) { 476 for (LinkDescription l : tasks) {
475 if (!nm.containsEntry(l.src(), l.dst())) { 477 if (!nm.containsEntry(l.src(), l.dst())) {
476 providerService.linkVanished(l); 478 providerService.linkVanished(l);
...@@ -480,6 +482,5 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -480,6 +482,5 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
480 tasks.addAll(links); 482 tasks.addAll(links);
481 } 483 }
482 } 484 }
483 - }
484 485
485 } 486 }
......