Committed by
Gerrit Code Review
OpenFlowRuleProvider is now configurable with respect to flowPollFrequency.
Change-Id: I3a559a9cd65df1ae56d80017696452788fc08d91
Showing
5 changed files
with
244 additions
and
181 deletions
| ... | @@ -23,8 +23,6 @@ import org.onosproject.net.provider.Provider; | ... | @@ -23,8 +23,6 @@ import org.onosproject.net.provider.Provider; |
| 23 | */ | 23 | */ |
| 24 | public interface FlowRuleProvider extends Provider { | 24 | public interface FlowRuleProvider extends Provider { |
| 25 | 25 | ||
| 26 | - static final int POLL_INTERVAL = 10; | ||
| 27 | - | ||
| 28 | /** | 26 | /** |
| 29 | * Instructs the provider to apply the specified flow rules to their | 27 | * Instructs the provider to apply the specified flow rules to their |
| 30 | * respective devices. | 28 | * respective devices. | ... | ... |
| ... | @@ -16,7 +16,6 @@ | ... | @@ -16,7 +16,6 @@ |
| 16 | package org.onosproject.net.statistic; | 16 | package org.onosproject.net.statistic; |
| 17 | 17 | ||
| 18 | import com.google.common.base.MoreObjects; | 18 | import com.google.common.base.MoreObjects; |
| 19 | -import org.onosproject.net.flow.FlowRuleProvider; | ||
| 20 | 19 | ||
| 21 | /** | 20 | /** |
| 22 | * Implementation of a load. | 21 | * Implementation of a load. |
| ... | @@ -29,6 +28,11 @@ public class DefaultLoad implements Load { | ... | @@ -29,6 +28,11 @@ public class DefaultLoad implements Load { |
| 29 | private final long time; | 28 | private final long time; |
| 30 | 29 | ||
| 31 | /** | 30 | /** |
| 31 | + * Indicates the flow statistics poll interval in seconds. | ||
| 32 | + */ | ||
| 33 | + private static int pollInterval = 10; | ||
| 34 | + | ||
| 35 | + /** | ||
| 32 | * Creates an invalid load. | 36 | * Creates an invalid load. |
| 33 | */ | 37 | */ |
| 34 | public DefaultLoad() { | 38 | public DefaultLoad() { |
| ... | @@ -50,9 +54,19 @@ public class DefaultLoad implements Load { | ... | @@ -50,9 +54,19 @@ public class DefaultLoad implements Load { |
| 50 | this.isValid = true; | 54 | this.isValid = true; |
| 51 | } | 55 | } |
| 52 | 56 | ||
| 57 | + /** | ||
| 58 | + * Sets the poll interval in seconds. Used solely for the purpose of | ||
| 59 | + * computing the load. | ||
| 60 | + * | ||
| 61 | + * @param newPollInterval poll interval duration in seconds | ||
| 62 | + */ | ||
| 63 | + public static void setPollInterval(int newPollInterval) { | ||
| 64 | + pollInterval = newPollInterval; | ||
| 65 | + } | ||
| 66 | + | ||
| 53 | @Override | 67 | @Override |
| 54 | public long rate() { | 68 | public long rate() { |
| 55 | - return (current - previous) / FlowRuleProvider.POLL_INTERVAL; | 69 | + return (current - previous) / pollInterval; |
| 56 | } | 70 | } |
| 57 | 71 | ||
| 58 | @Override | 72 | @Override | ... | ... |
| ... | @@ -31,4 +31,10 @@ | ... | @@ -31,4 +31,10 @@ |
| 31 | 31 | ||
| 32 | <description>ONOS OpenFlow protocol flow provider</description> | 32 | <description>ONOS OpenFlow protocol flow provider</description> |
| 33 | 33 | ||
| 34 | + <dependencies> | ||
| 35 | + <dependency> | ||
| 36 | + <groupId>org.osgi</groupId> | ||
| 37 | + <artifactId>org.osgi.compendium</artifactId> | ||
| 38 | + </dependency> | ||
| 39 | + </dependencies> | ||
| 34 | </project> | 40 | </project> | ... | ... |
| ... | @@ -15,87 +15,86 @@ | ... | @@ -15,87 +15,86 @@ |
| 15 | */ | 15 | */ |
| 16 | package org.onosproject.provider.of.flow.impl; | 16 | package org.onosproject.provider.of.flow.impl; |
| 17 | 17 | ||
| 18 | -import static org.slf4j.LoggerFactory.getLogger; | 18 | +import org.onlab.util.SharedExecutors; |
| 19 | - | ||
| 20 | -import java.util.concurrent.TimeUnit; | ||
| 21 | - | ||
| 22 | -import org.jboss.netty.util.HashedWheelTimer; | ||
| 23 | -import org.jboss.netty.util.Timeout; | ||
| 24 | -import org.jboss.netty.util.TimerTask; | ||
| 25 | import org.onosproject.openflow.controller.OpenFlowSwitch; | 19 | import org.onosproject.openflow.controller.OpenFlowSwitch; |
| 26 | import org.onosproject.openflow.controller.RoleState; | 20 | import org.onosproject.openflow.controller.RoleState; |
| 27 | -import org.onlab.util.Timer; | ||
| 28 | import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest; | 21 | import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest; |
| 29 | import org.projectfloodlight.openflow.types.OFPort; | 22 | import org.projectfloodlight.openflow.types.OFPort; |
| 30 | import org.projectfloodlight.openflow.types.TableId; | 23 | import org.projectfloodlight.openflow.types.TableId; |
| 31 | import org.slf4j.Logger; | 24 | import org.slf4j.Logger; |
| 32 | 25 | ||
| 33 | -public class FlowStatsCollector implements TimerTask { | 26 | +import java.util.Timer; |
| 27 | +import java.util.TimerTask; | ||
| 34 | 28 | ||
| 35 | - private final Logger log = getLogger(getClass()); | 29 | +import static org.slf4j.LoggerFactory.getLogger; |
| 36 | 30 | ||
| 37 | - private final HashedWheelTimer timer = Timer.getTimer(); | 31 | +/** |
| 38 | - private final OpenFlowSwitch sw; | 32 | + * Collects flow statistics for the specified switch. |
| 39 | - private final int refreshInterval; | 33 | + */ |
| 34 | +class FlowStatsCollector { | ||
| 40 | 35 | ||
| 41 | - private Timeout timeout; | 36 | + private final Logger log = getLogger(getClass()); |
| 42 | 37 | ||
| 43 | - private boolean stopTimer = false;; | 38 | + public static final int SECONDS = 1000; |
| 44 | 39 | ||
| 45 | - public FlowStatsCollector(OpenFlowSwitch sw, int refreshInterval) { | 40 | + private final OpenFlowSwitch sw; |
| 41 | + private Timer timer; | ||
| 42 | + private TimerTask task; | ||
| 43 | + | ||
| 44 | + private int pollInterval; | ||
| 45 | + | ||
| 46 | + /** | ||
| 47 | + * Creates a new collector for the given switch and poll frequency. | ||
| 48 | + * | ||
| 49 | + * @param timer timer to use for scheduling | ||
| 50 | + * @param sw switch to pull | ||
| 51 | + * @param pollInterval poll frequency in seconds | ||
| 52 | + */ | ||
| 53 | + FlowStatsCollector(Timer timer, OpenFlowSwitch sw, int pollInterval) { | ||
| 54 | + this.timer = timer; | ||
| 46 | this.sw = sw; | 55 | this.sw = sw; |
| 47 | - this.refreshInterval = refreshInterval; | 56 | + this.pollInterval = pollInterval; |
| 48 | } | 57 | } |
| 49 | 58 | ||
| 50 | - @Override | 59 | + /** |
| 51 | - public void run(Timeout timeout) throws Exception { | 60 | + * Adjusts poll frequency. |
| 52 | - log.trace("Collecting stats for {}", this.sw.getStringId()); | 61 | + * |
| 53 | - | 62 | + * @param pollInterval poll frequency in seconds |
| 54 | - sendFlowStatistics(); | 63 | + */ |
| 55 | - | 64 | + synchronized void adjustPollInterval(int pollInterval) { |
| 56 | - if (!this.stopTimer) { | 65 | + this.pollInterval = pollInterval; |
| 57 | - log.trace("Scheduling stats collection in {} seconds for {}", | 66 | + task.cancel(); |
| 58 | - this.refreshInterval, this.sw.getStringId()); | 67 | + task = new InternalTimerTask(); |
| 59 | - timeout.getTimer().newTimeout(this, refreshInterval, | 68 | + timer.scheduleAtFixedRate(task, pollInterval * SECONDS, pollInterval * 1000); |
| 60 | - TimeUnit.SECONDS); | ||
| 61 | - } | ||
| 62 | - | ||
| 63 | - | ||
| 64 | } | 69 | } |
| 65 | 70 | ||
| 66 | - private void sendFlowStatistics() { | 71 | + private class InternalTimerTask extends TimerTask { |
| 67 | - if (log.isTraceEnabled()) { | 72 | + @Override |
| 68 | - log.trace("sendFlowStatistics {}:{}", sw.getStringId(), sw.getRole()); | 73 | + public void run() { |
| 74 | + if (sw.getRole() == RoleState.MASTER) { | ||
| 75 | + log.trace("Collecting stats for {}", sw.getStringId()); | ||
| 76 | + OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest() | ||
| 77 | + .setMatch(sw.factory().matchWildcardAll()) | ||
| 78 | + .setTableId(TableId.ALL) | ||
| 79 | + .setOutPort(OFPort.NO_MASK) | ||
| 80 | + .build(); | ||
| 81 | + sw.sendMsg(request); | ||
| 82 | + } | ||
| 69 | } | 83 | } |
| 70 | - if (sw.getRole() != RoleState.MASTER) { | ||
| 71 | - // Switch not master. | ||
| 72 | - return; | ||
| 73 | - } | ||
| 74 | - OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest() | ||
| 75 | - .setMatch(sw.factory().matchWildcardAll()) | ||
| 76 | - .setTableId(TableId.ALL) | ||
| 77 | - .setOutPort(OFPort.NO_MASK) | ||
| 78 | - .build(); | ||
| 79 | - | ||
| 80 | - this.sw.sendMsg(request); | ||
| 81 | - | ||
| 82 | } | 84 | } |
| 83 | 85 | ||
| 84 | - public void start() { | 86 | + public synchronized void start() { |
| 85 | - | 87 | + // Initially start polling quickly. Then drop down to configured value |
| 86 | - /* | 88 | + log.debug("Starting Stats collection thread for {}", sw.getStringId()); |
| 87 | - * Initially start polling quickly. Then drop down to configured value | 89 | + task = new InternalTimerTask(); |
| 88 | - */ | 90 | + SharedExecutors.getTimer().scheduleAtFixedRate(task, 1 * SECONDS, |
| 89 | - log.info("Starting Stats collection thread for {}", | 91 | + pollInterval * SECONDS); |
| 90 | - this.sw.getStringId()); | ||
| 91 | - timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS); | ||
| 92 | } | 92 | } |
| 93 | 93 | ||
| 94 | - public void stop() { | 94 | + public synchronized void stop() { |
| 95 | - log.info("Stopping Stats collection thread for {}", | 95 | + log.debug("Stopping Stats collection thread for {}", sw.getStringId()); |
| 96 | - this.sw.getStringId()); | 96 | + task.cancel(); |
| 97 | - this.stopTimer = true; | 97 | + task = null; |
| 98 | - timeout.cancel(); | ||
| 99 | } | 98 | } |
| 100 | 99 | ||
| 101 | } | 100 | } | ... | ... |
| ... | @@ -15,21 +15,20 @@ | ... | @@ -15,21 +15,20 @@ |
| 15 | */ | 15 | */ |
| 16 | package org.onosproject.provider.of.flow.impl; | 16 | package org.onosproject.provider.of.flow.impl; |
| 17 | 17 | ||
| 18 | -import static org.slf4j.LoggerFactory.getLogger; | 18 | +import com.google.common.cache.Cache; |
| 19 | - | 19 | +import com.google.common.cache.CacheBuilder; |
| 20 | -import java.util.Collections; | 20 | +import com.google.common.cache.RemovalCause; |
| 21 | -import java.util.List; | 21 | +import com.google.common.cache.RemovalNotification; |
| 22 | -import java.util.Map; | 22 | +import com.google.common.collect.Maps; |
| 23 | -import java.util.Optional; | 23 | +import com.google.common.collect.Sets; |
| 24 | -import java.util.Set; | ||
| 25 | -import java.util.concurrent.TimeUnit; | ||
| 26 | -import java.util.stream.Collectors; | ||
| 27 | - | ||
| 28 | import org.apache.felix.scr.annotations.Activate; | 24 | import org.apache.felix.scr.annotations.Activate; |
| 29 | import org.apache.felix.scr.annotations.Component; | 25 | import org.apache.felix.scr.annotations.Component; |
| 30 | import org.apache.felix.scr.annotations.Deactivate; | 26 | import org.apache.felix.scr.annotations.Deactivate; |
| 27 | +import org.apache.felix.scr.annotations.Modified; | ||
| 28 | +import org.apache.felix.scr.annotations.Property; | ||
| 31 | import org.apache.felix.scr.annotations.Reference; | 29 | import org.apache.felix.scr.annotations.Reference; |
| 32 | import org.apache.felix.scr.annotations.ReferenceCardinality; | 30 | import org.apache.felix.scr.annotations.ReferenceCardinality; |
| 31 | +import org.onosproject.cfg.ComponentConfigService; | ||
| 33 | import org.onosproject.core.ApplicationId; | 32 | import org.onosproject.core.ApplicationId; |
| 34 | import org.onosproject.net.DeviceId; | 33 | import org.onosproject.net.DeviceId; |
| 35 | import org.onosproject.net.flow.CompletedBatchOperation; | 34 | import org.onosproject.net.flow.CompletedBatchOperation; |
| ... | @@ -43,6 +42,7 @@ import org.onosproject.net.flow.FlowRuleProviderRegistry; | ... | @@ -43,6 +42,7 @@ import org.onosproject.net.flow.FlowRuleProviderRegistry; |
| 43 | import org.onosproject.net.flow.FlowRuleProviderService; | 42 | import org.onosproject.net.flow.FlowRuleProviderService; |
| 44 | import org.onosproject.net.provider.AbstractProvider; | 43 | import org.onosproject.net.provider.AbstractProvider; |
| 45 | import org.onosproject.net.provider.ProviderId; | 44 | import org.onosproject.net.provider.ProviderId; |
| 45 | +import org.onosproject.net.statistic.DefaultLoad; | ||
| 46 | import org.onosproject.openflow.controller.Dpid; | 46 | import org.onosproject.openflow.controller.Dpid; |
| 47 | import org.onosproject.openflow.controller.OpenFlowController; | 47 | import org.onosproject.openflow.controller.OpenFlowController; |
| 48 | import org.onosproject.openflow.controller.OpenFlowEventListener; | 48 | import org.onosproject.openflow.controller.OpenFlowEventListener; |
| ... | @@ -50,6 +50,7 @@ import org.onosproject.openflow.controller.OpenFlowSwitch; | ... | @@ -50,6 +50,7 @@ import org.onosproject.openflow.controller.OpenFlowSwitch; |
| 50 | import org.onosproject.openflow.controller.OpenFlowSwitchListener; | 50 | import org.onosproject.openflow.controller.OpenFlowSwitchListener; |
| 51 | import org.onosproject.openflow.controller.RoleState; | 51 | import org.onosproject.openflow.controller.RoleState; |
| 52 | import org.onosproject.openflow.controller.ThirdPartyMessage; | 52 | import org.onosproject.openflow.controller.ThirdPartyMessage; |
| 53 | +import org.osgi.service.component.ComponentContext; | ||
| 53 | import org.projectfloodlight.openflow.protocol.OFBarrierRequest; | 54 | import org.projectfloodlight.openflow.protocol.OFBarrierRequest; |
| 54 | import org.projectfloodlight.openflow.protocol.OFErrorMsg; | 55 | import org.projectfloodlight.openflow.protocol.OFErrorMsg; |
| 55 | import org.projectfloodlight.openflow.protocol.OFErrorType; | 56 | import org.projectfloodlight.openflow.protocol.OFErrorType; |
| ... | @@ -63,12 +64,19 @@ import org.projectfloodlight.openflow.protocol.OFStatsType; | ... | @@ -63,12 +64,19 @@ import org.projectfloodlight.openflow.protocol.OFStatsType; |
| 63 | import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg; | 64 | import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg; |
| 64 | import org.slf4j.Logger; | 65 | import org.slf4j.Logger; |
| 65 | 66 | ||
| 66 | -import com.google.common.cache.Cache; | 67 | +import java.util.Collections; |
| 67 | -import com.google.common.cache.CacheBuilder; | 68 | +import java.util.Dictionary; |
| 68 | -import com.google.common.cache.RemovalCause; | 69 | +import java.util.List; |
| 69 | -import com.google.common.cache.RemovalNotification; | 70 | +import java.util.Map; |
| 70 | -import com.google.common.collect.Maps; | 71 | +import java.util.Optional; |
| 71 | -import com.google.common.collect.Sets; | 72 | +import java.util.Set; |
| 73 | +import java.util.Timer; | ||
| 74 | +import java.util.concurrent.TimeUnit; | ||
| 75 | +import java.util.stream.Collectors; | ||
| 76 | + | ||
| 77 | +import static com.google.common.base.Strings.isNullOrEmpty; | ||
| 78 | +import static org.onlab.util.Tools.get; | ||
| 79 | +import static org.slf4j.LoggerFactory.getLogger; | ||
| 72 | 80 | ||
| 73 | /** | 81 | /** |
| 74 | * Provider which uses an OpenFlow controller to detect network end-station | 82 | * Provider which uses an OpenFlow controller to detect network end-station |
| ... | @@ -86,12 +94,21 @@ public class OpenFlowRuleProvider extends AbstractProvider | ... | @@ -86,12 +94,21 @@ public class OpenFlowRuleProvider extends AbstractProvider |
| 86 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 94 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 87 | protected OpenFlowController controller; | 95 | protected OpenFlowController controller; |
| 88 | 96 | ||
| 97 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
| 98 | + protected ComponentConfigService cfgService; | ||
| 99 | + | ||
| 100 | + private static final int DEFAULT_POLL_FREQUENCY = 10; | ||
| 101 | + @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY, | ||
| 102 | + label = "Frequency (in seconds) for polling flow statistics") | ||
| 103 | + private int flowPollFrequency = DEFAULT_POLL_FREQUENCY; | ||
| 104 | + | ||
| 89 | private FlowRuleProviderService providerService; | 105 | private FlowRuleProviderService providerService; |
| 90 | 106 | ||
| 91 | private final InternalFlowProvider listener = new InternalFlowProvider(); | 107 | private final InternalFlowProvider listener = new InternalFlowProvider(); |
| 92 | 108 | ||
| 93 | private Cache<Long, InternalCacheEntry> pendingBatches; | 109 | private Cache<Long, InternalCacheEntry> pendingBatches; |
| 94 | 110 | ||
| 111 | + private final Timer timer = new Timer("onos-openflow-collector"); | ||
| 95 | private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap(); | 112 | private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap(); |
| 96 | 113 | ||
| 97 | /** | 114 | /** |
| ... | @@ -102,42 +119,79 @@ public class OpenFlowRuleProvider extends AbstractProvider | ... | @@ -102,42 +119,79 @@ public class OpenFlowRuleProvider extends AbstractProvider |
| 102 | } | 119 | } |
| 103 | 120 | ||
| 104 | @Activate | 121 | @Activate |
| 105 | - public void activate() { | 122 | + public void activate(ComponentContext context) { |
| 123 | + cfgService.registerProperties(getClass()); | ||
| 106 | providerService = providerRegistry.register(this); | 124 | providerService = providerRegistry.register(this); |
| 107 | controller.addListener(listener); | 125 | controller.addListener(listener); |
| 108 | controller.addEventListener(listener); | 126 | controller.addEventListener(listener); |
| 109 | 127 | ||
| 110 | - pendingBatches = CacheBuilder | 128 | + pendingBatches = createBatchCache(); |
| 111 | - .newBuilder() | 129 | + createCollectors(); |
| 112 | - .expireAfterWrite(10, TimeUnit.SECONDS) | ||
| 113 | - .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> { | ||
| 114 | - if (notification.getCause() == RemovalCause.EXPIRED) { | ||
| 115 | - providerService | ||
| 116 | - .batchOperationCompleted(notification | ||
| 117 | - .getKey(), | ||
| 118 | - notification | ||
| 119 | - .getValue() | ||
| 120 | - .failedCompletion()); | ||
| 121 | - } | ||
| 122 | - }).build(); | ||
| 123 | - | ||
| 124 | - for (OpenFlowSwitch sw : controller.getSwitches()) { | ||
| 125 | - FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL); | ||
| 126 | - fsc.start(); | ||
| 127 | - collectors.put(new Dpid(sw.getId()), fsc); | ||
| 128 | - } | ||
| 129 | 130 | ||
| 130 | log.info("Started"); | 131 | log.info("Started"); |
| 131 | } | 132 | } |
| 132 | 133 | ||
| 133 | @Deactivate | 134 | @Deactivate |
| 134 | - public void deactivate() { | 135 | + public void deactivate(ComponentContext context) { |
| 136 | + cfgService.unregisterProperties(getClass(), false); | ||
| 137 | + stopCollectors(); | ||
| 135 | providerRegistry.unregister(this); | 138 | providerRegistry.unregister(this); |
| 136 | providerService = null; | 139 | providerService = null; |
| 137 | 140 | ||
| 138 | log.info("Stopped"); | 141 | log.info("Stopped"); |
| 139 | } | 142 | } |
| 140 | 143 | ||
| 144 | + @Modified | ||
| 145 | + public void modified(ComponentContext context) { | ||
| 146 | + Dictionary<?, ?> properties = context.getProperties(); | ||
| 147 | + int newFlowPollFrequency; | ||
| 148 | + try { | ||
| 149 | + String s = get(properties, "flowPollFrequency"); | ||
| 150 | + newFlowPollFrequency = isNullOrEmpty(s) ? flowPollFrequency : Integer.parseInt(s.trim()); | ||
| 151 | + | ||
| 152 | + } catch (NumberFormatException | ClassCastException e) { | ||
| 153 | + newFlowPollFrequency = flowPollFrequency; | ||
| 154 | + } | ||
| 155 | + | ||
| 156 | + if (newFlowPollFrequency != flowPollFrequency) { | ||
| 157 | + flowPollFrequency = newFlowPollFrequency; | ||
| 158 | + adjustRate(); | ||
| 159 | + } | ||
| 160 | + | ||
| 161 | + log.info("Settings: flowPollFrequency={}", flowPollFrequency); | ||
| 162 | + } | ||
| 163 | + | ||
| 164 | + private Cache<Long, InternalCacheEntry> createBatchCache() { | ||
| 165 | + return CacheBuilder.newBuilder() | ||
| 166 | + .expireAfterWrite(10, TimeUnit.SECONDS) | ||
| 167 | + .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> { | ||
| 168 | + if (notification.getCause() == RemovalCause.EXPIRED) { | ||
| 169 | + providerService.batchOperationCompleted(notification.getKey(), | ||
| 170 | + notification.getValue().failedCompletion()); | ||
| 171 | + } | ||
| 172 | + }).build(); | ||
| 173 | + } | ||
| 174 | + | ||
| 175 | + private void createCollectors() { | ||
| 176 | + controller.getSwitches().forEach(this::createCollector); | ||
| 177 | + } | ||
| 178 | + | ||
| 179 | + private void createCollector(OpenFlowSwitch sw) { | ||
| 180 | + FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency); | ||
| 181 | + fsc.start(); | ||
| 182 | + collectors.put(new Dpid(sw.getId()), fsc); | ||
| 183 | + } | ||
| 184 | + | ||
| 185 | + private void stopCollectors() { | ||
| 186 | + collectors.values().forEach(FlowStatsCollector::stop); | ||
| 187 | + collectors.clear(); | ||
| 188 | + } | ||
| 189 | + | ||
| 190 | + private void adjustRate() { | ||
| 191 | + DefaultLoad.setPollInterval(flowPollFrequency); | ||
| 192 | + collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency)); | ||
| 193 | + } | ||
| 194 | + | ||
| 141 | @Override | 195 | @Override |
| 142 | public void applyFlowRule(FlowRule... flowRules) { | 196 | public void applyFlowRule(FlowRule... flowRules) { |
| 143 | for (FlowRule flowRule : flowRules) { | 197 | for (FlowRule flowRule : flowRules) { |
| ... | @@ -147,7 +201,7 @@ public class OpenFlowRuleProvider extends AbstractProvider | ... | @@ -147,7 +201,7 @@ public class OpenFlowRuleProvider extends AbstractProvider |
| 147 | 201 | ||
| 148 | private void applyRule(FlowRule flowRule) { | 202 | private void applyRule(FlowRule flowRule) { |
| 149 | OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId() | 203 | OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId() |
| 150 | - .uri())); | 204 | + .uri())); |
| 151 | FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad(); | 205 | FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad(); |
| 152 | if (hasPayload(flowRuleExtPayLoad)) { | 206 | if (hasPayload(flowRuleExtPayLoad)) { |
| 153 | OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad()); | 207 | OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad()); |
| ... | @@ -167,7 +221,7 @@ public class OpenFlowRuleProvider extends AbstractProvider | ... | @@ -167,7 +221,7 @@ public class OpenFlowRuleProvider extends AbstractProvider |
| 167 | 221 | ||
| 168 | private void removeRule(FlowRule flowRule) { | 222 | private void removeRule(FlowRule flowRule) { |
| 169 | OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId() | 223 | OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId() |
| 170 | - .uri())); | 224 | + .uri())); |
| 171 | FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad(); | 225 | FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad(); |
| 172 | if (hasPayload(flowRuleExtPayLoad)) { | 226 | if (hasPayload(flowRuleExtPayLoad)) { |
| 173 | OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad()); | 227 | OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad()); |
| ... | @@ -190,7 +244,7 @@ public class OpenFlowRuleProvider extends AbstractProvider | ... | @@ -190,7 +244,7 @@ public class OpenFlowRuleProvider extends AbstractProvider |
| 190 | pendingBatches.put(batch.id(), new InternalCacheEntry(batch)); | 244 | pendingBatches.put(batch.id(), new InternalCacheEntry(batch)); |
| 191 | 245 | ||
| 192 | OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId() | 246 | OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId() |
| 193 | - .uri())); | 247 | + .uri())); |
| 194 | OFFlowMod mod; | 248 | OFFlowMod mod; |
| 195 | for (FlowRuleBatchEntry fbe : batch.getOperations()) { | 249 | for (FlowRuleBatchEntry fbe : batch.getOperations()) { |
| 196 | // flow is the third party privacy flow | 250 | // flow is the third party privacy flow |
| ... | @@ -204,19 +258,19 @@ public class OpenFlowRuleProvider extends AbstractProvider | ... | @@ -204,19 +258,19 @@ public class OpenFlowRuleProvider extends AbstractProvider |
| 204 | FlowModBuilder builder = FlowModBuilder.builder(fbe.target(), sw | 258 | FlowModBuilder builder = FlowModBuilder.builder(fbe.target(), sw |
| 205 | .factory(), Optional.of(batch.id())); | 259 | .factory(), Optional.of(batch.id())); |
| 206 | switch (fbe.operator()) { | 260 | switch (fbe.operator()) { |
| 207 | - case ADD: | 261 | + case ADD: |
| 208 | - mod = builder.buildFlowAdd(); | 262 | + mod = builder.buildFlowAdd(); |
| 209 | - break; | 263 | + break; |
| 210 | - case REMOVE: | 264 | + case REMOVE: |
| 211 | - mod = builder.buildFlowDel(); | 265 | + mod = builder.buildFlowDel(); |
| 212 | - break; | 266 | + break; |
| 213 | - case MODIFY: | 267 | + case MODIFY: |
| 214 | - mod = builder.buildFlowMod(); | 268 | + mod = builder.buildFlowMod(); |
| 215 | - break; | 269 | + break; |
| 216 | - default: | 270 | + default: |
| 217 | - log.error("Unsupported batch operation {}; skipping flowmod {}", | 271 | + log.error("Unsupported batch operation {}; skipping flowmod {}", |
| 218 | - fbe.operator(), fbe); | 272 | + fbe.operator(), fbe); |
| 219 | - continue; | 273 | + continue; |
| 220 | } | 274 | } |
| 221 | sw.sendMsg(mod); | 275 | sw.sendMsg(mod); |
| 222 | } | 276 | } |
| ... | @@ -236,12 +290,7 @@ public class OpenFlowRuleProvider extends AbstractProvider | ... | @@ -236,12 +290,7 @@ public class OpenFlowRuleProvider extends AbstractProvider |
| 236 | 290 | ||
| 237 | @Override | 291 | @Override |
| 238 | public void switchAdded(Dpid dpid) { | 292 | public void switchAdded(Dpid dpid) { |
| 239 | - FlowStatsCollector fsc = new FlowStatsCollector( | 293 | + createCollector(controller.getSwitch(dpid)); |
| 240 | - controller | ||
| 241 | - .getSwitch(dpid), | ||
| 242 | - POLL_INTERVAL); | ||
| 243 | - fsc.start(); | ||
| 244 | - collectors.put(dpid, fsc); | ||
| 245 | } | 294 | } |
| 246 | 295 | ||
| 247 | @Override | 296 | @Override |
| ... | @@ -265,64 +314,64 @@ public class OpenFlowRuleProvider extends AbstractProvider | ... | @@ -265,64 +314,64 @@ public class OpenFlowRuleProvider extends AbstractProvider |
| 265 | public void handleMessage(Dpid dpid, OFMessage msg) { | 314 | public void handleMessage(Dpid dpid, OFMessage msg) { |
| 266 | OpenFlowSwitch sw = controller.getSwitch(dpid); | 315 | OpenFlowSwitch sw = controller.getSwitch(dpid); |
| 267 | switch (msg.getType()) { | 316 | switch (msg.getType()) { |
| 268 | - case FLOW_REMOVED: | 317 | + case FLOW_REMOVED: |
| 269 | - OFFlowRemoved removed = (OFFlowRemoved) msg; | 318 | + OFFlowRemoved removed = (OFFlowRemoved) msg; |
| 270 | - | 319 | + |
| 271 | - FlowEntry fr = new FlowEntryBuilder(dpid, removed).build(); | 320 | + FlowEntry fr = new FlowEntryBuilder(dpid, removed).build(); |
| 272 | - providerService.flowRemoved(fr); | 321 | + providerService.flowRemoved(fr); |
| 273 | - break; | 322 | + break; |
| 274 | - case STATS_REPLY: | 323 | + case STATS_REPLY: |
| 275 | - if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) { | 324 | + if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) { |
| 276 | - pushFlowMetrics(dpid, (OFFlowStatsReply) msg); | 325 | + pushFlowMetrics(dpid, (OFFlowStatsReply) msg); |
| 277 | - } | ||
| 278 | - break; | ||
| 279 | - case BARRIER_REPLY: | ||
| 280 | - try { | ||
| 281 | - InternalCacheEntry entry = pendingBatches.getIfPresent(msg | ||
| 282 | - .getXid()); | ||
| 283 | - if (entry != null) { | ||
| 284 | - providerService | ||
| 285 | - .batchOperationCompleted(msg.getXid(), | ||
| 286 | - entry.completed()); | ||
| 287 | - } else { | ||
| 288 | - log.warn("Received unknown Barrier Reply: {}", | ||
| 289 | - msg.getXid()); | ||
| 290 | } | 326 | } |
| 291 | - } finally { | 327 | + break; |
| 292 | - pendingBatches.invalidate(msg.getXid()); | 328 | + case BARRIER_REPLY: |
| 293 | - } | 329 | + try { |
| 294 | - break; | 330 | + InternalCacheEntry entry = pendingBatches.getIfPresent(msg |
| 295 | - case ERROR: | 331 | + .getXid()); |
| 296 | - log.warn("received Error message {} from {}", msg, dpid); | ||
| 297 | - | ||
| 298 | - OFErrorMsg error = (OFErrorMsg) msg; | ||
| 299 | - if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) { | ||
| 300 | - OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error; | ||
| 301 | - if (fmFailed.getData().getParsedMessage().isPresent()) { | ||
| 302 | - OFMessage m = fmFailed.getData().getParsedMessage() | ||
| 303 | - .get(); | ||
| 304 | - OFFlowMod fm = (OFFlowMod) m; | ||
| 305 | - InternalCacheEntry entry = pendingBatches | ||
| 306 | - .getIfPresent(msg.getXid()); | ||
| 307 | if (entry != null) { | 332 | if (entry != null) { |
| 308 | - entry.appendFailure(new FlowEntryBuilder(dpid, fm) | 333 | + providerService |
| 309 | - .build()); | 334 | + .batchOperationCompleted(msg.getXid(), |
| 335 | + entry.completed()); | ||
| 310 | } else { | 336 | } else { |
| 311 | - log.error("No matching batch for this error: {}", | 337 | + log.warn("Received unknown Barrier Reply: {}", |
| 312 | - error); | 338 | + msg.getXid()); |
| 339 | + } | ||
| 340 | + } finally { | ||
| 341 | + pendingBatches.invalidate(msg.getXid()); | ||
| 342 | + } | ||
| 343 | + break; | ||
| 344 | + case ERROR: | ||
| 345 | + log.warn("received Error message {} from {}", msg, dpid); | ||
| 346 | + | ||
| 347 | + OFErrorMsg error = (OFErrorMsg) msg; | ||
| 348 | + if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) { | ||
| 349 | + OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error; | ||
| 350 | + if (fmFailed.getData().getParsedMessage().isPresent()) { | ||
| 351 | + OFMessage m = fmFailed.getData().getParsedMessage() | ||
| 352 | + .get(); | ||
| 353 | + OFFlowMod fm = (OFFlowMod) m; | ||
| 354 | + InternalCacheEntry entry = pendingBatches | ||
| 355 | + .getIfPresent(msg.getXid()); | ||
| 356 | + if (entry != null) { | ||
| 357 | + entry.appendFailure(new FlowEntryBuilder(dpid, fm) | ||
| 358 | + .build()); | ||
| 359 | + } else { | ||
| 360 | + log.error("No matching batch for this error: {}", | ||
| 361 | + error); | ||
| 362 | + } | ||
| 363 | + } else { | ||
| 364 | + // FIXME: Potentially add flowtracking to avoid this | ||
| 365 | + // message. | ||
| 366 | + log.error("Flow installation failed but switch didn't" | ||
| 367 | + + " tell us which one."); | ||
| 313 | } | 368 | } |
| 314 | } else { | 369 | } else { |
| 315 | - // FIXME: Potentially add flowtracking to avoid this | 370 | + log.warn("Received error {}", error); |
| 316 | - // message. | ||
| 317 | - log.error("Flow installation failed but switch didn't" | ||
| 318 | - + " tell us which one."); | ||
| 319 | } | 371 | } |
| 320 | - } else { | ||
| 321 | - log.warn("Received error {}", error); | ||
| 322 | - } | ||
| 323 | 372 | ||
| 324 | - default: | 373 | + default: |
| 325 | - log.debug("Unhandled message type: {}", msg.getType()); | 374 | + log.debug("Unhandled message type: {}", msg.getType()); |
| 326 | } | 375 | } |
| 327 | 376 | ||
| 328 | } | 377 | } |
| ... | @@ -343,15 +392,13 @@ public class OpenFlowRuleProvider extends AbstractProvider | ... | @@ -343,15 +392,13 @@ public class OpenFlowRuleProvider extends AbstractProvider |
| 343 | .collect(Collectors.toList()); | 392 | .collect(Collectors.toList()); |
| 344 | 393 | ||
| 345 | providerService.pushFlowMetrics(did, flowEntries); | 394 | providerService.pushFlowMetrics(did, flowEntries); |
| 346 | - | ||
| 347 | } | 395 | } |
| 348 | - | ||
| 349 | } | 396 | } |
| 350 | 397 | ||
| 351 | /** | 398 | /** |
| 352 | * The internal cache entry holding the original request as well as | 399 | * The internal cache entry holding the original request as well as |
| 353 | * accumulating the any failures along the way. | 400 | * accumulating the any failures along the way. |
| 354 | - * | 401 | + * <p/> |
| 355 | * If this entry is evicted from the cache then the entire operation is | 402 | * If this entry is evicted from the cache then the entire operation is |
| 356 | * considered failed. Otherwise, only the failures reported by the device | 403 | * considered failed. Otherwise, only the failures reported by the device |
| 357 | * will be propagated up. | 404 | * will be propagated up. |
| ... | @@ -395,12 +442,11 @@ public class OpenFlowRuleProvider extends AbstractProvider | ... | @@ -395,12 +442,11 @@ public class OpenFlowRuleProvider extends AbstractProvider |
| 395 | */ | 442 | */ |
| 396 | public CompletedBatchOperation completed() { | 443 | public CompletedBatchOperation completed() { |
| 397 | return new CompletedBatchOperation( | 444 | return new CompletedBatchOperation( |
| 398 | - failures.isEmpty(), | 445 | + failures.isEmpty(), |
| 399 | - Collections | 446 | + Collections |
| 400 | - .unmodifiableSet(failures), | 447 | + .unmodifiableSet(failures), |
| 401 | - operation.deviceId()); | 448 | + operation.deviceId()); |
| 402 | } | 449 | } |
| 403 | - | ||
| 404 | } | 450 | } |
| 405 | 451 | ||
| 406 | } | 452 | } | ... | ... |
-
Please register or login to post a comment