Ayaka Koshibe

rehash with ScheduledExecutors

Change-Id: I37c377781a4478250ce5805fd22eb5c589af6bae
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
15 */ 15 */
16 package org.onosproject.provider.nil.link.impl; 16 package org.onosproject.provider.nil.link.impl;
17 17
18 +import com.google.common.collect.Lists;
18 import com.google.common.collect.Maps; 19 import com.google.common.collect.Maps;
19 import com.google.common.collect.Sets; 20 import com.google.common.collect.Sets;
20 21
21 -import org.apache.commons.lang3.concurrent.ConcurrentUtils;
22 import org.apache.felix.scr.annotations.Activate; 22 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component; 23 import org.apache.felix.scr.annotations.Component;
24 import org.apache.felix.scr.annotations.Deactivate; 24 import org.apache.felix.scr.annotations.Deactivate;
...@@ -47,10 +47,13 @@ import org.osgi.service.component.ComponentContext; ...@@ -47,10 +47,13 @@ import org.osgi.service.component.ComponentContext;
47 import org.slf4j.Logger; 47 import org.slf4j.Logger;
48 48
49 import java.util.Dictionary; 49 import java.util.Dictionary;
50 +import java.util.Iterator;
51 +import java.util.List;
52 +import java.util.ArrayList;
50 import java.util.Set; 53 import java.util.Set;
51 import java.util.concurrent.ConcurrentMap; 54 import java.util.concurrent.ConcurrentMap;
52 -import java.util.concurrent.ExecutorService;
53 import java.util.concurrent.Executors; 55 import java.util.concurrent.Executors;
56 +import java.util.concurrent.ScheduledExecutorService;
54 import java.util.concurrent.TimeUnit; 57 import java.util.concurrent.TimeUnit;
55 import java.io.BufferedReader; 58 import java.io.BufferedReader;
56 import java.io.FileReader; 59 import java.io.FileReader;
...@@ -76,11 +79,14 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -76,11 +79,14 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
76 79
77 private final Logger log = getLogger(getClass()); 80 private final Logger log = getLogger(getClass());
78 81
82 + // default topology file location and name.
79 private static final String CFG_PATH = "/opt/onos/apache-karaf-3.0.2/etc/linkGraph.cfg"; 83 private static final String CFG_PATH = "/opt/onos/apache-karaf-3.0.2/etc/linkGraph.cfg";
84 + // default number of workers. Eventually make this tunable
85 + private static final int THREADS = 8;
80 86
81 private static final int CHECK_DURATION = 10; 87 private static final int CHECK_DURATION = 10;
82 - private static final int DEFAULT_RATE = 0; 88 + private static final int DEFAULT_RATE = 0; // usec
83 - private static final int REFRESH_RATE = 3000000; // in us 89 + private static final int REFRESH_RATE = 3; // sec
84 90
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 91 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected DeviceService deviceService; 92 protected DeviceService deviceService;
...@@ -99,15 +105,17 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -99,15 +105,17 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
99 private final InternalLinkProvider linkProvider = new InternalLinkProvider(); 105 private final InternalLinkProvider linkProvider = new InternalLinkProvider();
100 106
101 // True for device with Driver, false otherwise. 107 // True for device with Driver, false otherwise.
102 - private final ConcurrentMap<DeviceId, Boolean> driverMap = Maps 108 + private final ConcurrentMap<DeviceId, Set<LinkDriver>> driverMap = Maps
103 .newConcurrentMap(); 109 .newConcurrentMap();
104 110
105 // Link descriptions 111 // Link descriptions
106 - private final ConcurrentMap<DeviceId, Set<LinkDescription>> linkDescrs = Maps 112 + private final Set<LinkDescription> linkDescrs = Sets.newConcurrentHashSet();
107 - .newConcurrentMap(); 113 +
114 + // Thread to description map
115 + private final List<Set<LinkDescription>> linkTasks = new ArrayList<>(THREADS);
108 116
109 - private ExecutorService linkDriver = 117 + private ScheduledExecutorService linkDriver =
110 - Executors.newCachedThreadPool(groupedThreads("onos/null", "link-driver-%d")); 118 + Executors.newScheduledThreadPool(THREADS, groupedThreads("onos/null", "link-driver-%d"));
111 119
112 // For flicker = true, duration between events in msec. 120 // For flicker = true, duration between events in msec.
113 @Property(name = "eventRate", value = "0", label = "Duration between Link Event") 121 @Property(name = "eventRate", value = "0", label = "Duration between Link Event")
...@@ -130,8 +138,27 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -130,8 +138,27 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
130 public void activate(ComponentContext context) { 138 public void activate(ComponentContext context) {
131 providerService = providerRegistry.register(this); 139 providerService = providerRegistry.register(this);
132 modified(context); 140 modified(context);
133 - deviceService.addListener(linkProvider);
134 141
142 + if (flicker) {
143 + allocateLinks();
144 + for (int i = 0; i < linkTasks.size(); i++) {
145 + Set<LinkDescription> links = linkTasks.get(i);
146 + LinkDriver driver = new LinkDriver(links, i);
147 + links.forEach(v -> {
148 + DeviceId d = v.src().deviceId();
149 + Set<LinkDriver> s = driverMap.getOrDefault(d, Sets.newConcurrentHashSet());
150 + s.add(driver);
151 + driverMap.put(d, s);
152 + });
153 + try {
154 + linkDriver.schedule(driver, eventRate, TimeUnit.MICROSECONDS);
155 + } catch (Exception e) {
156 + log.warn(e.getMessage());
157 + }
158 + }
159 + } else {
160 + linkDriver.schedule(new LinkDriver(linkDescrs, 0), 3, TimeUnit.SECONDS);
161 + }
135 log.info("started"); 162 log.info("started");
136 } 163 }
137 164
...@@ -179,24 +206,13 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -179,24 +206,13 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
179 206
180 // test mode configuration 207 // test mode configuration
181 if (eventRate != newRate && newRate > 0) { 208 if (eventRate != newRate && newRate > 0) {
182 - driverMap.replaceAll((k, v) -> false);
183 eventRate = newRate; 209 eventRate = newRate;
184 flicker = true; 210 flicker = true;
185 } else if (newRate == 0) { 211 } else if (newRate == 0) {
186 - driverMap.replaceAll((k, v) -> false);
187 flicker = false; 212 flicker = false;
188 } 213 }
189 214
190 log.info("Using settings: eventRate={}, topofile={}", eventRate, cfgFile); 215 log.info("Using settings: eventRate={}, topofile={}", eventRate, cfgFile);
191 - for (Device dev : deviceService.getDevices()) {
192 - DeviceId did = dev.id();
193 - synchronized (this) {
194 - if (driverMap.get(did) == null || !driverMap.get(did)) {
195 - driverMap.put(dev.id(), true);
196 - linkDriver.submit(new LinkDriver(dev));
197 - }
198 - }
199 - }
200 } 216 }
201 217
202 // parse simplified dot-like topology graph 218 // parse simplified dot-like topology graph
...@@ -266,7 +282,6 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -266,7 +282,6 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
266 String[] cp1 = linkArr[0].split(":"); 282 String[] cp1 = linkArr[0].split(":");
267 String[] cp2 = linkArr[2].split(":"); 283 String[] cp2 = linkArr[2].split(":");
268 284
269 - log.debug("cp1:{} cp2:{}", cp1, cp2);
270 if (cp1.length != 2 && (cp2.length != 2 || cp2.length != 3)) { 285 if (cp1.length != 2 && (cp2.length != 2 || cp2.length != 3)) {
271 log.warn("Malformed endpoint descriptor(s):" 286 log.warn("Malformed endpoint descriptor(s):"
272 + "endpoint format should be DeviceId:port or DeviceId:port:NodeId," 287 + "endpoint format should be DeviceId:port or DeviceId:port:NodeId,"
...@@ -285,18 +300,15 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -285,18 +300,15 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
285 DeviceId ddev = (adj == null) ? recover(cp2[0], me) : recover(cp2[0], adj); 300 DeviceId ddev = (adj == null) ? recover(cp2[0], me) : recover(cp2[0], adj);
286 ConnectPoint src = new ConnectPoint(sdev, PortNumber.portNumber(cp1[1])); 301 ConnectPoint src = new ConnectPoint(sdev, PortNumber.portNumber(cp1[1]));
287 ConnectPoint dst = new ConnectPoint(ddev, PortNumber.portNumber(cp2[1])); 302 ConnectPoint dst = new ConnectPoint(ddev, PortNumber.portNumber(cp2[1]));
288 - 303 + // both link types have incoming half-link
304 + LinkDescription in = new DefaultLinkDescription(dst, src, DIRECT);
305 + linkDescrs.add(in);
289 if (op.equals("--")) { 306 if (op.equals("--")) {
290 - // bidirectional - within our node's island 307 + // bidirectional - within our node's island, make outbound link
291 LinkDescription out = new DefaultLinkDescription(src, dst, DIRECT); 308 LinkDescription out = new DefaultLinkDescription(src, dst, DIRECT);
292 - LinkDescription in = new DefaultLinkDescription(dst, src, DIRECT); 309 + linkDescrs.add(out);
293 - addLdesc(sdev, out); 310 + log.info("Created bidirectional link: {}, {}", out, in);
294 - addLdesc(ddev, in);
295 - log.info("Created bidirectional link: {}", out);
296 } else if (op.equals("->")) { 311 } else if (op.equals("->")) {
297 - // unidirectional - likely from another island
298 - LinkDescription in = new DefaultLinkDescription(dst, src, DIRECT);
299 - addLdesc(ddev, in);
300 log.info("Created unidirectional link: {}", in); 312 log.info("Created unidirectional link: {}", in);
301 } else { 313 } else {
302 log.warn("Unknown link descriptor operand:" 314 log.warn("Unknown link descriptor operand:"
...@@ -309,7 +321,6 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -309,7 +321,6 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
309 private DeviceId recover(String base, NodeId node) { 321 private DeviceId recover(String base, NodeId node) {
310 long hash = node.hashCode() << 16; 322 long hash = node.hashCode() << 16;
311 int dev = Integer.valueOf(base); 323 int dev = Integer.valueOf(base);
312 - log.debug("hash: {}, dev: {}, {}", hash, dev, toHex(hash | dev));
313 try { 324 try {
314 return DeviceId.deviceId(new URI("null", toHex(hash | dev), null)); 325 return DeviceId.deviceId(new URI("null", toHex(hash | dev), null));
315 } catch (URISyntaxException e) { 326 } catch (URISyntaxException e) {
...@@ -318,11 +329,19 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -318,11 +329,19 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
318 } 329 }
319 } 330 }
320 331
321 - // add LinkDescriptions to map 332 + // adds a LinkDescription to a worker's to-be queue, for flickering
322 - private boolean addLdesc(DeviceId did, LinkDescription ldesc) { 333 + private void allocateLinks() {
323 - Set<LinkDescription> ldescs = ConcurrentUtils.putIfAbsent( 334 + int index, lcount = 0;
324 - linkDescrs, did, Sets.newConcurrentHashSet()); 335 + for (LinkDescription ld : linkDescrs) {
325 - return ldescs.add(ldesc); 336 + index = (lcount % THREADS);
337 + log.info("allocation: total={}, index={}", linkDescrs.size(), lcount, index);
338 + if (linkTasks.size() <= index) {
339 + linkTasks.add(index, Sets.newHashSet(ld));
340 + } else {
341 + linkTasks.get(index).add(ld);
342 + }
343 + lcount++;
344 + }
326 } 345 }
327 346
328 /** 347 /**
...@@ -335,19 +354,14 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -335,19 +354,14 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
335 Device dev = event.subject(); 354 Device dev = event.subject();
336 switch (event.type()) { 355 switch (event.type()) {
337 case DEVICE_ADDED: 356 case DEVICE_ADDED:
338 - synchronized (this) { 357 + // TODO: wait for all devices to stop core from balking
339 - if (!driverMap.getOrDefault(dev.id(), false)) {
340 - driverMap.put(dev.id(), true);
341 - linkDriver.submit(new LinkDriver(dev));
342 - }
343 - }
344 break; 358 break;
345 case DEVICE_REMOVED: 359 case DEVICE_REMOVED:
346 - driverMap.put(dev.id(), false); 360 + if (MASTER.equals(roleService.getLocalRole(dev.id()))) {
347 - if (!MASTER.equals(roleService.getLocalRole(dev.id()))) { 361 + for (LinkDriver d : driverMap.get(dev.id())) {
348 - return; 362 + d.deviceRemoved(dev.id());
363 + }
349 } 364 }
350 - // no need to remove static links, just stop advertising them
351 providerService.linksVanished(dev.id()); 365 providerService.linksVanished(dev.id());
352 break; 366 break;
353 default: 367 default:
...@@ -358,16 +372,27 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -358,16 +372,27 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
358 372
359 /** 373 /**
360 * Generates link events using fake links. 374 * Generates link events using fake links.
375 + * TODO: stats collection should be its own thing.
361 */ 376 */
362 private class LinkDriver implements Runnable { 377 private class LinkDriver implements Runnable {
363 - Device myDev; 378 + // List to actually work off of
364 - LinkDriver(Device dev) { 379 + List<LinkDescription> tasks = Lists.newArrayList();
365 - myDev = dev; 380 + float effLoad = 0;
381 + Long counter = 0L;
382 + int next = 0;
383 +
384 + long startTime;
385 +
386 + LinkDriver(Set<LinkDescription> links, int index) {
387 + for (LinkDescription link : links) {
388 + tasks.add(link);
389 + }
390 + startTime = System.currentTimeMillis();
366 } 391 }
367 392
368 @Override 393 @Override
369 public void run() { 394 public void run() {
370 - log.info("Thread started for dev {}", myDev.id()); 395 + log.info("Thread started for links {}", tasks);
371 if (flicker) { 396 if (flicker) {
372 flicker(); 397 flicker();
373 } else { 398 } else {
...@@ -376,53 +401,54 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -376,53 +401,54 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
376 } 401 }
377 402
378 private void flicker() { 403 private void flicker() {
379 - long startTime = System.currentTimeMillis(); 404 + if ((!linkDriver.isShutdown() || !tasks.isEmpty())) {
380 - long countEvent = 0; 405 + log.info("next: {}, count: {}", next, counter);
381 - float effLoad = 0; 406 + if (counter <= CHECK_DURATION * 1000000 / eventRate) {
382 - 407 + if ((counter % 2) == 0) {
383 - while (!linkDriver.isShutdown() && driverMap.get(myDev.id())) { 408 + providerService.linkVanished(tasks.get(next++));
384 - if (!flicker) { 409 + } else {
385 - break; 410 + providerService.linkDetected(tasks.get(next++));
386 } 411 }
387 - //Assuming eventRate is in microsecond unit 412 + if (next == tasks.size()) {
388 - if (countEvent <= CHECK_DURATION * 1000000 / eventRate) { 413 + next = 0;
389 - for (LinkDescription desc : linkDescrs.get(myDev.id())) {
390 - providerService.linkVanished(desc);
391 - countEvent++;
392 - sleepFor(eventRate);
393 - providerService.linkDetected(desc);
394 - countEvent++;
395 - sleepFor(eventRate);
396 } 414 }
415 + counter++;
397 } else { 416 } else {
398 // log in WARN the effective load generation rate in events/sec, every 10 seconds 417 // log in WARN the effective load generation rate in events/sec, every 10 seconds
399 - effLoad = (float) (countEvent * 1000.0 / 418 + effLoad = (float) (counter * 1000.0 / (System
400 - (System.currentTimeMillis() - startTime)); 419 + .currentTimeMillis() - startTime));
401 log.warn("Effective Loading for thread is {} events/second", 420 log.warn("Effective Loading for thread is {} events/second",
402 String.valueOf(effLoad)); 421 String.valueOf(effLoad));
403 - countEvent = 0; 422 + counter = 0L;
404 startTime = System.currentTimeMillis(); 423 startTime = System.currentTimeMillis();
405 } 424 }
425 + linkDriver.schedule(this, eventRate, TimeUnit.MICROSECONDS);
406 } 426 }
407 } 427 }
408 428
409 private void refresh() { 429 private void refresh() {
410 - while (!linkDriver.isShutdown() && driverMap.get(myDev.id())) { 430 + if (!linkDriver.isShutdown() || !tasks.isEmpty()) {
411 - if (flicker) { 431 + log.info("iter {} refresh_links for {} links", counter, linkDescrs.size());
412 - break; 432 +
413 - } 433 + for (LinkDescription desc : linkDescrs) {
414 - for (LinkDescription desc : linkDescrs.get(myDev.id())) {
415 providerService.linkDetected(desc); 434 providerService.linkDetected(desc);
416 - sleepFor(REFRESH_RATE); 435 + log.info("iteration {}, {}", counter, desc);
417 } 436 }
437 + counter++;
438 + linkDriver.schedule(this, REFRESH_RATE, TimeUnit.SECONDS);
418 } 439 }
419 } 440 }
420 441
421 - private void sleepFor(int time) { 442 + public void deviceRemoved(DeviceId did) {
422 - try { 443 + synchronized (tasks) {
423 - TimeUnit.MICROSECONDS.sleep(time); 444 + Iterator<LinkDescription> it = tasks.iterator();
424 - } catch (InterruptedException e) { 445 + while (it.hasNext()) {
425 - log.warn(String.valueOf(e)); 446 + LinkDescription ld = it.next();
447 + if (did.equals(ld.dst().deviceId())
448 + || (did.equals(ld.src().deviceId()))) {
449 + it.remove();
450 + }
451 + }
426 } 452 }
427 } 453 }
428 } 454 }
......