ssyoon90
Committed by Gerrit Code Review

[Emu] openTAM: NewAdaptiveFlowStatsCollector Implementation

  - NewAdaptiveFlowStatsCollector.java
   .Bug fix to initialize callCountCalAndShortFlowsTask value
   .Added flowMissingXid variable to identify individual StatsRequest or match all StatsRequest message or not
  - DefaultTypedFlowEntry.java, TypedStoredFlowEntry.java
   .Added javadoc for class
  - OpenFlowRuleProvider.java
   .Line 2: 2014 -> 2015
   .Added adaptiveFlowSampling boolean property with default
   .Added call providerService.pushFlowMetricsWithoutFlowMissing in case of  individual StatsRequest
  - FlowRuleProviderService.java
   .Added pushFlowMetricsWithoutFlowMissing() function
  - FlowRuleManager.java
   .Added pushFlowMetricsWithoutFlowMissing() implementation
  - OpenFlowControllerImpl.java
   .Bug fix to unchange the StatsRequest Xid value in case of StatsReply Flow message type

Change-Id: Id4dc4a164da654af7b6dfb090af7336e748ef118
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.net.flow;
18 +
19 +import static com.google.common.base.MoreObjects.toStringHelper;
20 +
21 +/**
22 + * Default flow entry class with FlowLiveType value, IMMEDIATE_FLOW, SHORT_FLOW, MID_FLOW, LONG_FLOW.
23 + */
24 +public class DefaultTypedFlowEntry extends DefaultFlowEntry
25 + implements TypedStoredFlowEntry {
26 + private FlowLiveType liveType;
27 +
28 + /**
29 + * Creates a typed flow entry from flow rule and its statistics, with default flow live type(IMMEDIATE_FLOW).
30 + *
31 + * @param rule the flow rule
32 + * @param state the flow state
33 + * @param life the flow duration since creation
34 + * @param packets the flow packets count
35 + * @param bytes the flow bytes count
36 + *
37 + */
38 + public DefaultTypedFlowEntry(FlowRule rule, FlowEntryState state,
39 + long life, long packets, long bytes) {
40 + super(rule, state, life, packets, bytes);
41 + this.liveType = FlowLiveType.IMMEDIATE_FLOW;
42 + }
43 +
44 + /**
45 + * Creates a typed flow entry from flow rule, with default flow live type(IMMEDIATE_FLOW).
46 + *
47 + * @param rule the flow rule
48 + *
49 + */
50 + public DefaultTypedFlowEntry(FlowRule rule) {
51 + super(rule);
52 + this.liveType = FlowLiveType.IMMEDIATE_FLOW;
53 + }
54 +
55 + /**
56 + * Creates a typed flow entry from flow entry, with default flow live type(IMMEDIATE_FLOW).
57 + *
58 + * @param fe the flow entry
59 + *
60 + */
61 + public DefaultTypedFlowEntry(FlowEntry fe) {
62 + super(fe, fe.state(), fe.life(), fe.packets(), fe.bytes());
63 + this.liveType = FlowLiveType.IMMEDIATE_FLOW;
64 + }
65 +
66 + /**
67 + * Creates a typed flow entry from flow rule and flow live type.
68 + *
69 + * @param rule the flow rule
70 + * @param liveType the flow live type
71 + *
72 + */
73 + public DefaultTypedFlowEntry(FlowRule rule, FlowLiveType liveType) {
74 + super(rule);
75 + this.liveType = liveType;
76 + }
77 +
78 + /**
79 + * Creates a typed flow entry from flow entry and flow live type.
80 + *
81 + * @param fe the flow rule
82 + * @param liveType the flow live type
83 + *
84 + */
85 + public DefaultTypedFlowEntry(FlowEntry fe, FlowLiveType liveType) {
86 + super(fe, fe.state(), fe.life(), fe.packets(), fe.bytes());
87 + this.liveType = liveType;
88 + }
89 +
90 + /**
91 + * Creates a typed flow entry from flow rule, error code and flow live type.
92 + *
93 + * @param rule the flow rule
94 + * @param errType the flow error type
95 + * @param errCode the flow error code
96 + * @param liveType the flow live type
97 + *
98 + */
99 + public DefaultTypedFlowEntry(FlowRule rule, int errType, int errCode, FlowLiveType liveType) {
100 + super(rule, errType, errCode);
101 + this.liveType = liveType;
102 + }
103 +
104 + @Override
105 + public FlowLiveType flowLiveType() {
106 + return this.liveType;
107 + }
108 +
109 + @Override
110 + public void setFlowLiveType(FlowLiveType liveType) {
111 + this.liveType = liveType;
112 + }
113 +
114 + @Override
115 + public String toString() {
116 + return toStringHelper(this)
117 + .add("entry", super.toString())
118 + .add("type", liveType)
119 + .toString();
120 + }
121 +}
122 +
...@@ -41,6 +41,15 @@ public interface FlowRuleProviderService extends ProviderService<FlowRuleProvide ...@@ -41,6 +41,15 @@ public interface FlowRuleProviderService extends ProviderService<FlowRuleProvide
41 void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries); 41 void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries);
42 42
43 /** 43 /**
44 + * Pushes the collection of flow entries currently applied on the given
45 + * device without flowMissing process.
46 + *
47 + * @param deviceId device identifier
48 + * @param flowEntries collection of flow rules
49 + */
50 + void pushFlowMetricsWithoutFlowMissing(DeviceId deviceId, Iterable<FlowEntry> flowEntries);
51 +
52 + /**
44 * Indicates to the core that the requested batch operation has 53 * Indicates to the core that the requested batch operation has
45 * been completed. 54 * been completed.
46 * 55 *
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.net.flow;
18 +
19 +/**
20 + * Represents a flow live type for a given flow entry.
21 + */
22 +public interface TypedStoredFlowEntry extends StoredFlowEntry {
23 + enum FlowLiveType {
24 + /**
25 + * Indicates that this rule has been submitted for addition immediately.
26 + * Not necessarily collecting flow stats.
27 + */
28 + IMMEDIATE_FLOW,
29 +
30 + /**
31 + * Indicates that this rule has been submitted for a short time.
32 + * Necessarily collecting flow stats every calAndPollInterval.
33 + */
34 + SHORT_FLOW,
35 +
36 + /**
37 + * Indicates that this rule has been submitted for a mid time.
38 + * Necessarily collecting flow stats every midPollInterval.
39 + */
40 + MID_FLOW,
41 +
42 + /**
43 + * Indicates that this rule has been submitted for a long time.
44 + * Necessarily collecting flow stats every longPollInterval.
45 + */
46 + LONG_FLOW,
47 +
48 + /**
49 + * Indicates that this rule has been submitted for UNKNOWN or ERROR.
50 + * Not necessarily collecting flow stats.
51 + */
52 + UNKNOWN_FLOW
53 + }
54 +
55 + /**
56 + * Gets the flow live type for this entry.
57 + */
58 + FlowLiveType flowLiveType();
59 +
60 + /**
61 + * Sets the new flow live type for this entry.
62 + * @param liveType new flow live type.
63 + */
64 + void setFlowLiveType(FlowLiveType liveType);
65 +}
...@@ -388,6 +388,16 @@ public class FlowRuleManager ...@@ -388,6 +388,16 @@ public class FlowRuleManager
388 388
389 @Override 389 @Override
390 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) { 390 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
391 + pushFlowMetricsInternal(deviceId, flowEntries, true);
392 + }
393 +
394 + @Override
395 + public void pushFlowMetricsWithoutFlowMissing(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
396 + pushFlowMetricsInternal(deviceId, flowEntries, false);
397 + }
398 +
399 + private void pushFlowMetricsInternal(DeviceId deviceId, Iterable<FlowEntry> flowEntries,
400 + boolean useMissingFlow) {
391 Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap(); 401 Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
392 store.getFlowEntries(deviceId).forEach(f -> storedRules.put(f, f)); 402 store.getFlowEntries(deviceId).forEach(f -> storedRules.put(f, f));
393 403
...@@ -415,6 +425,9 @@ public class FlowRuleManager ...@@ -415,6 +425,9 @@ public class FlowRuleManager
415 continue; 425 continue;
416 } 426 }
417 } 427 }
428 +
429 + // DO NOT reinstall
430 + if (useMissingFlow) {
418 for (FlowEntry rule : storedRules.keySet()) { 431 for (FlowEntry rule : storedRules.keySet()) {
419 try { 432 try {
420 // there are rules in the store that aren't on the switch 433 // there are rules in the store that aren't on the switch
...@@ -425,7 +438,7 @@ public class FlowRuleManager ...@@ -425,7 +438,7 @@ public class FlowRuleManager
425 continue; 438 continue;
426 } 439 }
427 } 440 }
428 - 441 + }
429 } 442 }
430 443
431 @Override 444 @Override
......
...@@ -273,6 +273,7 @@ public class OpenFlowControllerImpl implements OpenFlowController { ...@@ -273,6 +273,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
273 OFFlowStatsReply.Builder rep = 273 OFFlowStatsReply.Builder rep =
274 OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply(); 274 OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
275 rep.setEntries(Lists.newLinkedList(flowStats)); 275 rep.setEntries(Lists.newLinkedList(flowStats));
276 + rep.setXid(reply.getXid());
276 executorMsgs.submit(new OFMessageHandler(dpid, rep.build())); 277 executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
277 } 278 }
278 break; 279 break;
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.provider.of.flow.impl;
18 +
19 +import java.util.HashSet;
20 +import java.util.List;
21 +import java.util.Map;
22 +import java.util.Optional;
23 +import java.util.Set;
24 +import java.util.concurrent.TimeUnit;
25 +import java.util.concurrent.ScheduledFuture;
26 +import java.util.concurrent.Executors;
27 +import java.util.concurrent.ScheduledExecutorService;
28 +
29 +import com.google.common.base.Objects;
30 +import com.google.common.collect.ImmutableSet;
31 +import com.google.common.collect.Maps;
32 +import com.google.common.collect.Sets;
33 +
34 +import org.onosproject.net.flow.DefaultTypedFlowEntry;
35 +import org.onosproject.net.flow.FlowEntry;
36 +import org.onosproject.net.flow.FlowId;
37 +import org.onosproject.net.flow.FlowRule;
38 +import org.onosproject.net.flow.StoredFlowEntry;
39 +import org.onosproject.net.flow.TypedStoredFlowEntry;
40 +import org.onosproject.net.flow.instructions.Instruction;
41 +import org.onosproject.net.flow.instructions.Instructions;
42 +import org.onosproject.openflow.controller.OpenFlowSwitch;
43 +import org.onosproject.openflow.controller.RoleState;
44 +import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
45 +import org.projectfloodlight.openflow.protocol.match.Match;
46 +import org.projectfloodlight.openflow.types.OFPort;
47 +import org.projectfloodlight.openflow.types.TableId;
48 +import org.slf4j.Logger;
49 +
50 +import static com.google.common.base.Preconditions.checkNotNull;
51 +import static org.onlab.util.Tools.groupedThreads;
52 +import static org.onosproject.net.flow.TypedStoredFlowEntry.*;
53 +import static org.slf4j.LoggerFactory.getLogger;
54 +
55 +/**
56 + * Efficiently and adaptively collects flow statistics for the specified switch.
57 + */
58 +public class NewAdaptiveFlowStatsCollector {
59 +
60 + private final Logger log = getLogger(getClass());
61 +
62 + private final OpenFlowSwitch sw;
63 +
64 + private ScheduledExecutorService adaptiveFlowStatsScheduler =
65 + Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
66 + private ScheduledFuture<?> calAndShortFlowsThread;
67 + private ScheduledFuture<?> midFlowsThread;
68 + private ScheduledFuture<?> longFlowsThread;
69 +
70 + // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval
71 + private CalAndShortFlowsTask calAndShortFlowsTask;
72 + // Task that collects stats MID flows every 2*calAndPollInterval
73 + private MidFlowsTask midFlowsTask;
74 + // Task that collects stats LONG flows every 3*calAndPollInterval
75 + private LongFlowsTask longFlowsTask;
76 +
77 + private static final int CAL_AND_POLL_TIMES = 1; // must be always 0
78 + private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1
79 + private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES
80 + //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
81 + // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
82 + private static final int ENTIRE_POLL_TIMES = 6;
83 +
84 + private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
85 + private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
86 + private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;
87 +
88 + private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
89 + private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
90 + private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
91 + // only used for checking condition at each task if it collects entire flows from a given switch or not
92 + private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
93 +
94 + // Number of call count of each Task,
95 + // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask
96 + private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called
97 + private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called
98 + private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called
99 +
100 + private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();
101 +
102 + private boolean isFirstTimeStart = true;
103 +
104 + public static final long NO_FLOW_MISSING_XID = (-1);
105 + private long flowMissingXid = NO_FLOW_MISSING_XID;
106 +
107 + /**
108 + * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.
109 + *
110 + * @param sw switch to pull
111 + * @param pollInterval cal and immediate poll frequency in seconds
112 + */
113 + NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) {
114 + this.sw = sw;
115 +
116 + initMemberVars(pollInterval);
117 + }
118 +
119 + // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
120 + private void initMemberVars(int pollInterval) {
121 + if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
122 + this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
123 + } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
124 + this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
125 + } else {
126 + this.calAndPollInterval = pollInterval;
127 + }
128 +
129 + calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
130 + midPollInterval = MID_POLL_TIMES * calAndPollInterval;
131 + longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
132 + entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;
133 +
134 + callCountCalAndShortFlowsTask = 0;
135 + callCountMidFlowsTask = 0;
136 + callCountLongFlowsTask = 0;
137 +
138 + flowMissingXid = NO_FLOW_MISSING_XID;
139 + }
140 +
141 + /**
142 + * Adjusts adaptive poll frequency.
143 + *
144 + * @param pollInterval poll frequency in seconds
145 + */
146 + synchronized void adjustCalAndPollInterval(int pollInterval) {
147 + initMemberVars(pollInterval);
148 +
149 + if (calAndShortFlowsThread != null) {
150 + calAndShortFlowsThread.cancel(false);
151 + }
152 + if (midFlowsThread != null) {
153 + midFlowsThread.cancel(false);
154 + }
155 + if (longFlowsThread != null) {
156 + longFlowsThread.cancel(false);
157 + }
158 +
159 + calAndShortFlowsTask = new CalAndShortFlowsTask();
160 + calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
161 + calAndShortFlowsTask,
162 + 0,
163 + calAndPollInterval,
164 + TimeUnit.SECONDS);
165 +
166 + midFlowsTask = new MidFlowsTask();
167 + midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
168 + midFlowsTask,
169 + 0,
170 + midPollInterval,
171 + TimeUnit.SECONDS);
172 +
173 + longFlowsTask = new LongFlowsTask();
174 + longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
175 + longFlowsTask,
176 + 0,
177 + longPollInterval,
178 + TimeUnit.SECONDS);
179 +
180 + log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");
181 + }
182 +
183 + private class CalAndShortFlowsTask implements Runnable {
184 + @Override
185 + public void run() {
186 + if (sw.getRole() == RoleState.MASTER) {
187 + log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
188 +
189 + if (isFirstTimeStart) {
190 + // isFirstTimeStart, get entire flow stats from a given switch sw
191 + log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
192 + sw.getStringId());
193 + ofFlowStatsRequestAllSend();
194 +
195 + callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
196 + isFirstTimeStart = false;
197 + } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
198 + // entire_poll_times, get entire flow stats from a given switch sw
199 + log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
200 + ofFlowStatsRequestAllSend();
201 +
202 + callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
203 + //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
204 + //
205 + } else {
206 + calAndShortFlowsTaskInternal();
207 + callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
208 + }
209 + }
210 + }
211 + }
212 +
213 + // send openflow flow stats request message with getting all flow entries to a given switch sw
214 + private void ofFlowStatsRequestAllSend() {
215 + OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
216 + .setMatch(sw.factory().matchWildcardAll())
217 + .setTableId(TableId.ALL)
218 + .setOutPort(OFPort.NO_MASK)
219 + .build();
220 +
221 + synchronized (this) {
222 + // set the request xid to check the reply in OpenFlowRuleProvider
223 + // After processing the reply of this request message,
224 + // this must be set to NO_FLOW_MISSING_XID(-1) by provider
225 + setFlowMissingXid(request.getXid());
226 + log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());
227 +
228 + sw.sendMsg(request);
229 + }
230 + }
231 +
232 + // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw
233 + private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
234 + // set find match
235 + Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty()).buildMatch();
236 + // set find tableId
237 + TableId tableId = TableId.of(fe.tableId());
238 + // set output port
239 + Instruction ins = fe.treatment().allInstructions().stream()
240 + .filter(i -> (i.type() == Instruction.Type.OUTPUT))
241 + .findFirst()
242 + .orElse(null);
243 + OFPort ofPort = OFPort.NO_MASK;
244 + if (ins != null) {
245 + Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
246 + ofPort = OFPort.of((int) ((out.port().toLong())));
247 + }
248 +
249 + OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
250 + .setMatch(match)
251 + .setTableId(tableId)
252 + .setOutPort(ofPort)
253 + .build();
254 +
255 + synchronized (this) {
256 + if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
257 + log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
258 + + " set no flow missing xid anyway, for {}",
259 + sw.getStringId());
260 + setFlowMissingXid(NO_FLOW_MISSING_XID);
261 + }
262 +
263 + sw.sendMsg(request);
264 + }
265 + }
266 +
267 + private void calAndShortFlowsTaskInternal() {
268 + deviceFlowTable.checkAndMoveLiveFlowAll();
269 +
270 + deviceFlowTable.getShortFlows().forEach(fe -> {
271 + ofFlowStatsRequestFlowSend(fe);
272 + });
273 + }
274 +
275 + private class MidFlowsTask implements Runnable {
276 + @Override
277 + public void run() {
278 + if (sw.getRole() == RoleState.MASTER) {
279 + log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
280 +
281 + // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
282 + if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
283 + callCountMidFlowsTask = MID_POLL_TIMES;
284 + } else {
285 + midFlowsTaskInternal();
286 + callCountMidFlowsTask += MID_POLL_TIMES;
287 + }
288 + }
289 + }
290 + }
291 +
292 + private void midFlowsTaskInternal() {
293 + deviceFlowTable.getMidFlows().forEach(fe -> {
294 + ofFlowStatsRequestFlowSend(fe);
295 + });
296 + }
297 +
298 + private class LongFlowsTask implements Runnable {
299 + @Override
300 + public void run() {
301 + if (sw.getRole() == RoleState.MASTER) {
302 + log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
303 +
304 + // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
305 + if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
306 + callCountLongFlowsTask = LONG_POLL_TIMES;
307 + } else {
308 + longFlowsTaskInternal();
309 + callCountLongFlowsTask += LONG_POLL_TIMES;
310 + }
311 + }
312 + }
313 + }
314 +
315 + private void longFlowsTaskInternal() {
316 + deviceFlowTable.getLongFlows().forEach(fe -> {
317 + ofFlowStatsRequestFlowSend(fe);
318 + });
319 + }
320 +
321 + /**
322 + * start adaptive flow statistic collection.
323 + *
324 + */
325 + public synchronized void start() {
326 + log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
327 + callCountCalAndShortFlowsTask = 0;
328 + callCountMidFlowsTask = 0;
329 + callCountLongFlowsTask = 0;
330 +
331 + isFirstTimeStart = true;
332 +
333 + // Initially start polling quickly. Then drop down to configured value
334 + calAndShortFlowsTask = new CalAndShortFlowsTask();
335 + calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
336 + calAndShortFlowsTask,
337 + 1,
338 + calAndPollInterval,
339 + TimeUnit.SECONDS);
340 +
341 + midFlowsTask = new MidFlowsTask();
342 + midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
343 + midFlowsTask,
344 + 1,
345 + midPollInterval,
346 + TimeUnit.SECONDS);
347 +
348 + longFlowsTask = new LongFlowsTask();
349 + longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
350 + longFlowsTask,
351 + 1,
352 + longPollInterval,
353 + TimeUnit.SECONDS);
354 +
355 + log.info("Started");
356 + }
357 +
358 + /**
359 + * stop adaptive flow statistic collection.
360 + *
361 + */
362 + public synchronized void stop() {
363 + log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
364 + if (calAndShortFlowsThread != null) {
365 + calAndShortFlowsThread.cancel(true);
366 + }
367 + if (midFlowsThread != null) {
368 + midFlowsThread.cancel(true);
369 + }
370 + if (longFlowsThread != null) {
371 + longFlowsThread.cancel(true);
372 + }
373 +
374 + adaptiveFlowStatsScheduler.shutdownNow();
375 +
376 + isFirstTimeStart = false;
377 +
378 + log.info("Stopped");
379 + }
380 +
381 + /**
382 + * add typed flow entry from flow rule into the internal flow table.
383 + *
384 + * @param flowRules the flow rules
385 + *
386 + */
387 + public synchronized void addWithFlowRule(FlowRule... flowRules) {
388 + for (FlowRule fr : flowRules) {
389 + // First remove old entry unconditionally, if exist
390 + deviceFlowTable.remove(fr);
391 +
392 + // add new flow entry, we suppose IMMEDIATE_FLOW
393 + TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
394 + FlowLiveType.IMMEDIATE_FLOW);
395 + deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
396 + }
397 + }
398 +
399 + /**
400 + * add or update typed flow entry from flow entry into the internal flow table.
401 + *
402 + * @param flowEntries the flow entries
403 + *
404 + */
405 + public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
406 + for (FlowEntry fe : flowEntries) {
407 + // check if this new rule is an update to an existing entry
408 + TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);
409 +
410 + if (stored != null) {
411 + // duplicated flow entry is collected!, just skip
412 + if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
413 + && fe.life() == stored.life()) {
414 + log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())
415 + + ",is DUPLICATED stats collection, just skip."
416 + + " AdaptiveStats collection thread for {}",
417 + sw.getStringId());
418 +
419 + stored.setLastSeen();
420 + continue;
421 + } else if (fe.life() < stored.life()) {
422 + // Invalid updates the stats values, i.e., bytes, packets, durations ...
423 + log.debug("addOrUpdateFlows():" +
424 + " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +
425 + " new flowId=" + Long.toHexString(fe.id().value()) +
426 + ", old flowId=" + Long.toHexString(stored.id().value()) +
427 + ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +
428 + ", new life=" + fe.life() + ", old life=" + stored.life() +
429 + ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());
430 + // go next
431 + stored.setLastSeen();
432 + continue;
433 + }
434 +
435 + // update now
436 + stored.setLife(fe.life());
437 + stored.setPackets(fe.packets());
438 + stored.setBytes(fe.bytes());
439 + stored.setLastSeen();
440 + if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
441 + // flow is really RULE_ADDED
442 + stored.setState(FlowEntry.FlowEntryState.ADDED);
443 + }
444 + // flow is RULE_UPDATED, skip adding and just updating flow live table
445 + //deviceFlowTable.calAndSetFlowLiveType(stored);
446 + continue;
447 + }
448 +
449 + // add new flow entry, we suppose IMMEDIATE_FLOW
450 + TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
451 + FlowLiveType.IMMEDIATE_FLOW);
452 + deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
453 + }
454 + }
455 +
456 + /**
457 + * remove typed flow entry from the internal flow table.
458 + *
459 + * @param flowRules the flow entries
460 + *
461 + */
462 + public synchronized void removeFlows(FlowRule... flowRules) {
463 + for (FlowRule rule : flowRules) {
464 + deviceFlowTable.remove(rule);
465 + }
466 + }
467 +
468 + // same as removeFlows() function
469 + /**
470 + * remove typed flow entry from the internal flow table.
471 + *
472 + * @param flowRules the flow entries
473 + *
474 + */
475 + public void flowRemoved(FlowRule... flowRules) {
476 + removeFlows(flowRules);
477 + }
478 +
479 + // same as addOrUpdateFlows() function
480 + /**
481 + * add or update typed flow entry from flow entry into the internal flow table.
482 + *
483 + * @param flowEntries the flow entry list
484 + *
485 + */
486 + public void pushFlowMetrics(List<FlowEntry> flowEntries) {
487 + flowEntries.forEach(fe -> {
488 + addOrUpdateFlows(fe);
489 + });
490 + }
491 +
492 + /**
493 + * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).
494 + *
495 + */
496 + public long getFlowMissingXid() {
497 + return flowMissingXid;
498 + }
499 +
500 + /**
501 + * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.
502 + *
503 + * @param flowMissingXid the OFFlowStatsRequest message Id
504 + *
505 + */
506 + public void setFlowMissingXid(long flowMissingXid) {
507 + this.flowMissingXid = flowMissingXid;
508 + }
509 +
510 + private class InternalDeviceFlowTable {
511 +
512 + private final Map<FlowId, Set<TypedStoredFlowEntry>>
513 + flowEntries = Maps.newConcurrentMap();
514 +
515 + private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
516 + private final Set<StoredFlowEntry> midFlows = new HashSet<>();
517 + private final Set<StoredFlowEntry> longFlows = new HashSet<>();
518 +
519 + // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
520 + private final long latencyFlowStatsRequestAndReplyMillis = 500;
521 +
522 +
523 + // Statistics for table operation
524 + private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
525 + private long removeCount = 0;
526 +
527 + /**
528 + * Resets all count values with zero.
529 + *
530 + */
531 + public void resetAllCount() {
532 + addCount = 0;
533 + addWithSetFlowLiveTypeCount = 0;
534 + removeCount = 0;
535 + }
536 +
537 + // get set of flow entries for the given flowId
538 + private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
539 + return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
540 + }
541 +
542 + // get flow entry for the given flow rule
543 + private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
544 + Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
545 + return flowEntries.stream()
546 + .filter(entry -> Objects.equal(entry, rule))
547 + .findAny()
548 + .orElse(null);
549 + }
550 +
551 + // get the flow entries for all flows in flow table
552 + private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
553 + Set<TypedStoredFlowEntry> result = Sets.newHashSet();
554 +
555 + flowEntries.values().forEach(result::addAll);
556 + return result;
557 + }
558 +
559 + /**
560 + * Gets the number of flow entry in flow table.
561 + *
562 + * @return the number of flow entry.
563 + *
564 + */
565 + public long getFlowCount() {
566 + return flowEntries.values().stream().mapToLong(Set::size).sum();
567 + }
568 +
569 + /**
570 + * Gets the number of flow entry in flow table.
571 + *
572 + * @param rule the flow rule
573 + * @return the typed flow entry.
574 + *
575 + */
576 + public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
577 + checkNotNull(rule);
578 +
579 + return getFlowEntryInternal(rule);
580 + }
581 +
582 + /**
583 + * Gets the all typed flow entries in flow table.
584 + *
585 + * @return the set of typed flow entry.
586 + *
587 + */
588 + public Set<TypedStoredFlowEntry> getFlowEntries() {
589 + return getFlowEntriesInternal();
590 + }
591 +
592 + /**
593 + * Gets the short typed flow entries in flow table.
594 + *
595 + * @return the set of typed flow entry.
596 + *
597 + */
598 + public Set<StoredFlowEntry> getShortFlows() {
599 + return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);
600 + }
601 +
602 + /**
603 + * Gets the mid typed flow entries in flow table.
604 + *
605 + * @return the set of typed flow entry.
606 + *
607 + */
608 + public Set<StoredFlowEntry> getMidFlows() {
609 + return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);
610 + }
611 +
612 + /**
613 + * Gets the long typed flow entries in flow table.
614 + *
615 + * @return the set of typed flow entry.
616 + *
617 + */
618 + public Set<StoredFlowEntry> getLongFlows() {
619 + return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);
620 + }
621 +
622 + /**
623 + * Add typed flow entry into table only.
624 + *
625 + * @param rule the flow rule
626 + *
627 + */
628 + public synchronized void add(TypedStoredFlowEntry rule) {
629 + checkNotNull(rule);
630 +
631 + //rule have to be new DefaultTypedFlowEntry
632 + boolean result = getFlowEntriesInternal(rule.id()).add(rule);
633 +
634 + if (result) {
635 + addCount++;
636 + }
637 + }
638 +
639 + /**
640 + * Calculates and set the flow live type at the first time,
641 + * and then add it into a corresponding typed flow table.
642 + *
643 + * @param rule the flow rule
644 + *
645 + */
646 + public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
647 + checkNotNull(rule);
648 +
649 + calAndSetFlowLiveTypeInternal(rule);
650 + }
651 +
652 + /**
653 + * Add the typed flow entry into table, and calculates and set the flow live type,
654 + * and then add it into a corresponding typed flow table.
655 + *
656 + * @param rule the flow rule
657 + *
658 + */
659 + public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
660 + checkNotNull(rule);
661 +
662 + //rule have to be new DefaultTypedFlowEntry
663 + boolean result = getFlowEntriesInternal(rule.id()).add(rule);
664 + if (result) {
665 + calAndSetFlowLiveTypeInternal(rule);
666 + addWithSetFlowLiveTypeCount++;
667 + } else {
668 + log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())
669 + + " ADD Failed, cause it may already exists in table !!!,"
670 + + " AdaptiveStats collection thread for {}",
671 + sw.getStringId());
672 + }
673 + }
674 +
675 + // In real, calculates and set the flow live type at the first time,
676 + // and then add it into a corresponding typed flow table
677 + private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
678 + long life = rule.life();
679 + FlowLiveType prevFlowLiveType = rule.flowLiveType();
680 +
681 + if (life >= longPollInterval) {
682 + rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
683 + longFlows.add(rule);
684 + } else if (life >= midPollInterval) {
685 + rule.setFlowLiveType(FlowLiveType.MID_FLOW);
686 + midFlows.add(rule);
687 + } else if (life >= calAndPollInterval) {
688 + rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
689 + shortFlows.add(rule);
690 + } else if (life >= 0) {
691 + rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
692 + } else { // life < 0
693 + rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
694 + }
695 +
696 + if (rule.flowLiveType() != prevFlowLiveType) {
697 + switch (prevFlowLiveType) {
698 + // delete it from previous flow table
699 + case SHORT_FLOW:
700 + shortFlows.remove(rule);
701 + break;
702 + case MID_FLOW:
703 + midFlows.remove(rule);
704 + break;
705 + case LONG_FLOW:
706 + longFlows.remove(rule);
707 + break;
708 + default:
709 + break;
710 + }
711 + }
712 + }
713 +
714 +
715 + // check the flow live type based on current time, then set and add it into corresponding table
716 + private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
717 + long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
718 + // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
719 + long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
720 +
721 + // fe.life() unit is SECOND!
722 + long liveTime = fe.life() + fromLastSeen;
723 +
724 + // check flow timeout
725 + if (fe.timeout() > calAndPollInterval && fromLastSeen > fe.timeout()) {
726 + if (!fe.isPermanent()) {
727 + log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
728 + + ", liveType=" + fe.flowLiveType()
729 + + ", liveTime=" + liveTime
730 + + ", life=" + fe.life()
731 + + ", fromLastSeen=" + fromLastSeen
732 + + ", timeout=" + fe.timeout()
733 + + ", isPermanent=" + fe.isPermanent()
734 + + " AdaptiveStats collection thread for {}",
735 + sw.getStringId());
736 + return false;
737 + }
738 + }
739 +
740 + switch (fe.flowLiveType()) {
741 + case IMMEDIATE_FLOW:
742 + if (liveTime >= longPollInterval) {
743 + fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
744 + longFlows.add(fe);
745 + } else if (liveTime >= midPollInterval) {
746 + fe.setFlowLiveType(FlowLiveType.MID_FLOW);
747 + midFlows.add(fe);
748 + } else if (liveTime >= calAndPollInterval) {
749 + fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
750 + shortFlows.add(fe);
751 + }
752 + break;
753 + case SHORT_FLOW:
754 + if (liveTime >= longPollInterval) {
755 + fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
756 + shortFlows.remove(fe);
757 + longFlows.add(fe);
758 + } else if (liveTime >= midPollInterval) {
759 + fe.setFlowLiveType(FlowLiveType.MID_FLOW);
760 + shortFlows.remove(fe);
761 + midFlows.add(fe);
762 + }
763 + break;
764 + case MID_FLOW:
765 + if (liveTime >= longPollInterval) {
766 + fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
767 + midFlows.remove(fe);
768 + longFlows.add(fe);
769 + }
770 + break;
771 + case LONG_FLOW:
772 + if (fromLastSeen > entirePollInterval) {
773 + log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");
774 + return false;
775 + }
776 + break;
777 + case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
778 + default :
779 + // Error Unknown Live Type
780 + log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"
781 + + "AdaptiveStats collection thread for {}",
782 + sw.getStringId());
783 + return false;
784 + }
785 +
786 + log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
787 + + ", state=" + fe.state()
788 + + ", After liveType=" + fe.flowLiveType()
789 + + ", liveTime=" + liveTime
790 + + ", life=" + fe.life()
791 + + ", bytes=" + fe.bytes()
792 + + ", packets=" + fe.packets()
793 + + ", fromLastSeen=" + fromLastSeen
794 + + ", priority=" + fe.priority()
795 + + ", selector=" + fe.selector().criteria()
796 + + ", treatment=" + fe.treatment()
797 + + " AdaptiveStats collection thread for {}",
798 + sw.getStringId());
799 +
800 + return true;
801 + }
802 +
803 + /**
804 + * Check and move live type for all type flow entries in table at every calAndPollInterval time.
805 + *
806 + */
807 + public void checkAndMoveLiveFlowAll() {
808 + Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();
809 +
810 + long calCurTime = System.currentTimeMillis();
811 + typedFlowEntries.forEach(fe -> {
812 + if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
813 + remove(fe);
814 + }
815 + });
816 +
817 + // print table counts for debug
818 + if (log.isDebugEnabled()) {
819 + synchronized (this) {
820 + long totalFlowCount = getFlowCount();
821 + long shortFlowCount = shortFlows.size();
822 + long midFlowCount = midFlows.size();
823 + long longFlowCount = longFlows.size();
824 + long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
825 + long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;
826 +
827 + log.debug("--------------------------------------------------------------------------- for {}",
828 + sw.getStringId());
829 + log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount
830 + + ", add - remove_Count=" + calTotalCount
831 + + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount
832 + + ", SHORT_FLOW_Count=" + shortFlowCount
833 + + ", MID_FLOW_Count=" + midFlowCount
834 + + ", LONG_FLOW_Count=" + longFlowCount
835 + + ", add_Count=" + addCount
836 + + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount
837 + + ", remove_Count=" + removeCount
838 + + " AdaptiveStats collection thread for {}", sw.getStringId());
839 + log.debug("--------------------------------------------------------------------------- for {}",
840 + sw.getStringId());
841 + if (totalFlowCount != calTotalCount) {
842 + log.error("checkAndMoveLiveFlowAll, Real total flow count and "
843 + + "calculated total flow count do NOT match, something is wrong internally "
844 + + "or check counter value bound is over!");
845 + }
846 + if (immediateFlowCount < 0) {
847 + log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "
848 + + "something is wrong internally "
849 + + "or check counter value bound is over!");
850 + }
851 + }
852 + }
853 + log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());
854 + }
855 +
856 + /**
857 + * Remove the typed flow entry from table.
858 + *
859 + * @param rule the flow rule
860 + *
861 + */
862 + public synchronized void remove(FlowRule rule) {
863 + checkNotNull(rule);
864 +
865 + TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
866 + if (removeStore != null) {
867 + removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);
868 + boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);
869 +
870 + if (result) {
871 + removeCount++;
872 + }
873 + }
874 + }
875 +
876 + // Remove the typed flow entry from corresponding table
877 + private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
878 + switch (fe.flowLiveType()) {
879 + case IMMEDIATE_FLOW:
880 + // do nothing
881 + break;
882 + case SHORT_FLOW:
883 + shortFlows.remove(fe);
884 + break;
885 + case MID_FLOW:
886 + midFlows.remove(fe);
887 + break;
888 + case LONG_FLOW:
889 + longFlows.remove(fe);
890 + break;
891 + default: // error in Flow Live Type
892 + log.error("removeLiveFlowsInternal, Unknown Live Type error!");
893 + break;
894 + }
895 + }
896 + }
897 +}
1 /* 1 /*
2 - * Copyright 2014 Open Networking Laboratory 2 + * Copyright 2015 Open Networking Laboratory
3 * 3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
...@@ -76,6 +76,7 @@ import java.util.Timer; ...@@ -76,6 +76,7 @@ import java.util.Timer;
76 import java.util.concurrent.TimeUnit; 76 import java.util.concurrent.TimeUnit;
77 import java.util.stream.Collectors; 77 import java.util.stream.Collectors;
78 78
79 +import static com.google.common.base.Preconditions.checkNotNull;
79 import static com.google.common.base.Strings.isNullOrEmpty; 80 import static com.google.common.base.Strings.isNullOrEmpty;
80 import static org.onlab.util.Tools.get; 81 import static org.onlab.util.Tools.get;
81 import static org.slf4j.LoggerFactory.getLogger; 82 import static org.slf4j.LoggerFactory.getLogger;
...@@ -99,11 +100,16 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -99,11 +100,16 @@ public class OpenFlowRuleProvider extends AbstractProvider
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected ComponentConfigService cfgService; 101 protected ComponentConfigService cfgService;
101 102
102 - private static final int DEFAULT_POLL_FREQUENCY = 10; 103 + private static final int DEFAULT_POLL_FREQUENCY = 5;
103 @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY, 104 @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
104 label = "Frequency (in seconds) for polling flow statistics") 105 label = "Frequency (in seconds) for polling flow statistics")
105 private int flowPollFrequency = DEFAULT_POLL_FREQUENCY; 106 private int flowPollFrequency = DEFAULT_POLL_FREQUENCY;
106 107
108 + private static final boolean DEFAULT_ADAPTIVE_FLOW_SAMPLING = true;
109 + @Property(name = "adaptiveFlowSampling", boolValue = DEFAULT_ADAPTIVE_FLOW_SAMPLING,
110 + label = "Adaptive Flow Sampling is on or off")
111 + private boolean adaptiveFlowSampling = DEFAULT_ADAPTIVE_FLOW_SAMPLING;
112 +
107 private FlowRuleProviderService providerService; 113 private FlowRuleProviderService providerService;
108 114
109 private final InternalFlowProvider listener = new InternalFlowProvider(); 115 private final InternalFlowProvider listener = new InternalFlowProvider();
...@@ -111,7 +117,10 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -111,7 +117,10 @@ public class OpenFlowRuleProvider extends AbstractProvider
111 private Cache<Long, InternalCacheEntry> pendingBatches; 117 private Cache<Long, InternalCacheEntry> pendingBatches;
112 118
113 private final Timer timer = new Timer("onos-openflow-collector"); 119 private final Timer timer = new Timer("onos-openflow-collector");
114 - private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap(); 120 + private final Map<Dpid, FlowStatsCollector> simpleCollectors = Maps.newHashMap();
121 +
122 + // NewAdaptiveFlowStatsCollector Set
123 + private final Map<Dpid, NewAdaptiveFlowStatsCollector> afsCollectors = Maps.newHashMap();
115 124
116 /** 125 /**
117 * Creates an OpenFlow host provider. 126 * Creates an OpenFlow host provider.
...@@ -128,9 +137,11 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -128,9 +137,11 @@ public class OpenFlowRuleProvider extends AbstractProvider
128 controller.addEventListener(listener); 137 controller.addEventListener(listener);
129 138
130 pendingBatches = createBatchCache(); 139 pendingBatches = createBatchCache();
140 +
131 createCollectors(); 141 createCollectors();
132 142
133 - log.info("Started"); 143 + log.info("Started with flowPollFrequency = {}, adaptiveFlowSampling = {}",
144 + flowPollFrequency, adaptiveFlowSampling);
134 } 145 }
135 146
136 @Deactivate 147 @Deactivate
...@@ -161,6 +172,20 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -161,6 +172,20 @@ public class OpenFlowRuleProvider extends AbstractProvider
161 } 172 }
162 173
163 log.info("Settings: flowPollFrequency={}", flowPollFrequency); 174 log.info("Settings: flowPollFrequency={}", flowPollFrequency);
175 +
176 + boolean newAdaptiveFlowSampling;
177 + String s = get(properties, "adaptiveFlowSampling");
178 + newAdaptiveFlowSampling = isNullOrEmpty(s) ? adaptiveFlowSampling : Boolean.parseBoolean(s.trim());
179 +
180 + if (newAdaptiveFlowSampling != adaptiveFlowSampling) {
181 + // stop previous collector
182 + stopCollectors();
183 + adaptiveFlowSampling = newAdaptiveFlowSampling;
184 + // create new collectors
185 + createCollectors();
186 + }
187 +
188 + log.info("Settings: adaptiveFlowSampling={}", adaptiveFlowSampling);
164 } 189 }
165 190
166 private Cache<Long, InternalCacheEntry> createBatchCache() { 191 private Cache<Long, InternalCacheEntry> createBatchCache() {
...@@ -179,19 +204,38 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -179,19 +204,38 @@ public class OpenFlowRuleProvider extends AbstractProvider
179 } 204 }
180 205
181 private void createCollector(OpenFlowSwitch sw) { 206 private void createCollector(OpenFlowSwitch sw) {
207 + if (adaptiveFlowSampling) {
208 + // NewAdaptiveFlowStatsCollector Constructor
209 + NewAdaptiveFlowStatsCollector fsc = new NewAdaptiveFlowStatsCollector(sw, flowPollFrequency);
210 + fsc.start();
211 + afsCollectors.put(new Dpid(sw.getId()), fsc);
212 + } else {
182 FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency); 213 FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
183 fsc.start(); 214 fsc.start();
184 - collectors.put(new Dpid(sw.getId()), fsc); 215 + simpleCollectors.put(new Dpid(sw.getId()), fsc);
216 + }
185 } 217 }
186 218
187 private void stopCollectors() { 219 private void stopCollectors() {
188 - collectors.values().forEach(FlowStatsCollector::stop); 220 + if (adaptiveFlowSampling) {
189 - collectors.clear(); 221 + // NewAdaptiveFlowStatsCollector Destructor
222 + afsCollectors.values().forEach(NewAdaptiveFlowStatsCollector::stop);
223 + afsCollectors.clear();
224 + } else {
225 + simpleCollectors.values().forEach(FlowStatsCollector::stop);
226 + simpleCollectors.clear();
227 + }
190 } 228 }
191 229
192 private void adjustRate() { 230 private void adjustRate() {
193 DefaultLoad.setPollInterval(flowPollFrequency); 231 DefaultLoad.setPollInterval(flowPollFrequency);
194 - collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency)); 232 +
233 + if (adaptiveFlowSampling) {
234 + // NewAdaptiveFlowStatsCollector calAndPollInterval
235 + afsCollectors.values().forEach(fsc -> fsc.adjustCalAndPollInterval(flowPollFrequency));
236 + } else {
237 + simpleCollectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
238 + }
195 } 239 }
196 240
197 @Override 241 @Override
...@@ -202,8 +246,9 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -202,8 +246,9 @@ public class OpenFlowRuleProvider extends AbstractProvider
202 } 246 }
203 247
204 private void applyRule(FlowRule flowRule) { 248 private void applyRule(FlowRule flowRule) {
205 - OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId() 249 + Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
206 - .uri())); 250 + OpenFlowSwitch sw = controller.getSwitch(dpid);
251 +
207 FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad(); 252 FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
208 if (hasPayload(flowRuleExtPayLoad)) { 253 if (hasPayload(flowRuleExtPayLoad)) {
209 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad()); 254 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
...@@ -212,6 +257,11 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -212,6 +257,11 @@ public class OpenFlowRuleProvider extends AbstractProvider
212 } 257 }
213 sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(), 258 sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
214 Optional.empty()).buildFlowAdd()); 259 Optional.empty()).buildFlowAdd());
260 +
261 + if (adaptiveFlowSampling) {
262 + // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
263 + afsCollectors.get(dpid).addWithFlowRule(flowRule);
264 + }
215 } 265 }
216 266
217 @Override 267 @Override
...@@ -222,8 +272,9 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -222,8 +272,9 @@ public class OpenFlowRuleProvider extends AbstractProvider
222 } 272 }
223 273
224 private void removeRule(FlowRule flowRule) { 274 private void removeRule(FlowRule flowRule) {
225 - OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId() 275 + Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
226 - .uri())); 276 + OpenFlowSwitch sw = controller.getSwitch(dpid);
277 +
227 FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad(); 278 FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
228 if (hasPayload(flowRuleExtPayLoad)) { 279 if (hasPayload(flowRuleExtPayLoad)) {
229 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad()); 280 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
...@@ -232,6 +283,11 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -232,6 +283,11 @@ public class OpenFlowRuleProvider extends AbstractProvider
232 } 283 }
233 sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(), 284 sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
234 Optional.empty()).buildFlowDel()); 285 Optional.empty()).buildFlowDel());
286 +
287 + if (adaptiveFlowSampling) {
288 + // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
289 + afsCollectors.get(dpid).removeFlows(flowRule);
290 + }
235 } 291 }
236 292
237 @Override 293 @Override
...@@ -242,11 +298,12 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -242,11 +298,12 @@ public class OpenFlowRuleProvider extends AbstractProvider
242 298
243 @Override 299 @Override
244 public void executeBatch(FlowRuleBatchOperation batch) { 300 public void executeBatch(FlowRuleBatchOperation batch) {
301 + checkNotNull(batch);
245 302
246 pendingBatches.put(batch.id(), new InternalCacheEntry(batch)); 303 pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
247 304
248 - OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId() 305 + Dpid dpid = Dpid.dpid(batch.deviceId().uri());
249 - .uri())); 306 + OpenFlowSwitch sw = controller.getSwitch(dpid);
250 OFFlowMod mod; 307 OFFlowMod mod;
251 for (FlowRuleBatchEntry fbe : batch.getOperations()) { 308 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
252 // flow is the third party privacy flow 309 // flow is the third party privacy flow
...@@ -262,12 +319,28 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -262,12 +319,28 @@ public class OpenFlowRuleProvider extends AbstractProvider
262 switch (fbe.operator()) { 319 switch (fbe.operator()) {
263 case ADD: 320 case ADD:
264 mod = builder.buildFlowAdd(); 321 mod = builder.buildFlowAdd();
322 +
323 + if (adaptiveFlowSampling) {
324 + // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
325 + afsCollectors.get(dpid).addWithFlowRule(fbe.target());
326 + }
265 break; 327 break;
266 case REMOVE: 328 case REMOVE:
267 mod = builder.buildFlowDel(); 329 mod = builder.buildFlowDel();
330 +
331 + if (adaptiveFlowSampling) {
332 + // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
333 + afsCollectors.get(dpid).removeFlows(fbe.target());
334 + }
268 break; 335 break;
269 case MODIFY: 336 case MODIFY:
270 mod = builder.buildFlowMod(); 337 mod = builder.buildFlowMod();
338 +
339 + if (adaptiveFlowSampling) {
340 + // Add or Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
341 + // afsCollectors.get(dpid).addWithFlowRule(fbe.target()); //check if add is good or not
342 + afsCollectors.get(dpid).addOrUpdateFlows((FlowEntry) fbe.target());
343 + }
271 break; 344 break;
272 default: 345 default:
273 log.error("Unsupported batch operation {}; skipping flowmod {}", 346 log.error("Unsupported batch operation {}; skipping flowmod {}",
...@@ -292,16 +365,26 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -292,16 +365,26 @@ public class OpenFlowRuleProvider extends AbstractProvider
292 365
293 @Override 366 @Override
294 public void switchAdded(Dpid dpid) { 367 public void switchAdded(Dpid dpid) {
368 +
369 + OpenFlowSwitch sw = controller.getSwitch(dpid);
370 +
295 createCollector(controller.getSwitch(dpid)); 371 createCollector(controller.getSwitch(dpid));
296 } 372 }
297 373
298 @Override 374 @Override
299 public void switchRemoved(Dpid dpid) { 375 public void switchRemoved(Dpid dpid) {
300 - FlowStatsCollector collector = collectors.remove(dpid); 376 + if (adaptiveFlowSampling) {
377 + NewAdaptiveFlowStatsCollector collector = afsCollectors.remove(dpid);
378 + if (collector != null) {
379 + collector.stop();
380 + }
381 + } else {
382 + FlowStatsCollector collector = simpleCollectors.remove(dpid);
301 if (collector != null) { 383 if (collector != null) {
302 collector.stop(); 384 collector.stop();
303 } 385 }
304 } 386 }
387 + }
305 388
306 @Override 389 @Override
307 public void switchChanged(Dpid dpid) { 390 public void switchChanged(Dpid dpid) {
...@@ -321,6 +404,11 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -321,6 +404,11 @@ public class OpenFlowRuleProvider extends AbstractProvider
321 404
322 FlowEntry fr = new FlowEntryBuilder(dpid, removed).build(); 405 FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
323 providerService.flowRemoved(fr); 406 providerService.flowRemoved(fr);
407 +
408 + if (adaptiveFlowSampling) {
409 + // Removed TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
410 + afsCollectors.get(dpid).flowRemoved(fr);
411 + }
324 break; 412 break;
325 case STATS_REPLY: 413 case STATS_REPLY:
326 if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) { 414 if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
...@@ -370,11 +458,10 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -370,11 +458,10 @@ public class OpenFlowRuleProvider extends AbstractProvider
370 + " tell us which one."); 458 + " tell us which one.");
371 } 459 }
372 } 460 }
373 - break; 461 +
374 default: 462 default:
375 log.debug("Unhandled message type: {}", msg.getType()); 463 log.debug("Unhandled message type: {}", msg.getType());
376 } 464 }
377 -
378 } 465 }
379 466
380 @Override 467 @Override
...@@ -392,9 +479,40 @@ public class OpenFlowRuleProvider extends AbstractProvider ...@@ -392,9 +479,40 @@ public class OpenFlowRuleProvider extends AbstractProvider
392 .map(entry -> new FlowEntryBuilder(dpid, entry).build()) 479 .map(entry -> new FlowEntryBuilder(dpid, entry).build())
393 .collect(Collectors.toList()); 480 .collect(Collectors.toList());
394 481
482 + if (adaptiveFlowSampling) {
483 + NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
484 +
485 + synchronized (afsc) {
486 + if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
487 + log.debug("OpenFlowRuleProvider:pushFlowMetrics, flowMissingXid={}, "
488 + + "OFFlowStatsReply Xid={}, for {}",
489 + afsc.getFlowMissingXid(), replies.getXid(), dpid);
490 + }
491 +
492 + // Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
493 + if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
494 + if (afsc.getFlowMissingXid() == replies.getXid()) {
495 + // call entire flow stats update with flowMissing synchronization.
496 + // used existing pushFlowMetrics
497 + providerService.pushFlowMetrics(did, flowEntries);
498 + }
499 + // reset flowMissingXid to NO_FLOW_MISSING_XID
500 + afsc.setFlowMissingXid(NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID);
501 +
502 + } else {
503 + // call individual flow stats update
504 + providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
505 + }
506 +
507 + // Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
508 + afsc.pushFlowMetrics(flowEntries);
509 + }
510 + } else {
511 + // call existing entire flow stats update with flowMissing synchronization
395 providerService.pushFlowMetrics(did, flowEntries); 512 providerService.pushFlowMetrics(did, flowEntries);
396 } 513 }
397 } 514 }
515 + }
398 516
399 /** 517 /**
400 * The internal cache entry holding the original request as well as 518 * The internal cache entry holding the original request as well as
......