Committed by
Gerrit Code Review
ONOS-3023 Changing flowTable sets to map so that we can compare
stored vs. new rule when adding and removing Change-Id: Ibd885023d550af3b2220056fbdf44ad8ec7fefda
Showing
6 changed files
with
154 additions
and
123 deletions
... | @@ -15,6 +15,7 @@ | ... | @@ -15,6 +15,7 @@ |
15 | */ | 15 | */ |
16 | package org.onosproject.net.flow; | 16 | package org.onosproject.net.flow; |
17 | 17 | ||
18 | +import com.google.common.annotations.Beta; | ||
18 | import org.onosproject.core.ApplicationId; | 19 | import org.onosproject.core.ApplicationId; |
19 | import org.onosproject.core.DefaultGroupId; | 20 | import org.onosproject.core.DefaultGroupId; |
20 | import org.onosproject.core.GroupId; | 21 | import org.onosproject.core.GroupId; |
... | @@ -284,6 +285,11 @@ public class DefaultFlowRule implements FlowRule { | ... | @@ -284,6 +285,11 @@ public class DefaultFlowRule implements FlowRule { |
284 | return tableId; | 285 | return tableId; |
285 | } | 286 | } |
286 | 287 | ||
288 | + @Beta | ||
289 | + public long created() { | ||
290 | + return created; | ||
291 | + } | ||
292 | + | ||
287 | public static Builder builder() { | 293 | public static Builder builder() { |
288 | return new Builder(); | 294 | return new Builder(); |
289 | } | 295 | } | ... | ... |
... | @@ -220,6 +220,7 @@ public class SimpleFlowRuleStore | ... | @@ -220,6 +220,7 @@ public class SimpleFlowRuleStore |
220 | for (StoredFlowEntry stored : entries) { | 220 | for (StoredFlowEntry stored : entries) { |
221 | if (stored.equals(rule)) { | 221 | if (stored.equals(rule)) { |
222 | synchronized (stored) { | 222 | synchronized (stored) { |
223 | + //FIXME modification of "stored" flow entry outside of flow table | ||
223 | stored.setBytes(rule.bytes()); | 224 | stored.setBytes(rule.bytes()); |
224 | stored.setLife(rule.life()); | 225 | stored.setLife(rule.life()); |
225 | stored.setPackets(rule.packets()); | 226 | stored.setPackets(rule.packets()); | ... | ... |
... | @@ -311,6 +311,7 @@ public class FlowRuleManager | ... | @@ -311,6 +311,7 @@ public class FlowRuleManager |
311 | } catch (UnsupportedOperationException e) { | 311 | } catch (UnsupportedOperationException e) { |
312 | log.warn(e.getMessage()); | 312 | log.warn(e.getMessage()); |
313 | if (flowRule instanceof DefaultFlowEntry) { | 313 | if (flowRule instanceof DefaultFlowEntry) { |
314 | + //FIXME modification of "stored" flow entry outside of store | ||
314 | ((DefaultFlowEntry) flowRule).setState(FlowEntry.FlowEntryState.FAILED); | 315 | ((DefaultFlowEntry) flowRule).setState(FlowEntry.FlowEntryState.FAILED); |
315 | } | 316 | } |
316 | } | 317 | } |
... | @@ -323,10 +324,8 @@ public class FlowRuleManager | ... | @@ -323,10 +324,8 @@ public class FlowRuleManager |
323 | log.debug("Flow {} removed", flowRule); | 324 | log.debug("Flow {} removed", flowRule); |
324 | post(event); | 325 | post(event); |
325 | } | 326 | } |
326 | - | ||
327 | } | 327 | } |
328 | 328 | ||
329 | - | ||
330 | private void extraneousFlow(FlowRule flowRule) { | 329 | private void extraneousFlow(FlowRule flowRule) { |
331 | checkNotNull(flowRule, FLOW_RULE_NULL); | 330 | checkNotNull(flowRule, FLOW_RULE_NULL); |
332 | checkValidity(); | 331 | checkValidity(); |
... | @@ -335,13 +334,11 @@ public class FlowRuleManager | ... | @@ -335,13 +334,11 @@ public class FlowRuleManager |
335 | log.debug("Flow {} is on switch but not in store.", flowRule); | 334 | log.debug("Flow {} is on switch but not in store.", flowRule); |
336 | } | 335 | } |
337 | 336 | ||
338 | - | ||
339 | private void flowAdded(FlowEntry flowEntry) { | 337 | private void flowAdded(FlowEntry flowEntry) { |
340 | checkNotNull(flowEntry, FLOW_RULE_NULL); | 338 | checkNotNull(flowEntry, FLOW_RULE_NULL); |
341 | checkValidity(); | 339 | checkValidity(); |
342 | 340 | ||
343 | if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) { | 341 | if (checkRuleLiveness(flowEntry, store.getFlowEntry(flowEntry))) { |
344 | - | ||
345 | FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry); | 342 | FlowRuleEvent event = store.addOrUpdateFlowRule(flowEntry); |
346 | if (event == null) { | 343 | if (event == null) { |
347 | log.debug("No flow store event generated."); | 344 | log.debug("No flow store event generated."); |
... | @@ -353,7 +350,6 @@ public class FlowRuleManager | ... | @@ -353,7 +350,6 @@ public class FlowRuleManager |
353 | log.debug("Removing flow rules...."); | 350 | log.debug("Removing flow rules...."); |
354 | removeFlowRules(flowEntry); | 351 | removeFlowRules(flowEntry); |
355 | } | 352 | } |
356 | - | ||
357 | } | 353 | } |
358 | 354 | ||
359 | private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) { | 355 | private boolean checkRuleLiveness(FlowEntry swRule, FlowEntry storedRule) { | ... | ... |
... | @@ -269,23 +269,19 @@ public class FlowRuleManagerTest { | ... | @@ -269,23 +269,19 @@ public class FlowRuleManagerTest { |
269 | 269 | ||
270 | @Test | 270 | @Test |
271 | public void flowRemoved() { | 271 | public void flowRemoved() { |
272 | - | ||
273 | FlowRule f1 = addFlowRule(1); | 272 | FlowRule f1 = addFlowRule(1); |
274 | FlowRule f2 = addFlowRule(2); | 273 | FlowRule f2 = addFlowRule(2); |
275 | StoredFlowEntry fe1 = new DefaultFlowEntry(f1); | 274 | StoredFlowEntry fe1 = new DefaultFlowEntry(f1); |
276 | FlowEntry fe2 = new DefaultFlowEntry(f2); | 275 | FlowEntry fe2 = new DefaultFlowEntry(f2); |
277 | 276 | ||
278 | - | ||
279 | providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2)); | 277 | providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2)); |
280 | service.removeFlowRules(f1); | 278 | service.removeFlowRules(f1); |
281 | 279 | ||
280 | + //FIXME modification of "stored" flow entry outside of store | ||
282 | fe1.setState(FlowEntryState.REMOVED); | 281 | fe1.setState(FlowEntryState.REMOVED); |
283 | 282 | ||
284 | - | ||
285 | - | ||
286 | providerService.flowRemoved(fe1); | 283 | providerService.flowRemoved(fe1); |
287 | 284 | ||
288 | - | ||
289 | validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED, | 285 | validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED, |
290 | RULE_ADDED, RULE_REMOVE_REQUESTED, RULE_REMOVED); | 286 | RULE_ADDED, RULE_REMOVE_REQUESTED, RULE_REMOVED); |
291 | 287 | ||
... | @@ -301,7 +297,6 @@ public class FlowRuleManagerTest { | ... | @@ -301,7 +297,6 @@ public class FlowRuleManagerTest { |
301 | 297 | ||
302 | providerService.flowRemoved(fe3); | 298 | providerService.flowRemoved(fe3); |
303 | validateEvents(); | 299 | validateEvents(); |
304 | - | ||
305 | } | 300 | } |
306 | 301 | ||
307 | @Test | 302 | @Test | ... | ... |
... | @@ -15,92 +15,92 @@ | ... | @@ -15,92 +15,92 @@ |
15 | */ | 15 | */ |
16 | package org.onosproject.store.flow.impl; | 16 | package org.onosproject.store.flow.impl; |
17 | 17 | ||
18 | -import com.google.common.base.Objects; | 18 | + import com.google.common.base.Objects; |
19 | -import com.google.common.collect.ImmutableList; | 19 | + import com.google.common.collect.ImmutableList; |
20 | -import com.google.common.collect.ImmutableMap; | 20 | + import com.google.common.collect.ImmutableMap; |
21 | -import com.google.common.collect.Iterables; | 21 | + import com.google.common.collect.Iterables; |
22 | -import com.google.common.collect.Maps; | 22 | + import com.google.common.collect.Maps; |
23 | -import com.google.common.collect.Sets; | 23 | + import com.google.common.collect.Sets; |
24 | -import com.google.common.util.concurrent.Futures; | 24 | + import com.google.common.util.concurrent.Futures; |
25 | - | 25 | + import org.apache.felix.scr.annotations.Activate; |
26 | -import org.apache.felix.scr.annotations.Activate; | 26 | + import org.apache.felix.scr.annotations.Component; |
27 | -import org.apache.felix.scr.annotations.Component; | 27 | + import org.apache.felix.scr.annotations.Deactivate; |
28 | -import org.apache.felix.scr.annotations.Deactivate; | 28 | + import org.apache.felix.scr.annotations.Modified; |
29 | -import org.apache.felix.scr.annotations.Modified; | 29 | + import org.apache.felix.scr.annotations.Property; |
30 | -import org.apache.felix.scr.annotations.Property; | 30 | + import org.apache.felix.scr.annotations.Reference; |
31 | -import org.apache.felix.scr.annotations.Reference; | 31 | + import org.apache.felix.scr.annotations.ReferenceCardinality; |
32 | -import org.apache.felix.scr.annotations.ReferenceCardinality; | 32 | + import org.apache.felix.scr.annotations.Service; |
33 | -import org.apache.felix.scr.annotations.Service; | 33 | + import org.onlab.util.KryoNamespace; |
34 | -import org.onlab.util.KryoNamespace; | 34 | + import org.onlab.util.Tools; |
35 | -import org.onlab.util.Tools; | 35 | + import org.onosproject.cfg.ComponentConfigService; |
36 | -import org.onosproject.cfg.ComponentConfigService; | 36 | + import org.onosproject.cluster.ClusterService; |
37 | -import org.onosproject.cluster.ClusterService; | 37 | + import org.onosproject.cluster.NodeId; |
38 | -import org.onosproject.cluster.NodeId; | 38 | + import org.onosproject.core.CoreService; |
39 | -import org.onosproject.core.CoreService; | 39 | + import org.onosproject.core.IdGenerator; |
40 | -import org.onosproject.core.IdGenerator; | 40 | + import org.onosproject.mastership.MastershipService; |
41 | -import org.onosproject.mastership.MastershipService; | 41 | + import org.onosproject.net.DeviceId; |
42 | -import org.onosproject.net.DeviceId; | 42 | + import org.onosproject.net.device.DeviceService; |
43 | -import org.onosproject.net.device.DeviceService; | 43 | + import org.onosproject.net.flow.CompletedBatchOperation; |
44 | -import org.onosproject.net.flow.CompletedBatchOperation; | 44 | + import org.onosproject.net.flow.DefaultFlowEntry; |
45 | -import org.onosproject.net.flow.DefaultFlowEntry; | 45 | + import org.onosproject.net.flow.FlowEntry; |
46 | -import org.onosproject.net.flow.FlowEntry; | 46 | + import org.onosproject.net.flow.FlowEntry.FlowEntryState; |
47 | -import org.onosproject.net.flow.FlowEntry.FlowEntryState; | 47 | + import org.onosproject.net.flow.FlowId; |
48 | -import org.onosproject.net.flow.FlowId; | 48 | + import org.onosproject.net.flow.FlowRule; |
49 | -import org.onosproject.net.flow.FlowRule; | 49 | + import org.onosproject.net.flow.FlowRuleBatchEntry; |
50 | -import org.onosproject.net.flow.FlowRuleBatchEntry; | 50 | + import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation; |
51 | -import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation; | 51 | + import org.onosproject.net.flow.FlowRuleBatchEvent; |
52 | -import org.onosproject.net.flow.FlowRuleBatchEvent; | 52 | + import org.onosproject.net.flow.FlowRuleBatchOperation; |
53 | -import org.onosproject.net.flow.FlowRuleBatchOperation; | 53 | + import org.onosproject.net.flow.FlowRuleBatchRequest; |
54 | -import org.onosproject.net.flow.FlowRuleBatchRequest; | 54 | + import org.onosproject.net.flow.FlowRuleEvent; |
55 | -import org.onosproject.net.flow.FlowRuleEvent; | 55 | + import org.onosproject.net.flow.FlowRuleEvent.Type; |
56 | -import org.onosproject.net.flow.FlowRuleEvent.Type; | 56 | + import org.onosproject.net.flow.FlowRuleService; |
57 | -import org.onosproject.net.flow.FlowRuleService; | 57 | + import org.onosproject.net.flow.FlowRuleStore; |
58 | -import org.onosproject.net.flow.FlowRuleStore; | 58 | + import org.onosproject.net.flow.FlowRuleStoreDelegate; |
59 | -import org.onosproject.net.flow.FlowRuleStoreDelegate; | 59 | + import org.onosproject.net.flow.StoredFlowEntry; |
60 | -import org.onosproject.net.flow.StoredFlowEntry; | 60 | + import org.onosproject.net.flow.TableStatisticsEntry; |
61 | -import org.onosproject.net.flow.TableStatisticsEntry; | 61 | + import org.onosproject.persistence.PersistenceService; |
62 | -import org.onosproject.persistence.PersistenceService; | 62 | + import org.onosproject.store.AbstractStore; |
63 | -import org.onosproject.store.AbstractStore; | 63 | + import org.onosproject.store.cluster.messaging.ClusterCommunicationService; |
64 | -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; | 64 | + import org.onosproject.store.cluster.messaging.ClusterMessage; |
65 | -import org.onosproject.store.cluster.messaging.ClusterMessage; | 65 | + import org.onosproject.store.cluster.messaging.ClusterMessageHandler; |
66 | -import org.onosproject.store.cluster.messaging.ClusterMessageHandler; | 66 | + import org.onosproject.store.flow.ReplicaInfoEvent; |
67 | -import org.onosproject.store.flow.ReplicaInfoEvent; | 67 | + import org.onosproject.store.flow.ReplicaInfoEventListener; |
68 | -import org.onosproject.store.flow.ReplicaInfoEventListener; | 68 | + import org.onosproject.store.flow.ReplicaInfoService; |
69 | -import org.onosproject.store.flow.ReplicaInfoService; | 69 | + import org.onosproject.store.impl.MastershipBasedTimestamp; |
70 | -import org.onosproject.store.impl.MastershipBasedTimestamp; | 70 | + import org.onosproject.store.serializers.KryoNamespaces; |
71 | -import org.onosproject.store.serializers.KryoNamespaces; | 71 | + import org.onosproject.store.serializers.KryoSerializer; |
72 | -import org.onosproject.store.serializers.KryoSerializer; | 72 | + import org.onosproject.store.serializers.StoreSerializer; |
73 | -import org.onosproject.store.serializers.StoreSerializer; | 73 | + import org.onosproject.store.serializers.custom.DistributedStoreSerializers; |
74 | -import org.onosproject.store.serializers.custom.DistributedStoreSerializers; | 74 | + import org.onosproject.store.service.EventuallyConsistentMap; |
75 | -import org.onosproject.store.service.EventuallyConsistentMap; | 75 | + import org.onosproject.store.service.EventuallyConsistentMapEvent; |
76 | -import org.onosproject.store.service.EventuallyConsistentMapEvent; | 76 | + import org.onosproject.store.service.EventuallyConsistentMapListener; |
77 | -import org.onosproject.store.service.EventuallyConsistentMapListener; | 77 | + import org.onosproject.store.service.Serializer; |
78 | -import org.onosproject.store.service.Serializer; | 78 | + import org.onosproject.store.service.StorageService; |
79 | -import org.onosproject.store.service.StorageService; | 79 | + import org.onosproject.store.service.WallClockTimestamp; |
80 | -import org.onosproject.store.service.WallClockTimestamp; | 80 | + import org.osgi.service.component.ComponentContext; |
81 | -import org.osgi.service.component.ComponentContext; | 81 | + import org.slf4j.Logger; |
82 | -import org.slf4j.Logger; | 82 | + |
83 | - | 83 | + import java.util.Collections; |
84 | -import java.util.Collections; | 84 | + import java.util.Dictionary; |
85 | -import java.util.Dictionary; | 85 | + import java.util.HashSet; |
86 | -import java.util.HashSet; | 86 | + import java.util.List; |
87 | -import java.util.List; | 87 | + import java.util.Map; |
88 | -import java.util.Map; | 88 | + import java.util.Set; |
89 | -import java.util.Set; | 89 | + import java.util.concurrent.ExecutorService; |
90 | -import java.util.concurrent.ExecutorService; | 90 | + import java.util.concurrent.Executors; |
91 | -import java.util.concurrent.Executors; | 91 | + import java.util.concurrent.ScheduledExecutorService; |
92 | -import java.util.concurrent.ScheduledExecutorService; | 92 | + import java.util.concurrent.ScheduledFuture; |
93 | -import java.util.concurrent.ScheduledFuture; | 93 | + import java.util.concurrent.TimeUnit; |
94 | -import java.util.concurrent.TimeUnit; | 94 | + import java.util.concurrent.atomic.AtomicInteger; |
95 | -import java.util.concurrent.atomic.AtomicInteger; | 95 | + import java.util.concurrent.atomic.AtomicReference; |
96 | -import java.util.stream.Collectors; | 96 | + import java.util.stream.Collectors; |
97 | - | 97 | + |
98 | -import static com.google.common.base.Strings.isNullOrEmpty; | 98 | + import static com.google.common.base.Strings.isNullOrEmpty; |
99 | -import static org.onlab.util.Tools.get; | 99 | + import static org.onlab.util.Tools.get; |
100 | -import static org.onlab.util.Tools.groupedThreads; | 100 | + import static org.onlab.util.Tools.groupedThreads; |
101 | -import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED; | 101 | + import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED; |
102 | -import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*; | 102 | + import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*; |
103 | -import static org.slf4j.LoggerFactory.getLogger; | 103 | + import static org.slf4j.LoggerFactory.getLogger; |
104 | 104 | ||
105 | /** | 105 | /** |
106 | * Manages inventory of flow rules using a distributed state management protocol. | 106 | * Manages inventory of flow rules using a distributed state management protocol. |
... | @@ -498,7 +498,9 @@ public class NewDistributedFlowRuleStore | ... | @@ -498,7 +498,9 @@ public class NewDistributedFlowRuleStore |
498 | case REMOVE: | 498 | case REMOVE: |
499 | entry = flowTable.getFlowEntry(op.target()); | 499 | entry = flowTable.getFlowEntry(op.target()); |
500 | if (entry != null) { | 500 | if (entry != null) { |
501 | + //FIXME modification of "stored" flow entry outside of flow table | ||
501 | entry.setState(FlowEntryState.PENDING_REMOVE); | 502 | entry.setState(FlowEntryState.PENDING_REMOVE); |
503 | + log.debug("Setting state of rule to pending remove: {}", entry); | ||
502 | return op; | 504 | return op; |
503 | } | 505 | } |
504 | break; | 506 | break; |
... | @@ -539,6 +541,7 @@ public class NewDistributedFlowRuleStore | ... | @@ -539,6 +541,7 @@ public class NewDistributedFlowRuleStore |
539 | // check if this new rule is an update to an existing entry | 541 | // check if this new rule is an update to an existing entry |
540 | StoredFlowEntry stored = flowTable.getFlowEntry(rule); | 542 | StoredFlowEntry stored = flowTable.getFlowEntry(rule); |
541 | if (stored != null) { | 543 | if (stored != null) { |
544 | + //FIXME modification of "stored" flow entry outside of flow table | ||
542 | stored.setBytes(rule.bytes()); | 545 | stored.setBytes(rule.bytes()); |
543 | stored.setLife(rule.life()); | 546 | stored.setLife(rule.life()); |
544 | stored.setPackets(rule.packets()); | 547 | stored.setPackets(rule.packets()); |
... | @@ -588,8 +591,9 @@ public class NewDistributedFlowRuleStore | ... | @@ -588,8 +591,9 @@ public class NewDistributedFlowRuleStore |
588 | private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) { | 591 | private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) { |
589 | final DeviceId deviceId = rule.deviceId(); | 592 | final DeviceId deviceId = rule.deviceId(); |
590 | // This is where one could mark a rule as removed and still keep it in the store. | 593 | // This is where one could mark a rule as removed and still keep it in the store. |
591 | - final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule); | 594 | + final FlowEntry removed = flowTable.remove(deviceId, rule); |
592 | - return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null; | 595 | + // rule may be partial rule that is missing treatment, we should use rule from store instead |
596 | + return removed != null ? new FlowRuleEvent(RULE_REMOVED, removed) : null; | ||
593 | } | 597 | } |
594 | 598 | ||
595 | @Override | 599 | @Override |
... | @@ -635,7 +639,8 @@ public class NewDistributedFlowRuleStore | ... | @@ -635,7 +639,8 @@ public class NewDistributedFlowRuleStore |
635 | 639 | ||
636 | private class InternalFlowTable implements ReplicaInfoEventListener { | 640 | private class InternalFlowTable implements ReplicaInfoEventListener { |
637 | 641 | ||
638 | - private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> | 642 | + //TODO replace the Map<V,V> with ExtendedSet |
643 | + private final Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> | ||
639 | flowEntries = Maps.newConcurrentMap(); | 644 | flowEntries = Maps.newConcurrentMap(); |
640 | 645 | ||
641 | private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap(); | 646 | private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap(); |
... | @@ -692,11 +697,13 @@ public class NewDistributedFlowRuleStore | ... | @@ -692,11 +697,13 @@ public class NewDistributedFlowRuleStore |
692 | return; | 697 | return; |
693 | } | 698 | } |
694 | log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId); | 699 | log.debug("Sending flowEntries for devices {} to {} as backup.", deviceIds, nodeId); |
695 | - Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries = | 700 | + Map<DeviceId, Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> |
696 | - Maps.newConcurrentMap(); | 701 | + deviceFlowEntries = Maps.newConcurrentMap(); |
697 | deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id)))); | 702 | deviceIds.forEach(id -> deviceFlowEntries.put(id, ImmutableMap.copyOf(getFlowTable(id)))); |
698 | - clusterCommunicator.<Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>>, Set<DeviceId>>sendAndReceive( | 703 | + clusterCommunicator.<Map<DeviceId, |
699 | - deviceFlowEntries, | 704 | + Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>>, |
705 | + Set<DeviceId>> | ||
706 | + sendAndReceive(deviceFlowEntries, | ||
700 | FLOW_TABLE_BACKUP, | 707 | FLOW_TABLE_BACKUP, |
701 | SERIALIZER::encode, | 708 | SERIALIZER::encode, |
702 | SERIALIZER::decode, | 709 | SERIALIZER::decode, |
... | @@ -724,10 +731,10 @@ public class NewDistributedFlowRuleStore | ... | @@ -724,10 +731,10 @@ public class NewDistributedFlowRuleStore |
724 | * @param deviceId identifier of the device | 731 | * @param deviceId identifier of the device |
725 | * @return Map representing Flow Table of given device. | 732 | * @return Map representing Flow Table of given device. |
726 | */ | 733 | */ |
727 | - private Map<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) { | 734 | + private Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowTable(DeviceId deviceId) { |
728 | if (persistenceEnabled) { | 735 | if (persistenceEnabled) { |
729 | return flowEntries.computeIfAbsent(deviceId, id -> persistenceService | 736 | return flowEntries.computeIfAbsent(deviceId, id -> persistenceService |
730 | - .<FlowId, Set<StoredFlowEntry>>persistentMapBuilder() | 737 | + .<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>persistentMapBuilder() |
731 | .withName("FlowTable:" + deviceId.toString()) | 738 | .withName("FlowTable:" + deviceId.toString()) |
732 | .withSerializer(new Serializer() { | 739 | .withSerializer(new Serializer() { |
733 | @Override | 740 | @Override |
... | @@ -746,22 +753,18 @@ public class NewDistributedFlowRuleStore | ... | @@ -746,22 +753,18 @@ public class NewDistributedFlowRuleStore |
746 | } | 753 | } |
747 | } | 754 | } |
748 | 755 | ||
749 | - private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) { | 756 | + private Map<StoredFlowEntry, StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) { |
750 | - return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet()); | 757 | + return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Maps.newConcurrentMap()); |
751 | } | 758 | } |
752 | 759 | ||
753 | private StoredFlowEntry getFlowEntryInternal(FlowRule rule) { | 760 | private StoredFlowEntry getFlowEntryInternal(FlowRule rule) { |
754 | - Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id()); | 761 | + return getFlowEntriesInternal(rule.deviceId(), rule.id()).get(rule); |
755 | - return flowEntries.stream() | ||
756 | - .filter(entry -> Objects.equal(entry, rule)) | ||
757 | - .findAny() | ||
758 | - .orElse(null); | ||
759 | } | 762 | } |
760 | 763 | ||
761 | private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) { | 764 | private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) { |
762 | - Set<FlowEntry> result = Sets.newHashSet(); | 765 | + return getFlowTable(deviceId).values().stream() |
763 | - getFlowTable(deviceId).values().forEach(result::addAll); | 766 | + .flatMap(m -> m.values().stream()) |
764 | - return result; | 767 | + .collect(Collectors.toSet()); |
765 | } | 768 | } |
766 | 769 | ||
767 | public StoredFlowEntry getFlowEntry(FlowRule rule) { | 770 | public StoredFlowEntry getFlowEntry(FlowRule rule) { |
... | @@ -773,15 +776,40 @@ public class NewDistributedFlowRuleStore | ... | @@ -773,15 +776,40 @@ public class NewDistributedFlowRuleStore |
773 | } | 776 | } |
774 | 777 | ||
775 | public void add(FlowEntry rule) { | 778 | public void add(FlowEntry rule) { |
776 | - getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule); | 779 | + getFlowEntriesInternal(rule.deviceId(), rule.id()) |
780 | + .compute((StoredFlowEntry) rule, (k, stored) -> { | ||
781 | + //TODO compare stored and rule timestamps | ||
782 | + //TODO the key is not updated | ||
783 | + return (StoredFlowEntry) rule; | ||
784 | + }); | ||
777 | lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis()); | 785 | lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis()); |
778 | } | 786 | } |
779 | 787 | ||
780 | - public boolean remove(DeviceId deviceId, FlowEntry rule) { | 788 | + public FlowEntry remove(DeviceId deviceId, FlowEntry rule) { |
781 | - try { | 789 | + final AtomicReference<FlowEntry> removedRule = new AtomicReference<>(); |
782 | - return getFlowEntriesInternal(deviceId, rule.id()).remove(rule); | 790 | + getFlowEntriesInternal(rule.deviceId(), rule.id()) |
783 | - } finally { | 791 | + .computeIfPresent((StoredFlowEntry) rule, (k, stored) -> { |
792 | + if (rule instanceof DefaultFlowEntry) { | ||
793 | + DefaultFlowEntry toRemove = (DefaultFlowEntry) rule; | ||
794 | + if (stored instanceof DefaultFlowEntry) { | ||
795 | + DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored; | ||
796 | + if (toRemove.created() < storedEntry.created()) { | ||
797 | + log.debug("Trying to remove more recent flow entry {} (stored: {})", | ||
798 | + toRemove, stored); | ||
799 | + // the key is not updated, removedRule remains null | ||
800 | + return stored; | ||
801 | + } | ||
802 | + } | ||
803 | + } | ||
804 | + removedRule.set(stored); | ||
805 | + return null; | ||
806 | + }); | ||
807 | + | ||
808 | + if (removedRule.get() != null) { | ||
784 | lastUpdateTimes.put(deviceId, System.currentTimeMillis()); | 809 | lastUpdateTimes.put(deviceId, System.currentTimeMillis()); |
810 | + return removedRule.get(); | ||
811 | + } else { | ||
812 | + return null; | ||
785 | } | 813 | } |
786 | } | 814 | } |
787 | 815 | ||
... | @@ -826,14 +854,16 @@ public class NewDistributedFlowRuleStore | ... | @@ -826,14 +854,16 @@ public class NewDistributedFlowRuleStore |
826 | } | 854 | } |
827 | } | 855 | } |
828 | 856 | ||
829 | - private Set<DeviceId> onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) { | 857 | + private Set<DeviceId> onBackupReceipt(Map<DeviceId, |
858 | + Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>>> flowTables) { | ||
830 | log.debug("Received flowEntries for {} to backup", flowTables.keySet()); | 859 | log.debug("Received flowEntries for {} to backup", flowTables.keySet()); |
831 | Set<DeviceId> backedupDevices = Sets.newHashSet(); | 860 | Set<DeviceId> backedupDevices = Sets.newHashSet(); |
832 | try { | 861 | try { |
833 | flowTables.forEach((deviceId, deviceFlowTable) -> { | 862 | flowTables.forEach((deviceId, deviceFlowTable) -> { |
834 | // Only process those devices are that not managed by the local node. | 863 | // Only process those devices are that not managed by the local node. |
835 | if (!Objects.equal(local, mastershipService.getMasterFor(deviceId))) { | 864 | if (!Objects.equal(local, mastershipService.getMasterFor(deviceId))) { |
836 | - Map<FlowId, Set<StoredFlowEntry>> backupFlowTable = getFlowTable(deviceId); | 865 | + Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> backupFlowTable = |
866 | + getFlowTable(deviceId); | ||
837 | backupFlowTable.clear(); | 867 | backupFlowTable.clear(); |
838 | backupFlowTable.putAll(deviceFlowTable); | 868 | backupFlowTable.putAll(deviceFlowTable); |
839 | backedupDevices.add(deviceId); | 869 | backedupDevices.add(deviceId); | ... | ... |
... | @@ -416,6 +416,7 @@ public class NewAdaptiveFlowStatsCollector { | ... | @@ -416,6 +416,7 @@ public class NewAdaptiveFlowStatsCollector { |
416 | + " AdaptiveStats collection thread for {}", | 416 | + " AdaptiveStats collection thread for {}", |
417 | sw.getStringId()); | 417 | sw.getStringId()); |
418 | 418 | ||
419 | + //FIXME modification of "stored" flow entry outside of store | ||
419 | stored.setLastSeen(); | 420 | stored.setLastSeen(); |
420 | continue; | 421 | continue; |
421 | } else if (fe.life() < stored.life()) { | 422 | } else if (fe.life() < stored.life()) { |
... | @@ -428,11 +429,13 @@ public class NewAdaptiveFlowStatsCollector { | ... | @@ -428,11 +429,13 @@ public class NewAdaptiveFlowStatsCollector { |
428 | ", new life=" + fe.life() + ", old life=" + stored.life() + | 429 | ", new life=" + fe.life() + ", old life=" + stored.life() + |
429 | ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen()); | 430 | ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen()); |
430 | // go next | 431 | // go next |
432 | + //FIXME modification of "stored" flow entry outside of store | ||
431 | stored.setLastSeen(); | 433 | stored.setLastSeen(); |
432 | continue; | 434 | continue; |
433 | } | 435 | } |
434 | 436 | ||
435 | // update now | 437 | // update now |
438 | + //FIXME modification of "stored" flow entry outside of store | ||
436 | stored.setLife(fe.life()); | 439 | stored.setLife(fe.life()); |
437 | stored.setPackets(fe.packets()); | 440 | stored.setPackets(fe.packets()); |
438 | stored.setBytes(fe.bytes()); | 441 | stored.setBytes(fe.bytes()); | ... | ... |
-
Please register or login to post a comment