alshabib
Committed by Gerrit Code Review

aggregate flow replies on io thread

Change-Id: I622290f213ee830cfab7e4bd4ad7a52f612b475e
...@@ -355,7 +355,7 @@ public class FlowRuleManager ...@@ -355,7 +355,7 @@ public class FlowRuleManager
355 355
356 @Override 356 @Override
357 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) { 357 public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
358 - List<FlowEntry> storedRules = Lists.newLinkedList(store.getFlowEntries(deviceId)); 358 + Set<FlowEntry> storedRules = Sets.newHashSet(store.getFlowEntries(deviceId));
359 359
360 for (FlowEntry rule : flowEntries) { 360 for (FlowEntry rule : flowEntries) {
361 if (storedRules.remove(rule)) { 361 if (storedRules.remove(rule)) {
......
...@@ -17,6 +17,7 @@ package org.onosproject.openflow.controller.impl; ...@@ -17,6 +17,7 @@ package org.onosproject.openflow.controller.impl;
17 17
18 import static org.onlab.util.Tools.namedThreads; 18 import static org.onlab.util.Tools.namedThreads;
19 19
20 +import java.util.Collection;
20 import java.util.HashSet; 21 import java.util.HashSet;
21 import java.util.Set; 22 import java.util.Set;
22 import java.util.concurrent.ConcurrentHashMap; 23 import java.util.concurrent.ConcurrentHashMap;
...@@ -25,6 +26,7 @@ import java.util.concurrent.Executors; ...@@ -25,6 +26,7 @@ import java.util.concurrent.Executors;
25 import java.util.concurrent.locks.Lock; 26 import java.util.concurrent.locks.Lock;
26 import java.util.concurrent.locks.ReentrantLock; 27 import java.util.concurrent.locks.ReentrantLock;
27 28
29 +import com.google.common.collect.Lists;
28 import org.apache.felix.scr.annotations.Activate; 30 import org.apache.felix.scr.annotations.Activate;
29 import org.apache.felix.scr.annotations.Component; 31 import org.apache.felix.scr.annotations.Component;
30 import org.apache.felix.scr.annotations.Deactivate; 32 import org.apache.felix.scr.annotations.Deactivate;
...@@ -41,11 +43,15 @@ import org.onosproject.openflow.controller.RoleState; ...@@ -41,11 +43,15 @@ import org.onosproject.openflow.controller.RoleState;
41 import org.onosproject.openflow.controller.driver.OpenFlowAgent; 43 import org.onosproject.openflow.controller.driver.OpenFlowAgent;
42 import org.projectfloodlight.openflow.protocol.OFCircuitPortStatus; 44 import org.projectfloodlight.openflow.protocol.OFCircuitPortStatus;
43 import org.projectfloodlight.openflow.protocol.OFExperimenter; 45 import org.projectfloodlight.openflow.protocol.OFExperimenter;
46 +import org.projectfloodlight.openflow.protocol.OFFactories;
47 +import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
48 +import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
44 import org.projectfloodlight.openflow.protocol.OFMessage; 49 import org.projectfloodlight.openflow.protocol.OFMessage;
45 import org.projectfloodlight.openflow.protocol.OFPacketIn; 50 import org.projectfloodlight.openflow.protocol.OFPacketIn;
46 import org.projectfloodlight.openflow.protocol.OFPortDesc; 51 import org.projectfloodlight.openflow.protocol.OFPortDesc;
47 import org.projectfloodlight.openflow.protocol.OFPortStatus; 52 import org.projectfloodlight.openflow.protocol.OFPortStatus;
48 import org.projectfloodlight.openflow.protocol.OFStatsReply; 53 import org.projectfloodlight.openflow.protocol.OFStatsReply;
54 +import org.projectfloodlight.openflow.protocol.OFStatsReplyFlags;
49 import org.projectfloodlight.openflow.protocol.OFStatsType; 55 import org.projectfloodlight.openflow.protocol.OFStatsType;
50 import org.slf4j.Logger; 56 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory; 57 import org.slf4j.LoggerFactory;
...@@ -79,6 +85,9 @@ public class OpenFlowControllerImpl implements OpenFlowController { ...@@ -79,6 +85,9 @@ public class OpenFlowControllerImpl implements OpenFlowController {
79 85
80 protected Set<OpenFlowEventListener> ofEventListener = Sets.newHashSet(); 86 protected Set<OpenFlowEventListener> ofEventListener = Sets.newHashSet();
81 87
88 + protected Multimap<Dpid, OFFlowStatsEntry> fullStats =
89 + ArrayListMultimap.create();
90 +
82 private final Controller ctrl = new Controller(); 91 private final Controller ctrl = new Controller();
83 92
84 @Activate 93 @Activate
...@@ -160,6 +169,7 @@ public class OpenFlowControllerImpl implements OpenFlowController { ...@@ -160,6 +169,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
160 169
161 @Override 170 @Override
162 public void processPacket(Dpid dpid, OFMessage msg) { 171 public void processPacket(Dpid dpid, OFMessage msg) {
172 + Collection<OFFlowStatsEntry> stats;
163 switch (msg.getType()) { 173 switch (msg.getType()) {
164 case PORT_STATUS: 174 case PORT_STATUS:
165 for (OpenFlowSwitchListener l : ofSwitchListener) { 175 for (OpenFlowSwitchListener l : ofSwitchListener) {
...@@ -186,7 +196,15 @@ public class OpenFlowControllerImpl implements OpenFlowController { ...@@ -186,7 +196,15 @@ public class OpenFlowControllerImpl implements OpenFlowController {
186 l.switchChanged(dpid); 196 l.switchChanged(dpid);
187 } 197 }
188 } 198 }
189 - // fall through to invoke handler 199 + stats = publishStats(dpid, reply);
200 + if (stats != null) {
201 + OFFlowStatsReply.Builder rep =
202 + OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
203 + rep.setEntries(Lists.newLinkedList(stats));
204 + executor.submit(new OFMessageHandler(dpid, rep.build()));
205 + }
206 + break;
207 +
190 case FLOW_REMOVED: 208 case FLOW_REMOVED:
191 case ERROR: 209 case ERROR:
192 case BARRIER_REPLY: 210 case BARRIER_REPLY:
...@@ -218,6 +236,20 @@ public class OpenFlowControllerImpl implements OpenFlowController { ...@@ -218,6 +236,20 @@ public class OpenFlowControllerImpl implements OpenFlowController {
218 } 236 }
219 } 237 }
220 238
239 + private synchronized Collection<OFFlowStatsEntry> publishStats(Dpid dpid,
240 + OFStatsReply reply) {
241 + //TODO: Get rid of synchronized
242 + if (reply.getStatsType() != OFStatsType.FLOW) {
243 + return null;
244 + }
245 + final OFFlowStatsReply replies = (OFFlowStatsReply) reply;
246 + fullStats.putAll(dpid, replies.getEntries());
247 + if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
248 + return fullStats.removeAll(dpid);
249 + }
250 + return null;
251 + }
252 +
221 @Override 253 @Override
222 public void setRole(Dpid dpid, RoleState role) { 254 public void setRole(Dpid dpid, RoleState role) {
223 final OpenFlowSwitch sw = getSwitch(dpid); 255 final OpenFlowSwitch sw = getSwitch(dpid);
......
...@@ -58,8 +58,6 @@ import org.projectfloodlight.openflow.protocol.OFInstructionType; ...@@ -58,8 +58,6 @@ import org.projectfloodlight.openflow.protocol.OFInstructionType;
58 import org.projectfloodlight.openflow.protocol.OFMessage; 58 import org.projectfloodlight.openflow.protocol.OFMessage;
59 import org.projectfloodlight.openflow.protocol.OFPortStatus; 59 import org.projectfloodlight.openflow.protocol.OFPortStatus;
60 import org.projectfloodlight.openflow.protocol.OFStatsReply; 60 import org.projectfloodlight.openflow.protocol.OFStatsReply;
61 -import org.projectfloodlight.openflow.protocol.OFStatsReplyFlags;
62 -import org.projectfloodlight.openflow.protocol.OFStatsType;
63 import org.projectfloodlight.openflow.protocol.OFVersion; 61 import org.projectfloodlight.openflow.protocol.OFVersion;
64 import org.projectfloodlight.openflow.protocol.action.OFAction; 62 import org.projectfloodlight.openflow.protocol.action.OFAction;
65 import org.projectfloodlight.openflow.protocol.action.OFActionOutput; 63 import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
...@@ -332,24 +330,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -332,24 +330,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
332 } 330 }
333 331
334 private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) { 332 private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
335 - if (stats.getStatsType() != OFStatsType.FLOW) { 333 +
336 - return;
337 - }
338 DeviceId did = DeviceId.deviceId(Dpid.uri(dpid)); 334 DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
339 final OFFlowStatsReply replies = (OFFlowStatsReply) stats; 335 final OFFlowStatsReply replies = (OFFlowStatsReply) stats;
340 - //final List<FlowRule> entries = Lists.newLinkedList();
341 336
342 - for (OFFlowStatsEntry reply : replies.getEntries()) { 337 + List<FlowEntry> flowEntries = replies.getEntries().stream()
343 - if (!tableMissRule(dpid, reply)) { 338 + .filter(entry -> !tableMissRule(dpid, entry))
344 - completeEntries.put(did, new FlowEntryBuilder(dpid, reply).build()); 339 + .map(entry -> new FlowEntryBuilder(dpid, entry).build())
345 - } 340 + .collect(Collectors.toList());
346 - } 341 +
342 + providerService.pushFlowMetrics(did, flowEntries);
347 343
348 - if (!stats.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
349 - log.trace("sending flowstats to core {}", completeEntries.get(did));
350 - providerService.pushFlowMetrics(did, completeEntries.get(did));
351 - completeEntries.removeAll(did);
352 - }
353 } 344 }
354 345
355 private boolean tableMissRule(Dpid dpid, OFFlowStatsEntry reply) { 346 private boolean tableMissRule(Dpid dpid, OFFlowStatsEntry reply) {
......