ke han
Committed by Gerrit Code Review

add pipeline for nokia olt device

Change-Id: I60f2988910eea5f9ffdfd14e7d47863af63b2691
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.driver.pipeline;
17 +
18 +import com.google.common.cache.Cache;
19 +import com.google.common.cache.CacheBuilder;
20 +import com.google.common.cache.RemovalCause;
21 +import com.google.common.cache.RemovalNotification;
22 +import com.google.common.collect.Lists;
23 +import org.apache.commons.lang3.tuple.ImmutablePair;
24 +import org.apache.commons.lang3.tuple.Pair;
25 +import org.onlab.osgi.ServiceDirectory;
26 +import org.onlab.packet.EthType;
27 +import org.onlab.packet.IPv4;
28 +import org.onlab.packet.VlanId;
29 +import org.onlab.util.KryoNamespace;
30 +import org.onosproject.core.ApplicationId;
31 +import org.onosproject.core.CoreService;
32 +import org.onosproject.net.DeviceId;
33 +import org.onosproject.net.PortNumber;
34 +import org.onosproject.net.behaviour.NextGroup;
35 +import org.onosproject.net.behaviour.Pipeliner;
36 +import org.onosproject.net.behaviour.PipelinerContext;
37 +import org.onosproject.net.driver.AbstractHandlerBehaviour;
38 +import org.onosproject.net.flow.DefaultFlowRule;
39 +import org.onosproject.net.flow.DefaultTrafficSelector;
40 +import org.onosproject.net.flow.DefaultTrafficTreatment;
41 +import org.onosproject.net.flow.FlowRule;
42 +import org.onosproject.net.flow.FlowRuleOperations;
43 +import org.onosproject.net.flow.FlowRuleOperationsContext;
44 +import org.onosproject.net.flow.FlowRuleService;
45 +import org.onosproject.net.flow.TrafficSelector;
46 +import org.onosproject.net.flow.TrafficTreatment;
47 +import org.onosproject.net.flow.criteria.Criteria;
48 +import org.onosproject.net.flow.criteria.Criterion;
49 +import org.onosproject.net.flow.criteria.EthTypeCriterion;
50 +import org.onosproject.net.flow.criteria.IPCriterion;
51 +import org.onosproject.net.flow.criteria.IPProtocolCriterion;
52 +import org.onosproject.net.flow.criteria.PortCriterion;
53 +import org.onosproject.net.flow.criteria.VlanIdCriterion;
54 +import org.onosproject.net.flow.instructions.Instruction;
55 +import org.onosproject.net.flow.instructions.Instructions;
56 +import org.onosproject.net.flow.instructions.L2ModificationInstruction;
57 +import org.onosproject.net.flowobjective.FilteringObjective;
58 +import org.onosproject.net.flowobjective.FlowObjectiveStore;
59 +import org.onosproject.net.flowobjective.ForwardingObjective;
60 +import org.onosproject.net.flowobjective.NextObjective;
61 +import org.onosproject.net.flowobjective.Objective;
62 +import org.onosproject.net.flowobjective.ObjectiveError;
63 +import org.onosproject.net.group.DefaultGroupBucket;
64 +import org.onosproject.net.group.DefaultGroupDescription;
65 +import org.onosproject.net.group.DefaultGroupKey;
66 +import org.onosproject.net.group.Group;
67 +import org.onosproject.net.group.GroupBucket;
68 +import org.onosproject.net.group.GroupBuckets;
69 +import org.onosproject.net.group.GroupDescription;
70 +import org.onosproject.net.group.GroupEvent;
71 +import org.onosproject.net.group.GroupKey;
72 +import org.onosproject.net.group.GroupListener;
73 +import org.onosproject.net.group.GroupService;
74 +import org.onosproject.store.serializers.KryoNamespaces;
75 +import org.onosproject.store.service.StorageService;
76 +import org.slf4j.Logger;
77 +
78 +import java.util.Collection;
79 +import java.util.Collections;
80 +import java.util.List;
81 +import java.util.Optional;
82 +import java.util.concurrent.TimeUnit;
83 +import java.util.stream.Collectors;
84 +import java.util.Iterator;
85 +
86 +
87 +import static org.slf4j.LoggerFactory.getLogger;
88 +
89 +/**
90 + * Pipeliner for OLT device.
91 + */
92 +
93 +public class NokiaOltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
94 +
95 + private static final Integer QQ_TABLE = 1;
96 + private static final short MCAST_VLAN = 4000;
97 + private static final String OLTCOOKIES = "olt-cookies-must-be-unique";
98 + private final Logger log = getLogger(getClass());
99 +
100 + private ServiceDirectory serviceDirectory;
101 + private FlowRuleService flowRuleService;
102 + private GroupService groupService;
103 + private CoreService coreService;
104 + private StorageService storageService;
105 +
106 + private DeviceId deviceId;
107 + private ApplicationId appId;
108 +
109 +
110 + protected FlowObjectiveStore flowObjectiveStore;
111 +
112 + private Cache<GroupKey, NextObjective> pendingGroups;
113 +
114 + protected static KryoNamespace appKryo = new KryoNamespace.Builder()
115 + .register(KryoNamespaces.API)
116 + .register(GroupKey.class)
117 + .register(DefaultGroupKey.class)
118 + .register(OLTPipelineGroup.class)
119 + .build("OltPipeline");
120 + @Override
121 + public void init(DeviceId deviceId, PipelinerContext context) {
122 + log.debug("Initiate OLT pipeline");
123 + this.serviceDirectory = context.directory();
124 + this.deviceId = deviceId;
125 +
126 + flowRuleService = serviceDirectory.get(FlowRuleService.class);
127 + coreService = serviceDirectory.get(CoreService.class);
128 + groupService = serviceDirectory.get(GroupService.class);
129 + flowObjectiveStore = context.store();
130 + storageService = serviceDirectory.get(StorageService.class);
131 +
132 + appId = coreService.registerApplication(
133 + "org.onosproject.driver.OLTPipeline");
134 +
135 +
136 + pendingGroups = CacheBuilder.newBuilder()
137 + .expireAfterWrite(20, TimeUnit.SECONDS)
138 + .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
139 + if (notification.getCause() == RemovalCause.EXPIRED) {
140 + fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
141 + }
142 + }).build();
143 +
144 + groupService.addListener(new InnerGroupListener());
145 +
146 + }
147 +
148 + @Override
149 + public void filter(FilteringObjective filter) {
150 + Instructions.OutputInstruction output;
151 +
152 + if (filter.meta() != null && !filter.meta().immediate().isEmpty()) {
153 + output = (Instructions.OutputInstruction) filter.meta().immediate().stream()
154 + .filter(t -> t.type().equals(Instruction.Type.OUTPUT))
155 + .limit(1)
156 + .findFirst().get();
157 +
158 + if (output == null || !output.port().equals(PortNumber.CONTROLLER)) {
159 + log.error("OLT can only filter packet to controller");
160 + fail(filter, ObjectiveError.UNSUPPORTED);
161 + return;
162 + }
163 + } else {
164 + fail(filter, ObjectiveError.BADPARAMS);
165 + return;
166 + }
167 +
168 + if (filter.key().type() != Criterion.Type.IN_PORT) {
169 + fail(filter, ObjectiveError.BADPARAMS);
170 + return;
171 + }
172 +
173 + EthTypeCriterion ethType = (EthTypeCriterion)
174 + filterForCriterion(filter.conditions(), Criterion.Type.ETH_TYPE);
175 +
176 + if (ethType == null) {
177 + fail(filter, ObjectiveError.BADPARAMS);
178 + return;
179 + }
180 +
181 + if (ethType.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
182 + provisionEapol(filter, ethType, output);
183 + } else if (ethType.ethType().equals(EthType.EtherType.IPV4.ethType())) {
184 + IPProtocolCriterion ipProto = (IPProtocolCriterion)
185 + filterForCriterion(filter.conditions(), Criterion.Type.IP_PROTO);
186 + if (ipProto.protocol() == IPv4.PROTOCOL_IGMP) {
187 + provisionIgmp(filter, ethType, ipProto, output);
188 + } else {
189 + log.error("OLT can only filter igmp");
190 + fail(filter, ObjectiveError.UNSUPPORTED);
191 + }
192 + } else {
193 + log.error("OLT can only filter eapol and igmp");
194 + fail(filter, ObjectiveError.UNSUPPORTED);
195 + }
196 +
197 + }
198 +
199 + private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
200 + FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
201 + switch (objective.op()) {
202 +
203 + case ADD:
204 + flowBuilder.add(ruleBuilder.build());
205 + break;
206 + case REMOVE:
207 + flowBuilder.remove(ruleBuilder.build());
208 + break;
209 + default:
210 + log.warn("Unknown operation {}", objective.op());
211 + }
212 +
213 + flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
214 + @Override
215 + public void onSuccess(FlowRuleOperations ops) {
216 + objective.context().ifPresent(context -> context.onSuccess(objective));
217 + }
218 +
219 + @Override
220 + public void onError(FlowRuleOperations ops) {
221 + objective.context()
222 + .ifPresent(context -> context.onError(objective, ObjectiveError.FLOWINSTALLATIONFAILED));
223 + }
224 + }));
225 + }
226 +
227 + @Override
228 + public void forward(ForwardingObjective fwd) {
229 +
230 + if (checkForMulticast(fwd)) {
231 + processMulticastRule(fwd);
232 + return;
233 + }
234 +
235 + TrafficTreatment treatment = fwd.treatment();
236 +
237 + List<Instruction> instructions = treatment.allInstructions();
238 +
239 + Optional<Instruction> vlanIntruction = instructions.stream()
240 + .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
241 + .filter(i -> ((L2ModificationInstruction) i).subtype() ==
242 + L2ModificationInstruction.L2SubType.VLAN_PUSH ||
243 + ((L2ModificationInstruction) i).subtype() ==
244 + L2ModificationInstruction.L2SubType.VLAN_POP)
245 + .findAny();
246 +
247 + if (vlanIntruction.isPresent()) {
248 + L2ModificationInstruction vlanIns =
249 + (L2ModificationInstruction) vlanIntruction.get();
250 +
251 + if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_PUSH) {
252 + installUpstreamRules(fwd);
253 + } else if (vlanIns.subtype() == L2ModificationInstruction.L2SubType.VLAN_POP) {
254 + installDownstreamRules(fwd);
255 + } else {
256 + log.error("Unknown OLT operation: {}", fwd);
257 + fail(fwd, ObjectiveError.UNSUPPORTED);
258 + return;
259 + }
260 +
261 + pass(fwd);
262 + } else {
263 + TrafficSelector selector = fwd.selector();
264 +
265 + if (fwd.treatment() != null) {
266 + // Deal with SPECIFIC and VERSATILE in the same manner.
267 + FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
268 + .forDevice(deviceId)
269 + .withSelector(selector)
270 + .fromApp(fwd.appId())
271 + .withPriority(fwd.priority())
272 + .withTreatment(fwd.treatment());
273 +
274 + if (fwd.permanent()) {
275 + ruleBuilder.makePermanent();
276 + } else {
277 + ruleBuilder.makeTemporary(fwd.timeout());
278 + }
279 + installObjective(ruleBuilder, fwd);
280 +
281 + } else {
282 + log.error("No treatment error: {}", fwd);
283 + fail(fwd, ObjectiveError.UNSUPPORTED);
284 + }
285 + }
286 +
287 + }
288 +
289 +
290 + @Override
291 + public void next(NextObjective nextObjective) {
292 + if (nextObjective.type() != NextObjective.Type.BROADCAST) {
293 + log.error("OLT only supports broadcast groups.");
294 + fail(nextObjective, ObjectiveError.BADPARAMS);
295 + }
296 +
297 + if (nextObjective.next().size() != 1) {
298 + log.error("OLT only supports singleton broadcast groups.");
299 + fail(nextObjective, ObjectiveError.BADPARAMS);
300 + }
301 +
302 + TrafficTreatment treatment = nextObjective.next().stream().findFirst().get();
303 +
304 +
305 + GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
306 + GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
307 +
308 +
309 + pendingGroups.put(key, nextObjective);
310 +
311 + switch (nextObjective.op()) {
312 + case ADD:
313 + GroupDescription groupDesc =
314 + new DefaultGroupDescription(deviceId,
315 + GroupDescription.Type.ALL,
316 + new GroupBuckets(Collections.singletonList(bucket)),
317 + key,
318 + null,
319 + nextObjective.appId());
320 + groupService.addGroup(groupDesc);
321 + break;
322 + case REMOVE:
323 + groupService.removeGroup(deviceId, key, nextObjective.appId());
324 + break;
325 + case ADD_TO_EXISTING:
326 + groupService.addBucketsToGroup(deviceId, key,
327 + new GroupBuckets(Collections.singletonList(bucket)),
328 + key, nextObjective.appId());
329 + break;
330 + case REMOVE_FROM_EXISTING:
331 + groupService.removeBucketsFromGroup(deviceId, key,
332 + new GroupBuckets(Collections.singletonList(bucket)),
333 + key, nextObjective.appId());
334 + break;
335 + default:
336 + log.warn("Unknown next objective operation: {}", nextObjective.op());
337 + }
338 +
339 +
340 + }
341 +
342 + private void processMulticastRule(ForwardingObjective fwd) {
343 + if (fwd.nextId() == null) {
344 + log.error("Multicast objective does not have a next id");
345 + fail(fwd, ObjectiveError.BADPARAMS);
346 + }
347 +
348 + GroupKey key = getGroupForNextObjective(fwd.nextId());
349 +
350 + if (key == null) {
351 + log.error("Group for forwarding objective missing: {}", fwd);
352 + fail(fwd, ObjectiveError.GROUPMISSING);
353 + }
354 +
355 + Group group = groupService.getGroup(deviceId, key);
356 + TrafficTreatment treatment =
357 + buildTreatment(Instructions.createGroup(group.id()));
358 +
359 + FlowRule rule = DefaultFlowRule.builder()
360 + .fromApp(fwd.appId())
361 + .forDevice(deviceId)
362 + .forTable(0)
363 + .makePermanent()
364 + .withPriority(fwd.priority())
365 + .withSelector(fwd.selector())
366 + .withTreatment(treatment)
367 + .build();
368 +
369 + FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
370 + switch (fwd.op()) {
371 +
372 + case ADD:
373 + builder.add(rule);
374 + break;
375 + case REMOVE:
376 + builder.remove(rule);
377 + break;
378 + case ADD_TO_EXISTING:
379 + case REMOVE_FROM_EXISTING:
380 + break;
381 + default:
382 + log.warn("Unknown forwarding operation: {}", fwd.op());
383 + }
384 +
385 + applyFlowRules(builder, fwd);
386 +
387 + }
388 +
389 + private boolean checkForMulticast(ForwardingObjective fwd) {
390 +
391 + IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
392 + Criterion.Type.IPV4_DST);
393 +
394 + if (ip == null) {
395 + return false;
396 + }
397 +
398 + return ip.ip().isMulticast();
399 +
400 + }
401 +
402 + private GroupKey getGroupForNextObjective(Integer nextId) {
403 + NextGroup next = flowObjectiveStore.getNextGroup(nextId);
404 + return appKryo.deserialize(next.data());
405 +
406 + }
407 +
408 + private void installDownstreamRules(ForwardingObjective fwd) {
409 + List<Pair<Instruction, Instruction>> vlanOps =
410 + vlanOps(fwd,
411 + L2ModificationInstruction.L2SubType.VLAN_POP);
412 +
413 + if (vlanOps == null) {
414 + return;
415 + }
416 +
417 + Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, "downstream");
418 +
419 + if (output == null) {
420 + return;
421 + }
422 +
423 + Pair<Instruction, Instruction> popAndRewrite = vlanOps.remove(0);
424 +
425 + TrafficSelector selector = fwd.selector();
426 +
427 + Criterion outerVlan = selector.getCriterion(Criterion.Type.VLAN_VID);
428 + Criterion innerVlan = selector.getCriterion(Criterion.Type.INNER_VLAN_VID);
429 + Criterion inport = selector.getCriterion(Criterion.Type.IN_PORT);
430 + Criterion bullshit = Criteria.matchMetadata(output.port().toLong());
431 +
432 + if (outerVlan == null || innerVlan == null || inport == null) {
433 + log.error("Forwarding objective is underspecified: {}", fwd);
434 + fail(fwd, ObjectiveError.BADPARAMS);
435 + return;
436 + }
437 +
438 + Criterion innerVid = Criteria.matchVlanId(((VlanIdCriterion) innerVlan).vlanId());
439 +
440 + FlowRule.Builder outer = DefaultFlowRule.builder()
441 + .fromApp(fwd.appId())
442 + .forDevice(deviceId)
443 + .makePermanent()
444 + .withPriority(fwd.priority())
445 + .withSelector(buildSelector(inport, outerVlan, bullshit))
446 + .withTreatment(buildTreatment(popAndRewrite.getLeft(),
447 + Instructions.transition(QQ_TABLE)));
448 +
449 + FlowRule.Builder inner = DefaultFlowRule.builder()
450 + .fromApp(fwd.appId())
451 + .forDevice(deviceId)
452 + .forTable(QQ_TABLE)
453 + .makePermanent()
454 + .withPriority(fwd.priority())
455 + .withSelector(buildSelector(inport, innerVid))
456 + .withTreatment(buildTreatment(popAndRewrite.getLeft(),
457 + output));
458 +
459 + applyRules(fwd, inner, outer);
460 +
461 + }
462 +
463 + private boolean hasUntaggedVlanTag(TrafficSelector selector) {
464 + Iterator<Criterion> iter = selector.criteria().iterator();
465 +
466 + while (iter.hasNext()) {
467 + Criterion criterion = iter.next();
468 + if (criterion.type() == Criterion.Type.VLAN_VID &&
469 + ((VlanIdCriterion) criterion).vlanId().toShort() == VlanId.UNTAGGED) {
470 + return true;
471 + }
472 + }
473 +
474 + return false;
475 + }
476 +
477 + private void installUpstreamRules(ForwardingObjective fwd) {
478 + List<Pair<Instruction, Instruction>> vlanOps =
479 + vlanOps(fwd,
480 + L2ModificationInstruction.L2SubType.VLAN_PUSH);
481 + FlowRule.Builder inner;
482 +
483 + if (vlanOps == null) {
484 + return;
485 + }
486 +
487 + Instruction output = fetchOutput(fwd, "upstream");
488 +
489 + if (output == null) {
490 + return;
491 + }
492 +
493 + Pair<Instruction, Instruction> innerPair = vlanOps.remove(0);
494 +
495 + Pair<Instruction, Instruction> outerPair = vlanOps.remove(0);
496 +
497 +
498 + if (hasUntaggedVlanTag(fwd.selector())) {
499 + inner = DefaultFlowRule.builder()
500 + .fromApp(fwd.appId())
501 + .forDevice(deviceId)
502 + .makePermanent()
503 + .withPriority(fwd.priority())
504 + .withSelector(fwd.selector())
505 + .withTreatment(buildTreatment(innerPair.getLeft(),
506 + innerPair.getRight(),
507 + Instructions.transition(QQ_TABLE)));
508 + } else {
509 + inner = DefaultFlowRule.builder()
510 + .fromApp(fwd.appId())
511 + .forDevice(deviceId)
512 + .makePermanent()
513 + .withPriority(fwd.priority())
514 + .withSelector(fwd.selector())
515 + .withTreatment(buildTreatment(
516 + innerPair.getRight(),
517 + Instructions.transition(QQ_TABLE)));
518 + }
519 +
520 +
521 + PortCriterion inPort = (PortCriterion)
522 + fwd.selector().getCriterion(Criterion.Type.IN_PORT);
523 +
524 + VlanId cVlanId = ((L2ModificationInstruction.ModVlanIdInstruction)
525 + innerPair.getRight()).vlanId();
526 +
527 + FlowRule.Builder outer = DefaultFlowRule.builder()
528 + .fromApp(fwd.appId())
529 + .forDevice(deviceId)
530 + .forTable(QQ_TABLE)
531 + .makePermanent()
532 + .withPriority(fwd.priority())
533 + .withSelector(buildSelector(inPort,
534 + Criteria.matchVlanId(cVlanId)))
535 + .withTreatment(buildTreatment(outerPair.getLeft(),
536 + outerPair.getRight(),
537 + output));
538 +
539 + applyRules(fwd, inner, outer);
540 +
541 + }
542 +
543 + private Instruction fetchOutput(ForwardingObjective fwd, String direction) {
544 + Instruction output = fwd.treatment().allInstructions().stream()
545 + .filter(i -> i.type() == Instruction.Type.OUTPUT)
546 + .findFirst().orElse(null);
547 +
548 + if (output == null) {
549 + log.error("OLT {} rule has no output", direction);
550 + fail(fwd, ObjectiveError.BADPARAMS);
551 + return null;
552 + }
553 + return output;
554 + }
555 +
556 + private List<Pair<Instruction, Instruction>> vlanOps(ForwardingObjective fwd,
557 + L2ModificationInstruction.L2SubType type) {
558 +
559 + List<Pair<Instruction, Instruction>> vlanOps = findVlanOps(
560 + fwd.treatment().allInstructions(), type);
561 +
562 + if (vlanOps == null) {
563 + String direction = type == L2ModificationInstruction.L2SubType.VLAN_POP
564 + ? "downstream" : "upstream";
565 + log.error("Missing vlan operations in {} forwarding: {}", direction, fwd);
566 + fail(fwd, ObjectiveError.BADPARAMS);
567 + return null;
568 + }
569 + return vlanOps;
570 + }
571 +
572 +
573 + private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
574 + L2ModificationInstruction.L2SubType type) {
575 +
576 + List<Instruction> vlanPushs = findL2Instructions(
577 + type,
578 + instructions);
579 + List<Instruction> vlanSets = findL2Instructions(
580 + L2ModificationInstruction.L2SubType.VLAN_ID,
581 + instructions);
582 +
583 + if (vlanPushs.size() != vlanSets.size()) {
584 + return null;
585 + }
586 +
587 + List<Pair<Instruction, Instruction>> pairs = Lists.newArrayList();
588 +
589 + for (int i = 0; i < vlanPushs.size(); i++) {
590 + pairs.add(new ImmutablePair<>(vlanPushs.get(i), vlanSets.get(i)));
591 + }
592 + return pairs;
593 + }
594 +
595 + private List<Instruction> findL2Instructions(L2ModificationInstruction.L2SubType subType,
596 + List<Instruction> actions) {
597 + return actions.stream()
598 + .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
599 + .filter(i -> ((L2ModificationInstruction) i).subtype() == subType)
600 + .collect(Collectors.toList());
601 + }
602 +
603 + private void provisionEapol(FilteringObjective filter,
604 + EthTypeCriterion ethType,
605 + Instructions.OutputInstruction output) {
606 +
607 + TrafficSelector selector = buildSelector(filter.key(), ethType);
608 + TrafficTreatment treatment = buildTreatment(output);
609 + buildAndApplyRule(filter, selector, treatment);
610 +
611 + }
612 +
613 + private void provisionIgmp(FilteringObjective filter, EthTypeCriterion ethType,
614 + IPProtocolCriterion ipProto,
615 + Instructions.OutputInstruction output) {
616 + TrafficSelector selector = buildSelector(filter.key(), ethType, ipProto);
617 + TrafficTreatment treatment = buildTreatment(output);
618 + buildAndApplyRule(filter, selector, treatment);
619 + }
620 +
621 + private void buildAndApplyRule(FilteringObjective filter, TrafficSelector selector,
622 + TrafficTreatment treatment) {
623 + FlowRule rule = DefaultFlowRule.builder()
624 + .fromApp(filter.appId())
625 + .forDevice(deviceId)
626 + .forTable(0)
627 + .makePermanent()
628 + .withSelector(selector)
629 + .withTreatment(treatment)
630 + .withPriority(filter.priority())
631 + .build();
632 +
633 + FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
634 +
635 + switch (filter.type()) {
636 + case PERMIT:
637 + opsBuilder.add(rule);
638 + break;
639 + case DENY:
640 + opsBuilder.remove(rule);
641 + break;
642 + default:
643 + log.warn("Unknown filter type : {}", filter.type());
644 + fail(filter, ObjectiveError.UNSUPPORTED);
645 + }
646 +
647 + applyFlowRules(opsBuilder, filter);
648 + }
649 +
650 + private void applyRules(ForwardingObjective fwd,
651 + FlowRule.Builder inner, FlowRule.Builder outer) {
652 + FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
653 + switch (fwd.op()) {
654 + case ADD:
655 + builder.add(inner.build()).add(outer.build());
656 + break;
657 + case REMOVE:
658 + builder.remove(inner.build()).remove(outer.build());
659 + break;
660 + case ADD_TO_EXISTING:
661 + break;
662 + case REMOVE_FROM_EXISTING:
663 + break;
664 + default:
665 + log.warn("Unknown forwarding operation: {}", fwd.op());
666 + }
667 +
668 + applyFlowRules(builder, fwd);
669 + }
670 +
671 + private void applyFlowRules(FlowRuleOperations.Builder builder,
672 + Objective objective) {
673 + flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
674 + @Override
675 + public void onSuccess(FlowRuleOperations ops) {
676 + pass(objective);
677 + }
678 +
679 + @Override
680 + public void onError(FlowRuleOperations ops) {
681 + fail(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
682 + }
683 + }));
684 + }
685 +
686 + private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
687 + return criteria.stream()
688 + .filter(c -> c.type().equals(type))
689 + .limit(1)
690 + .findFirst().orElse(null);
691 + }
692 +
693 + private TrafficSelector buildSelector(Criterion... criteria) {
694 +
695 +
696 + TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
697 +
698 + for (Criterion c : criteria) {
699 + sBuilder.add(c);
700 + }
701 +
702 + return sBuilder.build();
703 + }
704 +
705 + private TrafficTreatment buildTreatment(Instruction... instructions) {
706 +
707 +
708 + TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
709 +
710 + for (Instruction i : instructions) {
711 + tBuilder.add(i);
712 + }
713 +
714 + return tBuilder.build();
715 + }
716 +
717 +
718 + private void fail(Objective obj, ObjectiveError error) {
719 + obj.context().ifPresent(context -> context.onError(obj, error));
720 + }
721 +
722 + private void pass(Objective obj) {
723 + obj.context().ifPresent(context -> context.onSuccess(obj));
724 + }
725 +
726 +
727 + private class InnerGroupListener implements GroupListener {
728 + @Override
729 + public void event(GroupEvent event) {
730 + if (event.type() == GroupEvent.Type.GROUP_ADDED || event.type() == GroupEvent.Type.GROUP_UPDATED) {
731 + GroupKey key = event.subject().appCookie();
732 +
733 + NextObjective obj = pendingGroups.getIfPresent(key);
734 + if (obj != null) {
735 + flowObjectiveStore.putNextGroup(obj.id(), new OLTPipelineGroup(key));
736 + pass(obj);
737 + pendingGroups.invalidate(key);
738 + }
739 + }
740 + }
741 + }
742 +
743 + private static class OLTPipelineGroup implements NextGroup {
744 +
745 + private final GroupKey key;
746 +
747 + public OLTPipelineGroup(GroupKey key) {
748 + this.key = key;
749 + }
750 +
751 + public GroupKey key() {
752 + return key;
753 + }
754 +
755 + @Override
756 + public byte[] data() {
757 + return appKryo.serialize(key);
758 + }
759 +
760 + }
761 +
762 + @Override
763 + public List<String> getNextMappings(NextGroup nextGroup) {
764 + // TODO Implementation deferred to vendor
765 + return null;
766 + }
767 +}
...@@ -101,7 +101,7 @@ ...@@ -101,7 +101,7 @@
101 <driver name="nokia-olt" extends="default" 101 <driver name="nokia-olt" extends="default"
102 manufacturer="Nokia" hwVersion="SDOLT" swVersion="5.2.1"> 102 manufacturer="Nokia" hwVersion="SDOLT" swVersion="5.2.1">
103 <behaviour api="org.onosproject.net.behaviour.Pipeliner" 103 <behaviour api="org.onosproject.net.behaviour.Pipeliner"
104 - impl="org.onosproject.driver.pipeline.OltPipeline"/> 104 + impl="org.onosproject.driver.pipeline.NokiaOltPipeline"/>
105 </driver> 105 </driver>
106 <driver name="g.fast" extends="default" 106 <driver name="g.fast" extends="default"
107 manufacturer="TEST1" hwVersion="TEST2" swVersion="TEST3"> 107 manufacturer="TEST1" hwVersion="TEST2" swVersion="TEST3">
......