Committed by
Ray Milkey
In this commit:
Bug fix where filtering objectives are not installed due to available ports becoming enabled later. Bug fix where flow objective store had no listener for notifications from drivers across multiple instances of the controller. NPE fix in ofdpa driver for non-existing groups. Preventing ofdpa driver from sending spurious pass notification to app. Incrementing retry filter timer from 1 to 5 secs in default routing handler. Made several debug messages clearer. Change-Id: I828671ee4c8bcfe03c946d051e1d1aac9d8f68dd
Showing
5 changed files
with
129 additions
and
35 deletions
... | @@ -47,7 +47,7 @@ import static com.google.common.base.Preconditions.checkNotNull; | ... | @@ -47,7 +47,7 @@ import static com.google.common.base.Preconditions.checkNotNull; |
47 | * routing rule population. | 47 | * routing rule population. |
48 | */ | 48 | */ |
49 | public class DefaultRoutingHandler { | 49 | public class DefaultRoutingHandler { |
50 | - private static final int MAX_RETRY_ATTEMPTS = 5; | 50 | + private static final int MAX_RETRY_ATTEMPTS = 25; |
51 | private static final String ECMPSPG_MISSING = "ECMP shortest path graph not found"; | 51 | private static final String ECMPSPG_MISSING = "ECMP shortest path graph not found"; |
52 | private static Logger log = LoggerFactory.getLogger(DefaultRoutingHandler.class); | 52 | private static Logger log = LoggerFactory.getLogger(DefaultRoutingHandler.class); |
53 | 53 | ||
... | @@ -212,11 +212,11 @@ public class DefaultRoutingHandler { | ... | @@ -212,11 +212,11 @@ public class DefaultRoutingHandler { |
212 | log.trace("repopulateRoutingRulesForRoutes: running ECMP graph for device {}", link.get(0)); | 212 | log.trace("repopulateRoutingRulesForRoutes: running ECMP graph for device {}", link.get(0)); |
213 | EcmpShortestPathGraph ecmpSpg = new EcmpShortestPathGraph(link.get(0), srManager); | 213 | EcmpShortestPathGraph ecmpSpg = new EcmpShortestPathGraph(link.get(0), srManager); |
214 | if (populateEcmpRoutingRules(link.get(0), ecmpSpg, ImmutableSet.of())) { | 214 | if (populateEcmpRoutingRules(link.get(0), ecmpSpg, ImmutableSet.of())) { |
215 | - log.debug("Populating flow rules from {} to all is successful", | 215 | + log.debug("Populating flow rules from all to dest:{} is successful", |
216 | link.get(0)); | 216 | link.get(0)); |
217 | currentEcmpSpgMap.put(link.get(0), ecmpSpg); | 217 | currentEcmpSpgMap.put(link.get(0), ecmpSpg); |
218 | } else { | 218 | } else { |
219 | - log.warn("Failed to populate the flow rules from {} to all", link.get(0)); | 219 | + log.warn("Failed to populate the flow rules from all to dest:{}", link.get(0)); |
220 | return false; | 220 | return false; |
221 | } | 221 | } |
222 | } else { | 222 | } else { |
... | @@ -463,9 +463,9 @@ public class DefaultRoutingHandler { | ... | @@ -463,9 +463,9 @@ public class DefaultRoutingHandler { |
463 | /** | 463 | /** |
464 | * Populate ECMP rules for subnets from target to destination via nexthops. | 464 | * Populate ECMP rules for subnets from target to destination via nexthops. |
465 | * | 465 | * |
466 | - * @param targetSw Device ID of target switch | 466 | + * @param targetSw Device ID of target switch in which rules will be programmed |
467 | - * @param destSw Device ID of destination switch | 467 | + * @param destSw Device ID of final destination switch to which the rules will forward |
468 | - * @param nextHops List of next hops | 468 | + * @param nextHops List of next hops via which destSw will be reached |
469 | * @param subnets Subnets to be populated. If empty, populate all configured subnets. | 469 | * @param subnets Subnets to be populated. If empty, populate all configured subnets. |
470 | * @return true if succeed | 470 | * @return true if succeed |
471 | */ | 471 | */ |
... | @@ -647,6 +647,8 @@ public class DefaultRoutingHandler { | ... | @@ -647,6 +647,8 @@ public class DefaultRoutingHandler { |
647 | 647 | ||
648 | @Override | 648 | @Override |
649 | public void run() { | 649 | public void run() { |
650 | + log.info("RETRY FILTER ATTEMPT# {} for dev:{}", | ||
651 | + MAX_RETRY_ATTEMPTS - attempts, devId); | ||
650 | boolean success = rulePopulator.populateRouterMacVlanFilters(devId); | 652 | boolean success = rulePopulator.populateRouterMacVlanFilters(devId); |
651 | if (!success && --attempts > 0) { | 653 | if (!success && --attempts > 0) { |
652 | executorService.schedule(this, 200, TimeUnit.MILLISECONDS); | 654 | executorService.schedule(this, 200, TimeUnit.MILLISECONDS); | ... | ... |
... | @@ -244,7 +244,8 @@ public class RoutingRulePopulator { | ... | @@ -244,7 +244,8 @@ public class RoutingRulePopulator { |
244 | } | 244 | } |
245 | 245 | ||
246 | /** | 246 | /** |
247 | - * Populates IP flow rules for the router IP address. | 247 | + * Populates IP flow rules for an IP prefix in the target device. The prefix |
248 | + * is reachable via destination device. | ||
248 | * | 249 | * |
249 | * @param deviceId target device ID to set the rules | 250 | * @param deviceId target device ID to set the rules |
250 | * @param ipPrefix the IP address of the destination router | 251 | * @param ipPrefix the IP address of the destination router |
... | @@ -312,9 +313,11 @@ public class RoutingRulePopulator { | ... | @@ -312,9 +313,11 @@ public class RoutingRulePopulator { |
312 | ipPrefix, | 313 | ipPrefix, |
313 | deviceId); | 314 | deviceId); |
314 | ObjectiveContext context = new DefaultObjectiveContext( | 315 | ObjectiveContext context = new DefaultObjectiveContext( |
315 | - (objective) -> log.debug("IP rule for router {} populated", ipPrefix), | 316 | + (objective) -> log.debug("IP rule for router {} populated in dev:{}", |
317 | + ipPrefix, deviceId), | ||
316 | (objective, error) -> | 318 | (objective, error) -> |
317 | - log.warn("Failed to populate IP rule for router {}: {}", ipPrefix, error)); | 319 | + log.warn("Failed to populate IP rule for router {}: {} in dev:{}", |
320 | + ipPrefix, error, deviceId)); | ||
318 | srManager.flowObjectiveService.forward(deviceId, fwdBuilder.add(context)); | 321 | srManager.flowObjectiveService.forward(deviceId, fwdBuilder.add(context)); |
319 | rulePopulationCounter.incrementAndGet(); | 322 | rulePopulationCounter.incrementAndGet(); |
320 | 323 | ||
... | @@ -356,14 +359,15 @@ public class RoutingRulePopulator { | ... | @@ -356,14 +359,15 @@ public class RoutingRulePopulator { |
356 | } | 359 | } |
357 | 360 | ||
358 | /** | 361 | /** |
359 | - * Populates MPLS flow rules to all routers. | 362 | + * Populates MPLS flow rules in the target device to point towards the |
363 | + * destination device. | ||
360 | * | 364 | * |
361 | - * @param deviceId target device ID of the switch to set the rules | 365 | + * @param targetSwId target device ID of the switch to set the rules |
362 | * @param destSwId destination switch device ID | 366 | * @param destSwId destination switch device ID |
363 | * @param nextHops next hops switch ID list | 367 | * @param nextHops next hops switch ID list |
364 | * @return true if all rules are set successfully, false otherwise | 368 | * @return true if all rules are set successfully, false otherwise |
365 | */ | 369 | */ |
366 | - public boolean populateMplsRule(DeviceId deviceId, DeviceId destSwId, | 370 | + public boolean populateMplsRule(DeviceId targetSwId, DeviceId destSwId, |
367 | Set<DeviceId> nextHops) { | 371 | Set<DeviceId> nextHops) { |
368 | int segmentId; | 372 | int segmentId; |
369 | try { | 373 | try { |
... | @@ -392,11 +396,11 @@ public class RoutingRulePopulator { | ... | @@ -392,11 +396,11 @@ public class RoutingRulePopulator { |
392 | // If the next hop is the destination router for the segment, do pop | 396 | // If the next hop is the destination router for the segment, do pop |
393 | if (nextHops.size() == 1 && destSwId.equals(nextHops.toArray()[0])) { | 397 | if (nextHops.size() == 1 && destSwId.equals(nextHops.toArray()[0])) { |
394 | log.debug("populateMplsRule: Installing MPLS forwarding objective for " | 398 | log.debug("populateMplsRule: Installing MPLS forwarding objective for " |
395 | - + "label {} in switch {} with pop", segmentId, deviceId); | 399 | + + "label {} in switch {} with pop", segmentId, targetSwId); |
396 | 400 | ||
397 | // bos pop case (php) | 401 | // bos pop case (php) |
398 | ForwardingObjective.Builder fwdObjBosBuilder = | 402 | ForwardingObjective.Builder fwdObjBosBuilder = |
399 | - getMplsForwardingObjective(deviceId, | 403 | + getMplsForwardingObjective(targetSwId, |
400 | nextHops, | 404 | nextHops, |
401 | true, | 405 | true, |
402 | true, | 406 | true, |
... | @@ -416,13 +420,13 @@ public class RoutingRulePopulator { | ... | @@ -416,13 +420,13 @@ public class RoutingRulePopulator { |
416 | } else { | 420 | } else { |
417 | // next hop is not destination, SR CONTINUE case (swap with self) | 421 | // next hop is not destination, SR CONTINUE case (swap with self) |
418 | log.debug("Installing MPLS forwarding objective for " | 422 | log.debug("Installing MPLS forwarding objective for " |
419 | - + "label {} in switch {} without pop", segmentId, deviceId); | 423 | + + "label {} in switch {} without pop", segmentId, targetSwId); |
420 | 424 | ||
421 | // continue case with bos - this does get triggered in edge routers | 425 | // continue case with bos - this does get triggered in edge routers |
422 | // and in core routers - driver can handle depending on availability | 426 | // and in core routers - driver can handle depending on availability |
423 | // of MPLS ECMP or not | 427 | // of MPLS ECMP or not |
424 | ForwardingObjective.Builder fwdObjBosBuilder = | 428 | ForwardingObjective.Builder fwdObjBosBuilder = |
425 | - getMplsForwardingObjective(deviceId, | 429 | + getMplsForwardingObjective(targetSwId, |
426 | nextHops, | 430 | nextHops, |
427 | false, | 431 | false, |
428 | true, | 432 | true, |
... | @@ -441,17 +445,23 @@ public class RoutingRulePopulator { | ... | @@ -441,17 +445,23 @@ public class RoutingRulePopulator { |
441 | false); */ | 445 | false); */ |
442 | 446 | ||
443 | } | 447 | } |
444 | - | 448 | + // XXX when other cases above are implemented check for validity of |
449 | + // debug messages below | ||
445 | for (ForwardingObjective.Builder fwdObjBuilder : fwdObjBuilders) { | 450 | for (ForwardingObjective.Builder fwdObjBuilder : fwdObjBuilders) { |
446 | ((Builder) ((Builder) fwdObjBuilder.fromApp(srManager.appId) | 451 | ((Builder) ((Builder) fwdObjBuilder.fromApp(srManager.appId) |
447 | .makePermanent()).withSelector(selector) | 452 | .makePermanent()).withSelector(selector) |
448 | .withPriority(SegmentRoutingService.DEFAULT_PRIORITY)) | 453 | .withPriority(SegmentRoutingService.DEFAULT_PRIORITY)) |
449 | .withFlag(ForwardingObjective.Flag.SPECIFIC); | 454 | .withFlag(ForwardingObjective.Flag.SPECIFIC); |
450 | ObjectiveContext context = new DefaultObjectiveContext( | 455 | ObjectiveContext context = new DefaultObjectiveContext( |
451 | - (objective) -> log.debug("MPLS rule for SID {} populated", segmentId), | 456 | + (objective) -> log.debug("MPLS rule {} for SID {} populated in dev:{} ", |
457 | + objective.id(), segmentId, targetSwId), | ||
452 | (objective, error) -> | 458 | (objective, error) -> |
453 | - log.warn("Failed to populate MPLS rule for SID {}: {}", segmentId, error)); | 459 | + log.warn("Failed to populate MPLS rule {} for SID {}: {} in dev:{}", |
454 | - srManager.flowObjectiveService.forward(deviceId, fwdObjBuilder.add(context)); | 460 | + objective.id(), segmentId, error, targetSwId)); |
461 | + ForwardingObjective fob = fwdObjBuilder.add(context); | ||
462 | + log.debug("Sending MPLS fwd obj {} for SID {}-> next {} in sw: {}", | ||
463 | + fob.id(), segmentId, fob.nextId(), targetSwId); | ||
464 | + srManager.flowObjectiveService.forward(targetSwId, fob); | ||
455 | rulePopulationCounter.incrementAndGet(); | 465 | rulePopulationCounter.incrementAndGet(); |
456 | } | 466 | } |
457 | 467 | ||
... | @@ -493,13 +503,16 @@ public class RoutingRulePopulator { | ... | @@ -493,13 +503,16 @@ public class RoutingRulePopulator { |
493 | // packets will be hashed or not. | 503 | // packets will be hashed or not. |
494 | fwdBuilder.withTreatment(tbuilder.build()); | 504 | fwdBuilder.withTreatment(tbuilder.build()); |
495 | NeighborSet ns = new NeighborSet(nextHops); | 505 | NeighborSet ns = new NeighborSet(nextHops); |
496 | - log.debug("Trying to get a nextObjid for mpls rule on device:{} to ns:{}", | 506 | + log.debug("Trying to get a nextObjId for mpls rule on device:{} to ns:{}", |
497 | deviceId, ns); | 507 | deviceId, ns); |
498 | 508 | ||
499 | int nextId = srManager.getNextObjectiveId(deviceId, ns, meta); | 509 | int nextId = srManager.getNextObjectiveId(deviceId, ns, meta); |
500 | if (nextId <= 0) { | 510 | if (nextId <= 0) { |
501 | log.warn("No next objective in {} for ns: {}", deviceId, ns); | 511 | log.warn("No next objective in {} for ns: {}", deviceId, ns); |
502 | return null; | 512 | return null; |
513 | + } else { | ||
514 | + log.debug("nextObjId found:{} for mpls rule on device:{} to ns:{}", | ||
515 | + nextId, deviceId, ns); | ||
503 | } | 516 | } |
504 | 517 | ||
505 | fwdBuilder.nextStep(nextId); | 518 | fwdBuilder.nextStep(nextId); |
... | @@ -537,14 +550,20 @@ public class RoutingRulePopulator { | ... | @@ -537,14 +550,20 @@ public class RoutingRulePopulator { |
537 | deviceId); | 550 | deviceId); |
538 | return false; | 551 | return false; |
539 | } | 552 | } |
540 | - | 553 | + int disabledPorts = 0, suppressedPorts = 0, filteredPorts = 0; |
541 | for (Port port : devPorts) { | 554 | for (Port port : devPorts) { |
542 | ConnectPoint connectPoint = new ConnectPoint(deviceId, port.number()); | 555 | ConnectPoint connectPoint = new ConnectPoint(deviceId, port.number()); |
543 | // TODO: Handles dynamic port events when we are ready for dynamic config | 556 | // TODO: Handles dynamic port events when we are ready for dynamic config |
544 | SegmentRoutingAppConfig appConfig = srManager.cfgService | 557 | SegmentRoutingAppConfig appConfig = srManager.cfgService |
545 | .getConfig(srManager.appId, SegmentRoutingAppConfig.class); | 558 | .getConfig(srManager.appId, SegmentRoutingAppConfig.class); |
546 | - if ((appConfig == null || !appConfig.suppressSubnet().contains(connectPoint)) && | 559 | + if (!port.isEnabled()) { |
547 | - port.isEnabled()) { | 560 | + disabledPorts++; |
561 | + continue; | ||
562 | + } | ||
563 | + if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) { | ||
564 | + suppressedPorts++; | ||
565 | + continue; | ||
566 | + } | ||
548 | Ip4Prefix portSubnet = config.getPortSubnet(deviceId, port.number()); | 567 | Ip4Prefix portSubnet = config.getPortSubnet(deviceId, port.number()); |
549 | VlanId assignedVlan = (portSubnet == null) | 568 | VlanId assignedVlan = (portSubnet == null) |
550 | ? VlanId.vlanId(SegmentRoutingManager.ASSIGNED_VLAN_NO_SUBNET) | 569 | ? VlanId.vlanId(SegmentRoutingManager.ASSIGNED_VLAN_NO_SUBNET) |
... | @@ -563,14 +582,22 @@ public class RoutingRulePopulator { | ... | @@ -563,14 +582,22 @@ public class RoutingRulePopulator { |
563 | } | 582 | } |
564 | fob.permit().fromApp(srManager.appId); | 583 | fob.permit().fromApp(srManager.appId); |
565 | log.debug("Sending filtering objective for dev/port:{}/{}", deviceId, port); | 584 | log.debug("Sending filtering objective for dev/port:{}/{}", deviceId, port); |
585 | + filteredPorts++; | ||
566 | ObjectiveContext context = new DefaultObjectiveContext( | 586 | ObjectiveContext context = new DefaultObjectiveContext( |
567 | (objective) -> log.debug("Filter for {} populated", connectPoint), | 587 | (objective) -> log.debug("Filter for {} populated", connectPoint), |
568 | (objective, error) -> | 588 | (objective, error) -> |
569 | log.warn("Failed to populate filter for {}: {}", connectPoint, error)); | 589 | log.warn("Failed to populate filter for {}: {}", connectPoint, error)); |
570 | srManager.flowObjectiveService.filter(deviceId, fob.add(context)); | 590 | srManager.flowObjectiveService.filter(deviceId, fob.add(context)); |
571 | } | 591 | } |
572 | - } | 592 | + log.info("Filtering on dev:{}, disabledPorts:{}, suppressedPorts:{}, filteredPorts:{}", |
573 | - return true; | 593 | + deviceId, disabledPorts, suppressedPorts, filteredPorts); |
594 | + // XXX With this check, there is a chance that not all the ports that | ||
595 | + // should be filtered actually get filtered as long as one of them does. | ||
596 | + // Note there is no PORT_UPDATED event that makes the port go from disabled | ||
597 | + // to enabled state, because the ports comes enabled from the switch. | ||
598 | + // Check ONOS core, where the port becoming available and being declared | ||
599 | + // enabled is possibly not atomic. | ||
600 | + return (filteredPorts > 0) ? true : false; | ||
574 | } | 601 | } |
575 | 602 | ||
576 | /** | 603 | /** | ... | ... |
... | @@ -239,7 +239,8 @@ public class FlowObjectiveManager implements FlowObjectiveService { | ... | @@ -239,7 +239,8 @@ public class FlowObjectiveManager implements FlowObjectiveService { |
239 | private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) { | 239 | private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) { |
240 | if (fwd.nextId() != null && | 240 | if (fwd.nextId() != null && |
241 | flowObjectiveStore.getNextGroup(fwd.nextId()) == null) { | 241 | flowObjectiveStore.getNextGroup(fwd.nextId()) == null) { |
242 | - log.trace("Queuing forwarding objective for nextId {}", fwd.nextId()); | 242 | + log.debug("Queuing forwarding objective {} for nextId {} meant for device {}", |
243 | + fwd.id(), fwd.nextId(), deviceId); | ||
243 | // TODO: change to computeIfAbsent | 244 | // TODO: change to computeIfAbsent |
244 | Set<PendingNext> newset = Collections.newSetFromMap( | 245 | Set<PendingNext> newset = Collections.newSetFromMap( |
245 | new ConcurrentHashMap<PendingNext, Boolean>()); | 246 | new ConcurrentHashMap<PendingNext, Boolean>()); |
... | @@ -398,7 +399,7 @@ public class FlowObjectiveManager implements FlowObjectiveService { | ... | @@ -398,7 +399,7 @@ public class FlowObjectiveManager implements FlowObjectiveService { |
398 | Set<PendingNext> pending = pendingForwards.remove(event.subject()); | 399 | Set<PendingNext> pending = pendingForwards.remove(event.subject()); |
399 | 400 | ||
400 | if (pending == null) { | 401 | if (pending == null) { |
401 | - log.warn("Nothing pending for this obj event {}", event); | 402 | + log.debug("Nothing pending for this obj event {}", event); |
402 | return; | 403 | return; |
403 | } | 404 | } |
404 | 405 | ||
... | @@ -457,7 +458,7 @@ public class FlowObjectiveManager implements FlowObjectiveService { | ... | @@ -457,7 +458,7 @@ public class FlowObjectiveManager implements FlowObjectiveService { |
457 | public List<String> getNextMappings() { | 458 | public List<String> getNextMappings() { |
458 | List<String> mappings = new ArrayList<>(); | 459 | List<String> mappings = new ArrayList<>(); |
459 | Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups(); | 460 | Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups(); |
460 | - // XXX if the NextGroup upon decoding stored info of the deviceId | 461 | + // XXX if the NextGroup after de-serialization actually stored info of the deviceId |
461 | // then info on any nextObj could be retrieved from one controller instance. | 462 | // then info on any nextObj could be retrieved from one controller instance. |
462 | // Right now the drivers on one instance can only fetch for next-ids that came | 463 | // Right now the drivers on one instance can only fetch for next-ids that came |
463 | // to them. | 464 | // to them. | ... | ... |
... | @@ -30,15 +30,22 @@ import org.onosproject.net.flowobjective.ObjectiveEvent; | ... | @@ -30,15 +30,22 @@ import org.onosproject.net.flowobjective.ObjectiveEvent; |
30 | import org.onosproject.store.AbstractStore; | 30 | import org.onosproject.store.AbstractStore; |
31 | import org.onosproject.store.service.AtomicCounter; | 31 | import org.onosproject.store.service.AtomicCounter; |
32 | import org.onosproject.store.service.ConsistentMap; | 32 | import org.onosproject.store.service.ConsistentMap; |
33 | +import org.onosproject.store.service.MapEvent; | ||
34 | +import org.onosproject.store.service.MapEventListener; | ||
33 | import org.onosproject.store.service.Serializer; | 35 | import org.onosproject.store.service.Serializer; |
34 | import org.onosproject.store.service.StorageService; | 36 | import org.onosproject.store.service.StorageService; |
35 | import org.onosproject.store.service.Versioned; | 37 | import org.onosproject.store.service.Versioned; |
36 | import org.slf4j.Logger; | 38 | import org.slf4j.Logger; |
37 | 39 | ||
40 | +import static org.onlab.util.Tools.groupedThreads; | ||
38 | import static org.slf4j.LoggerFactory.getLogger; | 41 | import static org.slf4j.LoggerFactory.getLogger; |
39 | 42 | ||
40 | import java.util.HashMap; | 43 | import java.util.HashMap; |
41 | import java.util.Map; | 44 | import java.util.Map; |
45 | +import java.util.concurrent.BlockingQueue; | ||
46 | +import java.util.concurrent.ExecutorService; | ||
47 | +import java.util.concurrent.Executors; | ||
48 | +import java.util.concurrent.LinkedBlockingQueue; | ||
42 | 49 | ||
43 | /** | 50 | /** |
44 | * Manages the inventory of created next groups. | 51 | * Manages the inventory of created next groups. |
... | @@ -57,9 +64,16 @@ public class DistributedFlowObjectiveStore | ... | @@ -57,9 +64,16 @@ public class DistributedFlowObjectiveStore |
57 | protected StorageService storageService; | 64 | protected StorageService storageService; |
58 | 65 | ||
59 | private AtomicCounter nextIds; | 66 | private AtomicCounter nextIds; |
67 | + private MapEventListener<Integer, byte[]> mapListener = new NextGroupListener(); | ||
68 | + // event queue to separate map-listener threads from event-handler threads (tpool) | ||
69 | + private BlockingQueue<ObjectiveEvent> eventQ; | ||
70 | + private ExecutorService tpool; | ||
60 | 71 | ||
61 | @Activate | 72 | @Activate |
62 | public void activate() { | 73 | public void activate() { |
74 | + tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/flobj-notifier", "%d", log)); | ||
75 | + eventQ = new LinkedBlockingQueue<ObjectiveEvent>(); | ||
76 | + tpool.execute(new FlowObjectiveNotifier()); | ||
63 | nextGroups = storageService.<Integer, byte[]>consistentMapBuilder() | 77 | nextGroups = storageService.<Integer, byte[]>consistentMapBuilder() |
64 | .withName("flowobjective-groups") | 78 | .withName("flowobjective-groups") |
65 | .withSerializer(Serializer.using( | 79 | .withSerializer(Serializer.using( |
... | @@ -68,7 +82,7 @@ public class DistributedFlowObjectiveStore | ... | @@ -68,7 +82,7 @@ public class DistributedFlowObjectiveStore |
68 | .register(Versioned.class) | 82 | .register(Versioned.class) |
69 | .build("DistributedFlowObjectiveStore"))) | 83 | .build("DistributedFlowObjectiveStore"))) |
70 | .build(); | 84 | .build(); |
71 | - | 85 | + nextGroups.addListener(mapListener); |
72 | nextIds = storageService.getAtomicCounter("next-objective-counter"); | 86 | nextIds = storageService.getAtomicCounter("next-objective-counter"); |
73 | log.info("Started"); | 87 | log.info("Started"); |
74 | } | 88 | } |
... | @@ -76,6 +90,7 @@ public class DistributedFlowObjectiveStore | ... | @@ -76,6 +90,7 @@ public class DistributedFlowObjectiveStore |
76 | 90 | ||
77 | @Deactivate | 91 | @Deactivate |
78 | public void deactivate() { | 92 | public void deactivate() { |
93 | + tpool.shutdown(); | ||
79 | log.info("Stopped"); | 94 | log.info("Stopped"); |
80 | } | 95 | } |
81 | 96 | ||
... | @@ -120,4 +135,37 @@ public class DistributedFlowObjectiveStore | ... | @@ -120,4 +135,37 @@ public class DistributedFlowObjectiveStore |
120 | public int allocateNextId() { | 135 | public int allocateNextId() { |
121 | return (int) nextIds.incrementAndGet(); | 136 | return (int) nextIds.incrementAndGet(); |
122 | } | 137 | } |
138 | + | ||
139 | + private class FlowObjectiveNotifier implements Runnable { | ||
140 | + @Override | ||
141 | + public void run() { | ||
142 | + try { | ||
143 | + while (!Thread.currentThread().isInterrupted()) { | ||
144 | + notifyDelegate(eventQ.take()); | ||
145 | + } | ||
146 | + } catch (InterruptedException ex) { | ||
147 | + Thread.currentThread().interrupt(); | ||
148 | + } | ||
149 | + } | ||
150 | + } | ||
151 | + | ||
152 | + private class NextGroupListener implements MapEventListener<Integer, byte[]> { | ||
153 | + @Override | ||
154 | + public void event(MapEvent<Integer, byte[]> event) { | ||
155 | + switch (event.type()) { | ||
156 | + case INSERT: | ||
157 | + eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.ADD, event.key())); | ||
158 | + break; | ||
159 | + case REMOVE: | ||
160 | + eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.REMOVE, event.key())); | ||
161 | + break; | ||
162 | + case UPDATE: | ||
163 | + // TODO Introduce UPDATE ObjectiveEvent when the map is being updated | ||
164 | + break; | ||
165 | + default: | ||
166 | + break; | ||
167 | + } | ||
168 | + } | ||
169 | + } | ||
170 | + | ||
123 | } | 171 | } | ... | ... |
... | @@ -182,6 +182,12 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline | ... | @@ -182,6 +182,12 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline |
182 | FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder(); | 182 | FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder(); |
183 | 183 | ||
184 | rules = processForward(fwd); | 184 | rules = processForward(fwd); |
185 | + if (rules == null || rules.isEmpty()) { | ||
186 | + // Assumes fail message has already been generated to the objective | ||
187 | + // context. Returning here prevents spurious pass message to be | ||
188 | + // generated by FlowRule service for empty flowOps. | ||
189 | + return; | ||
190 | + } | ||
185 | switch (fwd.op()) { | 191 | switch (fwd.op()) { |
186 | case ADD: | 192 | case ADD: |
187 | rules.stream() | 193 | rules.stream() |
... | @@ -748,7 +754,7 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline | ... | @@ -748,7 +754,7 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline |
748 | * returned if there is an issue in processing the objective. | 754 | * returned if there is an issue in processing the objective. |
749 | */ | 755 | */ |
750 | protected Collection<FlowRule> processSpecific(ForwardingObjective fwd) { | 756 | protected Collection<FlowRule> processSpecific(ForwardingObjective fwd) { |
751 | - log.trace("Processing specific fwd objective:{} in dev:{} with next:{}", | 757 | + log.debug("Processing specific fwd objective:{} in dev:{} with next:{}", |
752 | fwd.id(), deviceId, fwd.nextId()); | 758 | fwd.id(), deviceId, fwd.nextId()); |
753 | boolean isEthTypeObj = isSupportedEthTypeObjective(fwd); | 759 | boolean isEthTypeObj = isSupportedEthTypeObjective(fwd); |
754 | boolean isEthDstObj = isSupportedEthDstObjective(fwd); | 760 | boolean isEthDstObj = isSupportedEthDstObjective(fwd); |
... | @@ -885,8 +891,8 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline | ... | @@ -885,8 +891,8 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline |
885 | if (fwd.nextId() != null) { | 891 | if (fwd.nextId() != null) { |
886 | if (forTableId == MPLS_TABLE_1 && !popMpls) { | 892 | if (forTableId == MPLS_TABLE_1 && !popMpls) { |
887 | log.warn("SR CONTINUE case cannot be handled as MPLS ECMP " | 893 | log.warn("SR CONTINUE case cannot be handled as MPLS ECMP " |
888 | - + "is not implemented in OF-DPA yet. Aborting this flow " | 894 | + + "is not implemented in OF-DPA yet. Aborting this flow {} -> next:{}" |
889 | - + "in this device {}", deviceId); | 895 | + + "in this device {}", fwd.id(), fwd.nextId(), deviceId); |
890 | // XXX We could convert to forwarding to a single-port, via a | 896 | // XXX We could convert to forwarding to a single-port, via a |
891 | // MPLS interface, or a MPLS SWAP (with-same) but that would | 897 | // MPLS interface, or a MPLS SWAP (with-same) but that would |
892 | // have to be handled in the next-objective. Also the pop-mpls | 898 | // have to be handled in the next-objective. Also the pop-mpls |
... | @@ -907,6 +913,11 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline | ... | @@ -907,6 +913,11 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline |
907 | return Collections.emptySet(); | 913 | return Collections.emptySet(); |
908 | } | 914 | } |
909 | tb.deferred().group(group.id()); | 915 | tb.deferred().group(group.id()); |
916 | + } else { | ||
917 | + log.warn("Cannot find group for nextId:{} in dev:{}. Aborting fwd:{}", | ||
918 | + fwd.nextId(), deviceId, fwd.id()); | ||
919 | + fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED); | ||
920 | + return Collections.emptySet(); | ||
910 | } | 921 | } |
911 | } | 922 | } |
912 | tb.transition(ACL_TABLE); | 923 | tb.transition(ACL_TABLE); |
... | @@ -1063,7 +1074,7 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline | ... | @@ -1063,7 +1074,7 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline |
1063 | for (GroupKey gk : gkd) { | 1074 | for (GroupKey gk : gkd) { |
1064 | Group g = groupService.getGroup(deviceId, gk); | 1075 | Group g = groupService.getGroup(deviceId, gk); |
1065 | if (g == null) { | 1076 | if (g == null) { |
1066 | - gchain.append(" ERROR").append(" -->"); | 1077 | + gchain.append(" NoGrp").append(" -->"); |
1067 | continue; | 1078 | continue; |
1068 | } | 1079 | } |
1069 | gchain.append(" 0x").append(Integer.toHexString(g.id().id())) | 1080 | gchain.append(" 0x").append(Integer.toHexString(g.id().id())) |
... | @@ -1071,7 +1082,12 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline | ... | @@ -1071,7 +1082,12 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline |
1071 | lastGroup = g; | 1082 | lastGroup = g; |
1072 | } | 1083 | } |
1073 | // add port information for last group in group-chain | 1084 | // add port information for last group in group-chain |
1074 | - for (Instruction i: lastGroup.buckets().buckets().get(0).treatment().allInstructions()) { | 1085 | + List<Instruction> lastGroupIns = new ArrayList<Instruction>(); |
1086 | + if (gchain != null) { | ||
1087 | + lastGroupIns = lastGroup.buckets().buckets().get(0) | ||
1088 | + .treatment().allInstructions(); | ||
1089 | + } | ||
1090 | + for (Instruction i: lastGroupIns) { | ||
1075 | if (i instanceof OutputInstruction) { | 1091 | if (i instanceof OutputInstruction) { |
1076 | gchain.append(" port:").append(((OutputInstruction) i).port()); | 1092 | gchain.append(" port:").append(((OutputInstruction) i).port()); |
1077 | } | 1093 | } | ... | ... |
-
Please register or login to post a comment