Hongtao Yin
Committed by Brian O'Connor

Initial implementation: The init extended flow rule and store interface APIs

The APIs are for supporting service data to install on network devices.
This is related to JIRA ticket ID ONOS-869.

Updated API code and added implementation code files.

Modified API for supporting payload abstruction, and added routing mechanism for pushing flow rules to devices.
Added more javadoc, and fixed some minor issues.

Updated javadoc, removed unnecessary method, and test code.

Change-Id: I105defc92a9e01b30601fcb56a9dafa086d4adc0
...@@ -172,7 +172,6 @@ public class DefaultFlowRule implements FlowRule { ...@@ -172,7 +172,6 @@ public class DefaultFlowRule implements FlowRule {
172 return treatment; 172 return treatment;
173 } 173 }
174 174
175 -
176 @Override 175 @Override
177 /* 176 /*
178 * The priority and statistics can change on a given treatment and selector 177 * The priority and statistics can change on a given treatment and selector
......
...@@ -25,7 +25,7 @@ import java.util.Set; ...@@ -25,7 +25,7 @@ import java.util.Set;
25 public class FlowRuleBatchRequest { 25 public class FlowRuleBatchRequest {
26 26
27 /** 27 /**
28 - * This id is used to cary to id of the original 28 + * This id is used to carry to id of the original
29 * FlowOperations and track where this batch operation 29 * FlowOperations and track where this batch operation
30 * came from. The id is unique cluster wide. 30 * came from. The id is unique cluster wide.
31 */ 31 */
...@@ -37,8 +37,6 @@ public class FlowRuleBatchRequest { ...@@ -37,8 +37,6 @@ public class FlowRuleBatchRequest {
37 public FlowRuleBatchRequest(long batchId, Set<FlowRuleBatchEntry> ops) { 37 public FlowRuleBatchRequest(long batchId, Set<FlowRuleBatchEntry> ops) {
38 this.batchId = batchId; 38 this.batchId = batchId;
39 this.ops = Collections.unmodifiableSet(ops); 39 this.ops = Collections.unmodifiableSet(ops);
40 -
41 -
42 } 40 }
43 41
44 public Set<FlowRuleBatchEntry> ops() { 42 public Set<FlowRuleBatchEntry> ops() {
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.net.flowext;
17 +
18 +import org.onosproject.core.ApplicationId;
19 +import org.onosproject.core.DefaultGroupId;
20 +import org.onosproject.core.GroupId;
21 +import org.onosproject.net.DeviceId;
22 +import org.onosproject.net.flow.DefaultFlowRule;
23 +import org.onosproject.net.flow.FlowRule;
24 +import org.onosproject.net.flow.TrafficSelector;
25 +import org.onosproject.net.flow.TrafficTreatment;
26 +
27 +import java.util.Objects;
28 +
29 +import static com.google.common.base.MoreObjects.toStringHelper;
30 +
31 +/**
32 + * Experimental extension to the flow rule subsystem; still under development.
33 + * A temporary flow rule extend implementation, It will cover current onos flow rule and other flow extension.
34 + */
35 +public class DefaultFlowRuleExt
36 + extends DefaultFlowRule implements FlowRuleExt {
37 +
38 + private FlowEntryExtension flowEntryExtension;
39 +
40 + public DefaultFlowRuleExt(DeviceId deviceId, TrafficSelector selector,
41 + TrafficTreatment treatment, int priority, long flowId,
42 + int timeout, boolean permanent) {
43 + super(deviceId, selector, treatment, priority, flowId, timeout, permanent);
44 + }
45 +
46 + public DefaultFlowRuleExt(DeviceId deviceId, TrafficSelector selector,
47 + TrafficTreatment treatment, int priority, ApplicationId appId,
48 + int timeout, boolean permanent) {
49 + this(deviceId, selector, treatment, priority, appId, new DefaultGroupId(0),
50 + timeout, permanent);
51 + }
52 +
53 + public DefaultFlowRuleExt(DeviceId deviceId, TrafficSelector selector,
54 + TrafficTreatment treatment, int priority, ApplicationId appId,
55 + GroupId groupId, int timeout, boolean permanent) {
56 + super(deviceId, selector, treatment, priority, appId, groupId, timeout, permanent);
57 + }
58 +
59 + public DefaultFlowRuleExt(FlowRule rule) {
60 + super(rule);
61 + }
62 +
63 + public DefaultFlowRuleExt(ApplicationId appId, DeviceId deviceId, FlowEntryExtension data) {
64 + this(deviceId, null, null, FlowRule.MIN_PRIORITY, appId, 0, false);
65 + this.flowEntryExtension = data;
66 + }
67 +
68 + @Override
69 + public FlowEntryExtension getFlowEntryExt() {
70 + return this.flowEntryExtension;
71 + }
72 +
73 + @Override
74 + public int hashCode() {
75 + return 31 * super.hashCode() + Objects.hash(flowEntryExtension);
76 + }
77 +
78 + public int hash() {
79 + return 31 * super.hashCode() + Objects.hash(flowEntryExtension);
80 + }
81 +
82 + @Override
83 + public boolean equals(Object obj) {
84 + if (this == obj) {
85 + return true;
86 + }
87 + if (obj == null || getClass() != obj.getClass()) {
88 + return false;
89 + }
90 + if (!super.equals(obj)) {
91 + return false;
92 + }
93 + final DefaultFlowRuleExt other = (DefaultFlowRuleExt) obj;
94 + return Objects.equals(this.flowEntryExtension, other.flowEntryExtension);
95 + }
96 +
97 + @Override
98 + public String toString() {
99 + return toStringHelper(this)
100 + // TODO there might be a better way to grab super's string
101 + .add("id", Long.toHexString(id().value()))
102 + .add("deviceId", deviceId())
103 + .add("priority", priority())
104 + .add("selector", selector().criteria())
105 + .add("treatment", treatment() == null ? "N/A" : treatment().instructions())
106 + //.add("created", created)
107 + .add("flowEntryExtension", flowEntryExtension)
108 + .toString();
109 + }
110 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.net.flowext;
17 +
18 +import java.nio.ByteBuffer;
19 +import java.util.Objects;
20 +
21 +/**
22 + * Experimental extension to the flow rule subsystem; still under development.
23 + * Represents a generic abstraction of the service data. User app can customize whatever it needs to install on devices.
24 + */
25 +public class DownStreamFlowEntry implements FlowEntryExtension {
26 +
27 + /**
28 + * temporarily only have byte stream, but it will be extract more abstract information from it later.
29 + */
30 + private final ByteBuffer payload;
31 +
32 + public DownStreamFlowEntry(ByteBuffer data) {
33 + this.payload = data;
34 + }
35 +
36 + /**
37 + * Get the payload of flowExtension.
38 + *
39 + * @return the byte steam value of payload.
40 + */
41 +// @Override
42 +// public ByteBuffer getPayload() {
43 + // TODO Auto-generated method stub
44 +// return payload;
45 +// }
46 +
47 + /**
48 + * Returns a hash code value for the object.
49 + * It use payload as parameter to hash.
50 + *
51 + * @return a hash code value for this object.
52 + */
53 + @Override
54 + public int hashCode() {
55 + return Objects.hash(payload);
56 + }
57 +
58 + /**
59 + * Indicates whether some other object is "equal to" this one.
60 + *
61 + * @param obj the reference object with which to compare.
62 + * @return {@code true} if this object is the same as the obj
63 + * argument; {@code false} otherwise.
64 + */
65 + @Override
66 + public boolean equals(Object obj) {
67 + if (obj instanceof DownStreamFlowEntry) {
68 + DownStreamFlowEntry packet = (DownStreamFlowEntry) obj;
69 + return Objects.equals(this.payload, packet.payload);
70 + } else {
71 + return false;
72 + }
73 + }
74 +
75 + /**
76 + * Returns a string representation of the object.
77 + *
78 + * @return a string representation of the object.
79 + */
80 + @Override
81 + public String toString() {
82 + String obj = new String(payload.array());
83 + return obj;
84 + }
85 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.net.flowext;
17 +
18 +
19 +/**
20 + * Experimental extension to the flow rule subsystem; still under development.
21 + * Represents a generic abstraction of the service data. User app can customize whatever it needs to install on devices.
22 + */
23 +public interface FlowEntryExtension {
24 + // some abstraction of the service data, like length, type, etc, will be added here later
25 +
26 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.net.flowext;
17 +
18 +import com.google.common.base.MoreObjects;
19 +import org.onosproject.net.flow.CompletedBatchOperation;
20 +import org.onosproject.net.flow.FlowRule;
21 +
22 +import java.util.Set;
23 +
24 +/**
25 + * Experimental extension to the flow rule subsystem; still under development.
26 + * <p>
27 + * Representation of a completed flow rule batch operation.
28 + * </p>
29 + */
30 +//TODO explain the purpose of this class beyond FlowRuleProvider
31 +public class FlowExtCompletedOperation extends CompletedBatchOperation {
32 + // the batchId is provided by application, once one flow rule of this batch failed
33 + // all the batch should withdraw
34 + private final long batchId;
35 +
36 + public FlowExtCompletedOperation(long batchId, boolean success, Set<FlowRule> failures) {
37 + super(success, failures, null);
38 + this.batchId = batchId;
39 + }
40 +
41 + /**
42 + * Returns the BatchId of this BatchOperation.
43 + *
44 + * @return the number of Batch
45 + */
46 + public long getBatchId() {
47 + return batchId;
48 + }
49 +
50 + /**
51 + * Returns a string representation of the object.
52 + *
53 + * @return a string representation of the object.
54 + */
55 + @Override
56 + public String toString() {
57 + return MoreObjects.toStringHelper(getClass())
58 + .add("success?", isSuccess())
59 + .add("failedItems", failedIds())
60 + .toString();
61 + }
62 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.net.flowext;
17 +
18 +import org.onosproject.net.flow.FlowRule;
19 +
20 +/**
21 + * Experimental extension to the flow rule subsystem; still under development.
22 + * <p>
23 + * FlowRule extended for current FlowRule API.
24 + * </p>
25 + */
26 +public interface FlowRuleExt extends FlowRule {
27 + /**
28 + * Get the flow entry extension.
29 + *
30 + * @return FlowEntryExtension value.
31 + */
32 + FlowEntryExtension getFlowEntryExt();
33 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.net.flowext;
17 +
18 +import org.onosproject.net.flow.FlowRuleBatchEvent;
19 +import org.onosproject.net.flow.FlowRuleBatchRequest;
20 +
21 +import java.util.concurrent.Future;
22 +
23 +/**
24 + * Experimental extension to the flow rule subsystem; still under development.
25 + * Represents a router-like mechanism which is in charge of sending flow rule to master;
26 + * <p>
27 + * The Router is in charge of sending flow rule to master;
28 + * the core component of routing-like mechanism.
29 + * </p>
30 + */
31 +public interface FlowRuleExtRouter {
32 +
33 + /**
34 + * apply the sub batch of flow extension rules.
35 + *
36 + * @param batchOperation batch of flow rules.
37 + * A batch can contain flow rules for a single device only.
38 + * @return Future response indicating success/failure of the batch operation
39 + * all the way down to the device.
40 + */
41 + Future<FlowExtCompletedOperation> applySubBatch(FlowRuleBatchRequest batchOperation);
42 +
43 + /**
44 + * Invoked on the completion of a storeBatch operation.
45 + *
46 + * @param event flow rule batch event
47 + */
48 + void batchOperationComplete(FlowRuleBatchEvent event);
49 +
50 + /**
51 + * Register the listener to monitor Router,
52 + * The Router find master to send downStream.
53 + *
54 + * @param listener the listener to register
55 + */
56 + public void addListener(FlowRuleExtRouterListener listener);
57 +
58 + /**
59 + * Remove the listener of Router.
60 + *
61 + * @param listener the listener to remove
62 + */
63 + public void removeListener(FlowRuleExtRouterListener listener);
64 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.net.flowext;
17 +
18 +import org.onosproject.net.flow.FlowRuleBatchEvent;
19 +
20 +/**
21 + * Experimental extension to the flow rule subsystem; still under development.
22 + * The monitor module of the router.
23 + * <p>
24 + * The monitor module of router.
25 + * </p>
26 + */
27 +public interface FlowRuleExtRouterListener {
28 +
29 + /**
30 + * Notify monitor the router has down its work.
31 + *
32 + * @param event the event to notify
33 + */
34 + void notify(FlowRuleBatchEvent event);
35 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.net.flowext;
17 +
18 +import org.onosproject.net.flow.FlowRuleBatchRequest;
19 +import org.onosproject.net.flow.FlowRuleService;
20 +
21 +import java.util.concurrent.Future;
22 +
23 +/**
24 + * Experimental extension to the flow rule subsystem; still under development.
25 + * Service for injecting extended flow rules into the environment.
26 + * This service just send the packet downstream. It won't store the
27 + * flowRuleExtension in cache.
28 + */
29 +public interface FlowRuleExtService extends FlowRuleService {
30 + /**
31 + * Applies a batch operation of FlowRules.
32 + * this batch can be divided into many sub-batch by deviceId, and application
33 + * gives a batchId, it means once one flowRule apply failed, all flow rules should
34 + * withdraw.
35 + *
36 + * @param batch batch operation to apply
37 + * @return future indicating the state of the batch operation
38 + */
39 + Future<FlowExtCompletedOperation> applyBatch(FlowRuleBatchRequest batch);
40 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +/**
18 + * Experimental extension to the flow rule subsystem; still under development.
19 + * <p>
20 + * This package is an extension for the current ONOS flow rule API.
21 + * Its main purpose is to support external applications to push service data to network elements.
22 + * The service data could be any kind of service related data or commands required for corresponding service
23 + * setup and other operations as defined by application and its communicating device.
24 + * </p>
25 + */
26 +package org.onosproject.net.flowext;
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.net.flowext.impl;
17 +
18 +import com.google.common.collect.ArrayListMultimap;
19 +import com.google.common.collect.Lists;
20 +import com.google.common.collect.Multimap;
21 +import com.google.common.collect.Sets;
22 +import org.apache.felix.scr.annotations.Activate;
23 +import org.apache.felix.scr.annotations.Component;
24 +import org.apache.felix.scr.annotations.Deactivate;
25 +import org.apache.felix.scr.annotations.Reference;
26 +import org.apache.felix.scr.annotations.ReferenceCardinality;
27 +import org.apache.felix.scr.annotations.Service;
28 +import org.onosproject.event.AbstractListenerRegistry;
29 +import org.onosproject.event.EventDeliveryService;
30 +import org.onosproject.net.DeviceId;
31 +import org.onosproject.net.device.DeviceService;
32 +import org.onosproject.net.flow.FlowRule;
33 +import org.onosproject.net.flow.FlowRuleBatchEntry;
34 +import org.onosproject.net.flow.FlowRuleBatchEvent;
35 +import org.onosproject.net.flow.FlowRuleBatchRequest;
36 +import org.onosproject.net.flow.FlowRuleEvent;
37 +import org.onosproject.net.flow.FlowRuleListener;
38 +import org.onosproject.net.flow.FlowRuleProvider;
39 +import org.onosproject.net.flow.impl.FlowRuleManager;
40 +import org.onosproject.net.flowext.FlowExtCompletedOperation;
41 +import org.onosproject.net.flowext.FlowRuleExtRouter;
42 +import org.onosproject.net.flowext.FlowRuleExtRouterListener;
43 +import org.onosproject.net.flowext.FlowRuleExtService;
44 +import org.slf4j.Logger;
45 +
46 +import java.util.Collection;
47 +import java.util.Collections;
48 +import java.util.List;
49 +import java.util.Set;
50 +import java.util.concurrent.CancellationException;
51 +import java.util.concurrent.ExecutionException;
52 +import java.util.concurrent.ExecutorService;
53 +import java.util.concurrent.Executors;
54 +import java.util.concurrent.Future;
55 +import java.util.concurrent.TimeUnit;
56 +import java.util.concurrent.TimeoutException;
57 +import java.util.concurrent.atomic.AtomicReference;
58 +
59 +import static org.onlab.util.Tools.namedThreads;
60 +import static org.slf4j.LoggerFactory.getLogger;
61 +
62 +/**
63 + * Experimental extension to the flow rule subsystem; still under development.
64 + */
65 +@Component(immediate = true)
66 +@Service
67 +public class FlowRuleExtManager extends FlowRuleManager
68 + implements FlowRuleExtService {
69 +
70 + enum BatchState {
71 + STARTED, FINISHED, CANCELLED
72 + }
73 +
74 + public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
75 + private final Logger log = getLogger(getClass());
76 +
77 + private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
78 + listenerRegistry = new AbstractListenerRegistry<>();
79 +
80 + private ExecutorService futureService;
81 +
82 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 + protected FlowRuleExtRouter router;
84 +
85 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 + protected EventDeliveryService eventDispatcher;
87 +
88 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89 + protected DeviceService deviceService;
90 +
91 + InternalFlowRuleExtRouterListener routerListener = new InternalFlowRuleExtRouterListener();
92 +
93 + @Activate
94 + public void activate() {
95 + futureService = Executors.newFixedThreadPool(
96 + 32, namedThreads("provider-future-listeners-%d"));
97 + eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
98 + router.addListener(routerListener);
99 + log.info("Started");
100 + }
101 +
102 + @Deactivate
103 + public void deactivate() {
104 + futureService.shutdownNow();
105 + eventDispatcher.removeSink(FlowRuleEvent.class);
106 + router.removeListener(routerListener);
107 + log.info("Stopped");
108 + }
109 +
110 + /**
111 + * Applies a batch operation of FlowRules.
112 + * this batch can be divided into many sub-batch by deviceId
113 + *
114 + * @param batch batch operation to apply
115 + * @return future indicating the state of the batch operation
116 + */
117 + @Override
118 + public Future<FlowExtCompletedOperation> applyBatch(FlowRuleBatchRequest batch) {
119 + // TODO group the Collection into sub-Collection by deviceId
120 + Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches = ArrayListMultimap
121 + .create();
122 + for (FlowRuleBatchEntry fbe : batch.ops()) {
123 + FlowRule flowRule = fbe.target();
124 + perDeviceBatches.put(flowRule.deviceId(), fbe);
125 + }
126 +
127 + List<Future<FlowExtCompletedOperation>> futures = Lists.newArrayList();
128 + for (DeviceId deviceId : perDeviceBatches.keySet()) {
129 + Collection<FlowRuleBatchEntry> flows = perDeviceBatches.get(deviceId);
130 + //FIXME if there can be collisions, than converting the collection to a set will drop flow rules
131 + FlowRuleBatchRequest subBatch = new FlowRuleBatchRequest(batch.batchId(), Sets.newHashSet(flows));
132 + Future<FlowExtCompletedOperation> future = router.applySubBatch(subBatch);
133 + futures.add(future);
134 + }
135 + return new FlowRuleBatchFuture(batch.batchId(), futures);
136 + }
137 +
138 + /**
139 + * Batch futures include all flow extension entries in one batch.
140 + * Using for transaction and will use in next-step.
141 + */
142 + private class FlowRuleBatchFuture
143 + implements Future<FlowExtCompletedOperation> {
144 +
145 + private final List<Future<FlowExtCompletedOperation>> futures;
146 + private final long batchId;
147 + private final AtomicReference<BatchState> state;
148 + private FlowExtCompletedOperation overall;
149 +
150 + public FlowRuleBatchFuture(long batchId, List<Future<FlowExtCompletedOperation>> futures) {
151 + this.futures = futures;
152 + this.batchId = batchId;
153 + state = new AtomicReference<FlowRuleExtManager.BatchState>();
154 + state.set(BatchState.STARTED);
155 + }
156 +
157 + /**
158 + * Attempts to cancel execution of this task.
159 + *
160 + * @param mayInterruptIfRunning {@code true} if the thread executing this
161 + * task should be interrupted; otherwise, in-progress tasks are allowed
162 + * to complete
163 + * @return {@code false} if the task could not be cancelled,
164 + * typically because it has already completed normally;
165 + * {@code true} otherwise
166 + */
167 + @Override
168 + public boolean cancel(boolean mayInterruptIfRunning) {
169 + if (state.get() == BatchState.FINISHED) {
170 + return false;
171 + }
172 + if (log.isDebugEnabled()) {
173 + log.debug("Cancelling FlowRuleBatchFuture",
174 + new RuntimeException("Just printing backtrace"));
175 + }
176 + if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
177 + return false;
178 + }
179 + cleanUpBatch();
180 + for (Future<FlowExtCompletedOperation> f : futures) {
181 + f.cancel(mayInterruptIfRunning);
182 + }
183 + return true;
184 + }
185 +
186 + /**
187 + * Judge whether the task cancelled completely.
188 + *
189 + * @return {@code true} if this task was cancelled before it completed
190 + */
191 + @Override
192 + public boolean isCancelled() {
193 + return state.get() == BatchState.CANCELLED;
194 + }
195 +
196 + /**
197 + * Judge whether the task finished completely.
198 + *
199 + * @return {@code true} if this task completed
200 + */
201 + @Override
202 + public boolean isDone() {
203 + return state.get() == BatchState.FINISHED;
204 + }
205 +
206 + /**
207 + * Get the result of apply flow extension rules.
208 + * If the task isn't finished, the thread block here.
209 + */
210 + @Override
211 + public FlowExtCompletedOperation get()
212 + throws InterruptedException, ExecutionException {
213 +
214 + if (isDone()) {
215 + return overall;
216 + }
217 + boolean success = true;
218 + Set<FlowRule> failed = Sets.newHashSet();
219 + FlowExtCompletedOperation completed;
220 + for (Future<FlowExtCompletedOperation> future : futures) {
221 + completed = future.get();
222 + success = validateBatchOperation(failed, completed);
223 + }
224 + return finalizeBatchOperation(success, failed);
225 + }
226 +
227 + /**
228 + * Waits if necessary for at most the given time for the computation
229 + * to complete, and then retrieves its result, if available. In here,
230 + * the maximum of time out is sum of given time for every computation.
231 + *
232 + * @param timeout the maximum time to wait
233 + * @param unit the time unit of the timeout argument
234 + * @return the computed result
235 + * @throws CancellationException if the computation was cancelled
236 + * @throws ExecutionException if the computation threw an
237 + * exception
238 + * @throws InterruptedException if the current thread was interrupted
239 + * while waiting
240 + * @throws TimeoutException if the wait timed out
241 + */
242 + @Override
243 + public FlowExtCompletedOperation get(long timeout, TimeUnit unit)
244 + throws InterruptedException, ExecutionException,
245 + TimeoutException {
246 +
247 + if (isDone()) {
248 + return overall;
249 + }
250 + boolean success = true;
251 + Set<FlowRule> failed = Sets.newHashSet();
252 + FlowExtCompletedOperation completed;
253 + for (Future<FlowExtCompletedOperation> future : futures) {
254 + completed = future.get(timeout, unit);
255 + success = validateBatchOperation(failed, completed);
256 + }
257 + return finalizeBatchOperation(success, failed);
258 + }
259 +
260 + /**
261 + * Confirm whether the batch operation success.
262 + *
263 + * @param failed using to populate failed entries
264 + * @param completed the result of apply flow extension entries
265 + * @return {@code true} if all entries applies successful
266 + */
267 + private boolean validateBatchOperation(Set<FlowRule> failed,
268 + FlowExtCompletedOperation completed) {
269 +
270 + if (isCancelled()) {
271 + throw new CancellationException();
272 + }
273 + if (!completed.isSuccess()) {
274 + log.warn("FlowRuleBatch failed: {}", completed);
275 + failed.addAll(completed.failedItems());
276 + cleanUpBatch();
277 + cancelAllSubBatches();
278 + return false;
279 + }
280 + return true;
281 + }
282 +
283 + /**
284 + * Once one subBatch failed, cancel the rest of them.
285 + */
286 + private void cancelAllSubBatches() {
287 + for (Future<FlowExtCompletedOperation> f : futures) {
288 + f.cancel(true);
289 + }
290 + }
291 +
292 + /**
293 + * Construct the result of batch operation.
294 + *
295 + * @param success the result of batch operation
296 + * @param failed the failed entries of batch operation
297 + * @return FlowExtCompletedOperation of batch operation
298 + */
299 + private FlowExtCompletedOperation finalizeBatchOperation(boolean success,
300 + Set<FlowRule> failed) {
301 + synchronized (this) {
302 + if (!state.compareAndSet(BatchState.STARTED,
303 + BatchState.FINISHED)) {
304 + if (state.get() == BatchState.FINISHED) {
305 + return overall;
306 + }
307 + throw new CancellationException();
308 + }
309 + overall = new FlowExtCompletedOperation(batchId, success, failed);
310 + return overall;
311 + }
312 + }
313 +
314 + private void cleanUpBatch() {
315 + }
316 + }
317 +
318 + /**
319 + * South Bound API to south plug-in.
320 + */
321 + private class InternalFlowRuleExtRouterListener
322 + implements FlowRuleExtRouterListener {
323 + @Override
324 + public void notify(FlowRuleBatchEvent event) {
325 + // Request has been forwarded to MASTER Node
326 + for (FlowRuleBatchEntry entry : event.subject().ops()) {
327 + switch (entry.operator()) {
328 + case ADD:
329 + eventDispatcher
330 + .post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED,
331 + entry.target()));
332 + break;
333 + // FALLTHROUGH
334 + case REMOVE:
335 + case MODIFY:
336 + default:
337 + // TODO not implemented
338 + break;
339 + }
340 + }
341 + // send it
342 + FlowRuleProvider flowRuleProvider = getProvider(event.subject().ops()
343 + .iterator().next().target().deviceId());
344 + // TODO we may want to specify a deviceId
345 + flowRuleProvider.executeBatch(event.subject().asBatchOperation(null));
346 + // do not have transaction, assume it install success
347 + // temporarily
348 + FlowExtCompletedOperation result = new FlowExtCompletedOperation(
349 + event.subject().batchId(), true, Collections.emptySet());
350 + futureService.submit(() -> {
351 + router.batchOperationComplete(FlowRuleBatchEvent
352 + .completed(event.subject(), result));
353 + });
354 + }
355 + }
356 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +/**
18 + * Experimental extension to the flow rule subsystem; still under development.
19 + * <p>
20 + * This package is an extension for the current ONOS flow rule subsystem.
21 + * Its main purpose is to support external applications to push service data to network elements.
22 + * The service data could be any kind of service related data or commands required for corresponding service
23 + * setup and other operations as defined by application and its communicating device.
24 + * </p>
25 + */
26 +package org.onosproject.net.flowext.impl;
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.flowext.impl;
17 +
18 +import com.google.common.cache.Cache;
19 +import com.google.common.cache.CacheBuilder;
20 +import com.google.common.util.concurrent.Futures;
21 +import com.google.common.util.concurrent.ListenableFuture;
22 +import com.google.common.util.concurrent.SettableFuture;
23 +import org.apache.felix.scr.annotations.Activate;
24 +import org.apache.felix.scr.annotations.Component;
25 +import org.apache.felix.scr.annotations.Deactivate;
26 +import org.apache.felix.scr.annotations.Reference;
27 +import org.apache.felix.scr.annotations.ReferenceCardinality;
28 +import org.apache.felix.scr.annotations.Service;
29 +import org.onlab.util.KryoNamespace;
30 +import org.onosproject.cluster.ClusterService;
31 +import org.onosproject.net.DeviceId;
32 +import org.onosproject.net.device.DeviceService;
33 +import org.onosproject.net.flow.CompletedBatchOperation;
34 +import org.onosproject.net.flow.FlowRuleBatchEntry;
35 +import org.onosproject.net.flow.FlowRuleBatchEvent;
36 +import org.onosproject.net.flow.FlowRuleBatchRequest;
37 +import org.onosproject.net.flowext.DefaultFlowRuleExt;
38 +import org.onosproject.net.flowext.DownStreamFlowEntry;
39 +import org.onosproject.net.flowext.FlowExtCompletedOperation;
40 +import org.onosproject.net.flowext.FlowRuleExtRouter;
41 +import org.onosproject.net.flowext.FlowRuleExtRouterListener;
42 +import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
43 +import org.onosproject.store.cluster.messaging.ClusterMessage;
44 +import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45 +import org.onosproject.store.flow.ReplicaInfo;
46 +import org.onosproject.store.flow.ReplicaInfoEventListener;
47 +import org.onosproject.store.flow.ReplicaInfoService;
48 +import org.onosproject.store.serializers.DecodeTo;
49 +import org.onosproject.store.serializers.KryoSerializer;
50 +import org.onosproject.store.serializers.StoreSerializer;
51 +import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
52 +import org.slf4j.Logger;
53 +
54 +import java.io.IOException;
55 +import java.util.Collection;
56 +import java.util.Collections;
57 +import java.util.HashSet;
58 +import java.util.Iterator;
59 +import java.util.Set;
60 +import java.util.concurrent.ExecutorService;
61 +import java.util.concurrent.Executors;
62 +import java.util.concurrent.Future;
63 +import java.util.concurrent.TimeUnit;
64 +
65 +import static org.onlab.util.Tools.namedThreads;
66 +import static org.onosproject.store.flowext.impl.FlowExtRouterMessageSubjects.APPLY_EXTEND_FLOWS;
67 +import static org.slf4j.LoggerFactory.getLogger;
68 +
69 +/**
70 + * Experimental extension to the flow rule subsystem; still under development.
71 + * Implement a simple routing-like mechanism to directly send service data to its master and push to device.
72 + * This Router does not save any flow rule extension data in cache, it focus on routing mechanism.
73 + */
74 +@Component(immediate = true)
75 +@Service
76 +public class DefaultFlowRuleExtRouter
77 + implements FlowRuleExtRouter {
78 +
79 + private final Logger log = getLogger(getClass());
80 +
81 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 + protected ReplicaInfoService replicaInfoManager;
83 +
84 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 + protected ClusterCommunicationService clusterCommunicator;
86 +
87 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 + protected ClusterService clusterService;
89 +
90 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 + protected DeviceService deviceService;
92 +
93 + private int pendingFutureTimeoutMinutes = 5;
94 +
95 + protected Set<FlowRuleExtRouterListener> routerListener = new HashSet<>();
96 + private Cache<Long, SettableFuture<FlowExtCompletedOperation>> pendingExtendFutures = CacheBuilder
97 + .newBuilder()
98 + .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
99 + // .removalListener(new TimeoutFuture())
100 + .build();
101 +
102 + private final ExecutorService futureListeners = Executors
103 + .newCachedThreadPool(namedThreads("flowstore-peer-responders"));
104 +
105 + protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
106 + @Override
107 + protected void setupKryoPool() {
108 + serializerPool = KryoNamespace.newBuilder()
109 + .register(DistributedStoreSerializers.STORE_COMMON)
110 + .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
111 + .register(FlowExtCompletedOperation.class)
112 + .register(FlowRuleBatchRequest.class)
113 + .register(DownStreamFlowEntry.class)
114 + .register(DefaultFlowRuleExt.class)
115 + .build();
116 + }
117 + };
118 +
119 + private ReplicaInfoEventListener replicaInfoEventListener;
120 +
121 + @Activate
122 + public void activate() {
123 + clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
124 + new ClusterMessageHandler() {
125 +
126 + @Override
127 + public void handle(ClusterMessage message) {
128 + // decode the extended flow entry and store them in memory.
129 + FlowRuleBatchRequest operation = SERIALIZER.decode(message.payload());
130 + log.info("received batch request {}", operation);
131 + final ListenableFuture<FlowExtCompletedOperation> f = applyBatchInternal(operation);
132 + f.addListener(new Runnable() {
133 + @Override
134 + public void run() {
135 + FlowExtCompletedOperation result = Futures.getUnchecked(f);
136 + try {
137 + message.respond(SERIALIZER.encode(result));
138 + } catch (IOException e) {
139 + log.error("Failed to respond back", e);
140 + }
141 + }
142 + }, futureListeners);
143 + }
144 + });
145 +
146 + replicaInfoManager.addListener(replicaInfoEventListener);
147 +
148 + log.info("Started");
149 + }
150 +
151 + @Deactivate
152 + public void deactivate() {
153 + clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
154 + replicaInfoManager.removeListener(replicaInfoEventListener);
155 + log.info("Stopped");
156 + }
157 +
158 + /**
159 + * apply the sub batch of flow extension rules.
160 + *
161 + * @param batchOperation batch of flow rules.
162 + * A batch can contain flow rules for a single device only.
163 + * @return Future response indicating success/failure of the batch operation
164 + * all the way down to the device.
165 + */
166 + @Override
167 + public Future<FlowExtCompletedOperation> applySubBatch(FlowRuleBatchRequest batchOperation) {
168 + // TODO Auto-generated method stub
169 + if (batchOperation.ops().isEmpty()) {
170 + return Futures.immediateFuture(new FlowExtCompletedOperation(
171 + batchOperation.batchId(), true, Collections.emptySet()));
172 + }
173 + // get the deviceId all the collection belongs to
174 + DeviceId deviceId = getBatchDeviceId(batchOperation.ops());
175 +
176 + if (deviceId == null) {
177 + log.error("This Batch exists more than two deviceId");
178 + return null;
179 + }
180 + ReplicaInfo replicaInfo = replicaInfoManager
181 + .getReplicaInfoFor(deviceId);
182 +
183 + if (replicaInfo.master().get()
184 + .equals(clusterService.getLocalNode().id())) {
185 + return applyBatchInternal(batchOperation);
186 + }
187 +
188 + log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
189 + replicaInfo.master().orNull(), deviceId);
190 +
191 + ClusterMessage message = new ClusterMessage(clusterService
192 + .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation));
193 +
194 + try {
195 + ListenableFuture<byte[]> responseFuture = clusterCommunicator
196 + .sendAndReceive(message, replicaInfo.master().get());
197 + // here should add another decode process
198 + return Futures.transform(responseFuture,
199 + new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
200 + } catch (IOException e) {
201 + return Futures.immediateFailedFuture(e);
202 + }
203 + }
204 +
205 + /**
206 + * apply the batch in local node.
207 + * It means this instance is master of the device the flow entry belongs to.
208 + *
209 + * @param batchOperation a collection of flow entry, all they should send down to one device
210 + * @return Future response indicating success/failure of the batch operation
211 + * all the way down to the device.
212 + */
213 + private ListenableFuture<FlowExtCompletedOperation> applyBatchInternal(FlowRuleBatchRequest batchOperation) {
214 + SettableFuture<FlowExtCompletedOperation> r = SettableFuture.create();
215 + pendingExtendFutures.put(batchOperation.batchId(), r);
216 + // here should notify manager to complete
217 + notify(batchOperation);
218 + return r;
219 + }
220 +
221 + /**
222 + * Get the deviceId of this batch.
223 + * The whole Batch should belong to one deviceId.
224 + *
225 + * @param batchOperation a collection of flow entry, all they should send down to one device
226 + * @return the deviceId the whole batch belongs to
227 + */
228 + private DeviceId getBatchDeviceId(Collection<FlowRuleBatchEntry> batchOperation) {
229 + Iterator<FlowRuleBatchEntry> head = batchOperation.iterator();
230 + FlowRuleBatchEntry headOp = head.next();
231 + boolean sameId = true;
232 + for (FlowRuleBatchEntry operation : batchOperation) {
233 + if (operation.target().deviceId() != headOp.target().deviceId()) {
234 + log.warn("this batch does not apply on one device Id ");
235 + sameId = false;
236 + break;
237 + }
238 + }
239 + return sameId ? headOp.target().deviceId() : null;
240 + }
241 +
242 + /**
243 + * Notify the listener of Router to do some reaction.
244 + *
245 + * @param request the requested operation to do
246 + */
247 + public void notify(FlowRuleBatchRequest request) {
248 + for (FlowRuleExtRouterListener listener : routerListener) {
249 + listener.notify(FlowRuleBatchEvent
250 + // TODO fill in the deviceId
251 + .requested(request, null));
252 + }
253 + }
254 +
255 + /**
256 + * Invoked on the completion of a storeBatch operation.
257 + *
258 + * @param event flow rule batch event
259 + */
260 + @Override
261 + public void batchOperationComplete(FlowRuleBatchEvent event) {
262 + // TODO Auto-generated method stub
263 + final Long batchId = event.subject().batchId();
264 + SettableFuture<FlowExtCompletedOperation> future = pendingExtendFutures
265 + .getIfPresent(batchId);
266 + if (future != null) {
267 + FlowRuleBatchRequest request = event.subject();
268 + CompletedBatchOperation result = event.result();
269 + FlowExtCompletedOperation completed =
270 + new FlowExtCompletedOperation(request.batchId(), result.isSuccess(), result.failedItems());
271 + future.set(completed);
272 + pendingExtendFutures.invalidate(batchId);
273 + }
274 + }
275 +
276 + /**
277 + * Register the listener to monitor Router,
278 + * The Router find master to send downStream.
279 + *
280 + * @param listener the listener to register
281 + */
282 + @Override
283 + public void addListener(FlowRuleExtRouterListener listener) {
284 + routerListener.add(listener);
285 + }
286 +
287 + /**
288 + * Remove the listener of Router.
289 + *
290 + * @param listener the listener to remove
291 + */
292 + @Override
293 + public void removeListener(FlowRuleExtRouterListener listener) {
294 + routerListener.remove(listener);
295 + }
296 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.flowext.impl;
17 +
18 +import org.onosproject.store.cluster.messaging.MessageSubject;
19 +
20 +/**
21 + * Experimental extension to the flow rule subsystem; still under development.
22 + * MessageSubjects used by DefaultFlowRuleExtRouter peer-peer communication.
23 + */
24 +public final class FlowExtRouterMessageSubjects {
25 + private FlowExtRouterMessageSubjects() {
26 + }
27 +
28 + /**
29 + * The subject of routing extended flow to specified device.
30 + */
31 + public static final MessageSubject APPLY_EXTEND_FLOWS
32 + = new MessageSubject("peer-forward-apply-batch-extension");
33 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +/**
18 + * Experimental extension to the flow rule subsystem; still under development.
19 + * <p>
20 + * Implementation of the distributed flow extension rule router using p2p synchronization
21 + * protocol. The Router is the core component of routing flow rules to specified device.
22 + * This package is still experimental at this point in time.
23 + * </p>
24 + */
25 +package org.onosproject.store.flowext.impl;