Committed by
Gerrit Code Review
Configuration options for disabling tombstones in ECMap + disabling backups in Dist flow rule store
Change-Id: I28b17f3d0bb7f5ba87a541b7f6337c3c1b587d36
Showing
2 changed files
with
45 additions
and
17 deletions
... | @@ -107,6 +107,7 @@ public class EventuallyConsistentMapImpl<K, V> | ... | @@ -107,6 +107,7 @@ public class EventuallyConsistentMapImpl<K, V> |
107 | private long initialDelaySec = 5; | 107 | private long initialDelaySec = 5; |
108 | private long periodSec = 5; | 108 | private long periodSec = 5; |
109 | private boolean lightweightAntiEntropy = true; | 109 | private boolean lightweightAntiEntropy = true; |
110 | + private boolean tombstonesDisabled = false; | ||
110 | 111 | ||
111 | private static final int WINDOW_SIZE = 5; | 112 | private static final int WINDOW_SIZE = 5; |
112 | private static final int HIGH_LOAD_THRESHOLD = 0; | 113 | private static final int HIGH_LOAD_THRESHOLD = 0; |
... | @@ -223,6 +224,11 @@ public class EventuallyConsistentMapImpl<K, V> | ... | @@ -223,6 +224,11 @@ public class EventuallyConsistentMapImpl<K, V> |
223 | .collect(Collectors.toList())); | 224 | .collect(Collectors.toList())); |
224 | } | 225 | } |
225 | 226 | ||
227 | + public EventuallyConsistentMapImpl<K, V> withTombstonesDisabled(boolean status) { | ||
228 | + tombstonesDisabled = status; | ||
229 | + return this; | ||
230 | + } | ||
231 | + | ||
226 | private KryoSerializer createSerializer(KryoNamespace.Builder builder) { | 232 | private KryoSerializer createSerializer(KryoNamespace.Builder builder) { |
227 | return new KryoSerializer() { | 233 | return new KryoSerializer() { |
228 | @Override | 234 | @Override |
... | @@ -379,14 +385,18 @@ public class EventuallyConsistentMapImpl<K, V> | ... | @@ -379,14 +385,18 @@ public class EventuallyConsistentMapImpl<K, V> |
379 | return false; | 385 | return false; |
380 | } | 386 | } |
381 | 387 | ||
382 | - Timestamp removedTimestamp = removedItems.get(key); | 388 | + if (!tombstonesDisabled) { |
383 | - if (removedTimestamp == null) { | 389 | + Timestamp removedTimestamp = removedItems.get(key); |
384 | - return removedItems.putIfAbsent(key, timestamp) == null; | 390 | + if (removedTimestamp == null) { |
385 | - } else if (timestamp.isNewerThan(removedTimestamp)) { | 391 | + return removedItems.putIfAbsent(key, timestamp) == null; |
386 | - return removedItems.replace(key, removedTimestamp, timestamp); | 392 | + } else if (timestamp.isNewerThan(removedTimestamp)) { |
387 | - } else { | 393 | + return removedItems.replace(key, removedTimestamp, timestamp); |
388 | - return false; | 394 | + } else { |
395 | + return false; | ||
396 | + } | ||
389 | } | 397 | } |
398 | + | ||
399 | + return updated.booleanValue(); | ||
390 | } | 400 | } |
391 | 401 | ||
392 | @Override | 402 | @Override | ... | ... |
... | @@ -152,7 +152,7 @@ public class DistributedFlowRuleStore | ... | @@ -152,7 +152,7 @@ public class DistributedFlowRuleStore |
152 | @Activate | 152 | @Activate |
153 | public void activate() { | 153 | public void activate() { |
154 | 154 | ||
155 | - flowTable = new InternalFlowTable(); | 155 | + flowTable = new InternalFlowTable(); // .withBackupsEnabled(false); |
156 | 156 | ||
157 | idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC); | 157 | idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC); |
158 | 158 | ||
... | @@ -616,6 +616,18 @@ public class DistributedFlowRuleStore | ... | @@ -616,6 +616,18 @@ public class DistributedFlowRuleStore |
616 | 616 | ||
617 | private class InternalFlowTable { | 617 | private class InternalFlowTable { |
618 | 618 | ||
619 | + private boolean backupsEnabled = true; | ||
620 | + | ||
621 | + /** | ||
622 | + * Turns backups on or off. | ||
623 | + * @param backupsEnabled whether backups should be enabled or not | ||
624 | + * @return this instance | ||
625 | + */ | ||
626 | + public InternalFlowTable withBackupsEnabled(boolean backupsEnabled) { | ||
627 | + this.backupsEnabled = backupsEnabled; | ||
628 | + return this; | ||
629 | + } | ||
630 | + | ||
619 | private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> | 631 | private final Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> |
620 | flowEntries = Maps.newConcurrentMap(); | 632 | flowEntries = Maps.newConcurrentMap(); |
621 | 633 | ||
... | @@ -627,13 +639,14 @@ public class DistributedFlowRuleStore | ... | @@ -627,13 +639,14 @@ public class DistributedFlowRuleStore |
627 | (flowId, flowEntry) -> | 639 | (flowId, flowEntry) -> |
628 | (flowEntry == null) ? null : deviceClockService.getTimestamp(flowEntry.deviceId()); | 640 | (flowEntry == null) ? null : deviceClockService.getTimestamp(flowEntry.deviceId()); |
629 | 641 | ||
630 | - private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap = | 642 | + private final EventuallyConsistentMap<FlowId, StoredFlowEntry> backupMap = backupsEnabled ? |
631 | new EventuallyConsistentMapImpl<>("flow-backup", | 643 | new EventuallyConsistentMapImpl<>("flow-backup", |
632 | clusterService, | 644 | clusterService, |
633 | clusterCommunicator, | 645 | clusterCommunicator, |
634 | flowSerializer, | 646 | flowSerializer, |
635 | clockService, | 647 | clockService, |
636 | - (key, flowEntry) -> getPeerNodes()); | 648 | + (key, flowEntry) -> getPeerNodes()).withTombstonesDisabled(true) |
649 | + : null; | ||
637 | 650 | ||
638 | private Collection<NodeId> getPeerNodes() { | 651 | private Collection<NodeId> getPeerNodes() { |
639 | List<NodeId> nodes = clusterService.getNodes() | 652 | List<NodeId> nodes = clusterService.getNodes() |
... | @@ -651,6 +664,10 @@ public class DistributedFlowRuleStore | ... | @@ -651,6 +664,10 @@ public class DistributedFlowRuleStore |
651 | } | 664 | } |
652 | 665 | ||
653 | public void loadFromBackup(DeviceId deviceId) { | 666 | public void loadFromBackup(DeviceId deviceId) { |
667 | + if (!backupsEnabled) { | ||
668 | + return; | ||
669 | + } | ||
670 | + | ||
654 | ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = new ConcurrentHashMap<>(); | 671 | ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = new ConcurrentHashMap<>(); |
655 | 672 | ||
656 | backupMap.values() | 673 | backupMap.values() |
... | @@ -699,18 +716,19 @@ public class DistributedFlowRuleStore | ... | @@ -699,18 +716,19 @@ public class DistributedFlowRuleStore |
699 | 716 | ||
700 | public void add(StoredFlowEntry rule) { | 717 | public void add(StoredFlowEntry rule) { |
701 | getFlowEntriesInternal(rule.deviceId(), rule.id()).add(rule); | 718 | getFlowEntriesInternal(rule.deviceId(), rule.id()).add(rule); |
702 | - | 719 | + if (backupsEnabled) { |
703 | - try { | 720 | + try { |
704 | - backupMap.put(rule.id(), rule); | 721 | + backupMap.put(rule.id(), rule); |
705 | - } catch (Exception e) { | 722 | + } catch (Exception e) { |
706 | - log.warn("Failed to backup flow rule", e); | 723 | + log.warn("Failed to backup flow rule", e); |
724 | + } | ||
707 | } | 725 | } |
708 | } | 726 | } |
709 | 727 | ||
710 | public boolean remove(DeviceId deviceId, FlowEntry rule) { | 728 | public boolean remove(DeviceId deviceId, FlowEntry rule) { |
711 | boolean status = | 729 | boolean status = |
712 | getFlowEntriesInternal(deviceId, rule.id()).remove(rule); | 730 | getFlowEntriesInternal(deviceId, rule.id()).remove(rule); |
713 | - if (status) { | 731 | + if (backupsEnabled && status) { |
714 | try { | 732 | try { |
715 | backupMap.remove(rule.id(), (DefaultFlowEntry) rule); | 733 | backupMap.remove(rule.id(), (DefaultFlowEntry) rule); |
716 | } catch (Exception e) { | 734 | } catch (Exception e) { |
... | @@ -725,4 +743,4 @@ public class DistributedFlowRuleStore | ... | @@ -725,4 +743,4 @@ public class DistributedFlowRuleStore |
725 | // Flow entries should continue to remain in backup map. | 743 | // Flow entries should continue to remain in backup map. |
726 | } | 744 | } |
727 | } | 745 | } |
728 | -} | ||
... | \ No newline at end of file | ... | \ No newline at end of file |
746 | +} | ... | ... |
-
Please register or login to post a comment