Madan Jampani
Committed by Gerrit Code Review

New flowrule store that uses a simple mechanism to periodically back up its flow entries.

For efficiency, backups are run only for devices whose flow entries have been updated since the last backup, or whose master/backup assignment has changed (the eligibility check is sketched below).
This backup mechanism is a short-term solution until we move to a closer-to-real-time backup approach.
This change also disables the Hazelcast-based flowrule store.

Change-Id: Iaae08852edee20b999ff97c60ca8bc6576e645f6
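The backup-eligibility rule described above boils down to a per-device predicate. The following is a simplified, self-contained sketch of the filter used in InternalFlowTable.backup() further down in this change; the helper class and generic parameters are illustrative, not part of the patch:

    import com.google.common.base.Objects;
    import java.util.Map;

    // A device is backed up if it was never backed up before, if its backup node
    // has changed, or if its flow entries were updated after the last backup.
    final class BackupEligibility {
        private BackupEligibility() { }

        static <D, N> boolean needsBackup(D deviceId,
                                          Map<D, Long> lastBackupTimes,
                                          Map<D, Long> lastUpdateTimes,
                                          Map<D, N> lastBackupNodes,
                                          N currentBackupNode) {
            Long lastBackupTime = lastBackupTimes.get(deviceId);
            Long lastUpdateTime = lastUpdateTimes.get(deviceId);
            N lastBackupNode = lastBackupNodes.get(deviceId);
            return lastBackupTime == null
                    || !Objects.equal(lastBackupNode, currentBackupNode)
                    || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
        }
    }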
@@ -17,8 +17,8 @@ package org.onosproject.store.flow;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 
 import org.onosproject.cluster.NodeId;
 
@@ -30,15 +30,15 @@ import com.google.common.base.Optional;
 public final class ReplicaInfo {
 
     private final Optional<NodeId> master;
-    private final Collection<NodeId> backups;
+    private final List<NodeId> backups;
 
     /**
      * Creates a ReplicaInfo instance.
      *
      * @param master NodeId of the node where the master copy should be
-     * @param backups collection of NodeId, where backup copies should be placed
+     * @param backups list of NodeId, where backup copies should be placed
      */
-    public ReplicaInfo(NodeId master, Collection<NodeId> backups) {
+    public ReplicaInfo(NodeId master, List<NodeId> backups) {
         this.master = Optional.fromNullable(master);
         this.backups = checkNotNull(backups);
     }
@@ -57,7 +57,7 @@ public final class ReplicaInfo {
      *
      * @return collection of NodeId, where backup copies should be placed
      */
-    public Collection<NodeId> backups() {
+    public List<NodeId> backups() {
         return backups;
     }
 
......
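Switching backups from Collection to List is not cosmetic: the store treats the list as ordered by succession priority, so the element at index 0 is the standby most likely to become the next master and therefore the natural backup target. A minimal sketch of that intended usage (mirroring getBackupNode() in the new store; the helper class is illustrative):

    import java.util.List;
    import org.onosproject.cluster.NodeId;
    import org.onosproject.store.flow.ReplicaInfo;

    final class BackupTarget {
        private BackupTarget() { }

        // Pick the standby most likely to become the next master, or null if
        // the device currently has no backups.
        static NodeId select(ReplicaInfo replicaInfo) {
            List<NodeId> backups = replicaInfo.backups();
            return backups.isEmpty() ? null : backups.get(0);
        }
    }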
@@ -108,7 +108,7 @@ import static org.slf4j.LoggerFactory.getLogger;
 /**
  * Manages inventory of flow rules using a distributed state management protocol.
  */
-@Component(immediate = true)
+@Component(immediate = false, enabled = false)
 @Service
 public class DistributedFlowRuleStore
         extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
......
@@ -36,5 +36,8 @@ public final class FlowStoreMessageSubjects {
             = new MessageSubject("peer-forward-remove-flow-entry");
 
     public static final MessageSubject REMOTE_APPLY_COMPLETED
-            = new MessageSubject("peer-apply-completed");
+            = new MessageSubject("peer-apply-completed");
+
+    public static final MessageSubject FLOW_TABLE_BACKUP
+            = new MessageSubject("peer-flow-table-backup");
 }
......
1 + /*
2 + * Copyright 2014-2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.flow.impl;
17 +
18 +import com.google.common.base.Objects;
19 +import com.google.common.collect.Iterables;
20 +import com.google.common.collect.Maps;
21 +import com.google.common.collect.Sets;
22 +import com.google.common.util.concurrent.Futures;
23 +
24 +import org.apache.felix.scr.annotations.Activate;
25 +import org.apache.felix.scr.annotations.Component;
26 +import org.apache.felix.scr.annotations.Deactivate;
27 +import org.apache.felix.scr.annotations.Modified;
28 +import org.apache.felix.scr.annotations.Property;
29 +import org.apache.felix.scr.annotations.Reference;
30 +import org.apache.felix.scr.annotations.ReferenceCardinality;
31 +import org.apache.felix.scr.annotations.Service;
32 +import org.onlab.util.KryoNamespace;
33 +import org.onlab.util.NewConcurrentHashMap;
34 +import org.onlab.util.Tools;
35 +import org.onosproject.cfg.ComponentConfigService;
36 +import org.onosproject.cluster.ClusterService;
37 +import org.onosproject.cluster.NodeId;
38 +import org.onosproject.core.CoreService;
39 +import org.onosproject.core.IdGenerator;
40 +import org.onosproject.mastership.MastershipService;
41 +import org.onosproject.net.DeviceId;
42 +import org.onosproject.net.device.DeviceService;
43 +import org.onosproject.net.flow.CompletedBatchOperation;
44 +import org.onosproject.net.flow.DefaultFlowEntry;
45 +import org.onosproject.net.flow.FlowEntry;
46 +import org.onosproject.net.flow.FlowEntry.FlowEntryState;
47 +import org.onosproject.net.flow.FlowId;
48 +import org.onosproject.net.flow.FlowRule;
49 +import org.onosproject.net.flow.FlowRuleBatchEntry;
50 +import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
51 +import org.onosproject.net.flow.FlowRuleBatchEvent;
52 +import org.onosproject.net.flow.FlowRuleBatchOperation;
53 +import org.onosproject.net.flow.FlowRuleBatchRequest;
54 +import org.onosproject.net.flow.FlowRuleEvent;
55 +import org.onosproject.net.flow.FlowRuleEvent.Type;
56 +import org.onosproject.net.flow.FlowRuleService;
57 +import org.onosproject.net.flow.FlowRuleStore;
58 +import org.onosproject.net.flow.FlowRuleStoreDelegate;
59 +import org.onosproject.net.flow.StoredFlowEntry;
60 +import org.onosproject.store.AbstractStore;
61 +import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
62 +import org.onosproject.store.cluster.messaging.ClusterMessage;
63 +import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
64 +import org.onosproject.store.flow.ReplicaInfo;
65 +import org.onosproject.store.flow.ReplicaInfoService;
66 +import org.onosproject.store.serializers.KryoSerializer;
67 +import org.onosproject.store.serializers.StoreSerializer;
68 +import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
69 +import org.osgi.service.component.ComponentContext;
70 +import org.slf4j.Logger;
71 +
72 +import java.util.Arrays;
73 +import java.util.Collections;
74 +import java.util.Dictionary;
75 +import java.util.HashSet;
76 +import java.util.List;
77 +import java.util.Map;
78 +import java.util.Set;
79 +import java.util.concurrent.ConcurrentHashMap;
80 +import java.util.concurrent.ConcurrentMap;
81 +import java.util.concurrent.ExecutorService;
82 +import java.util.concurrent.Executors;
83 +import java.util.concurrent.ScheduledExecutorService;
84 +import java.util.concurrent.TimeUnit;
85 +import java.util.concurrent.atomic.AtomicInteger;
86 +import java.util.stream.Collectors;
87 +
88 +import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
89 +import static com.google.common.base.Strings.isNullOrEmpty;
90 +import static org.onlab.util.Tools.get;
91 +import static org.onlab.util.Tools.groupedThreads;
92 +import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
93 +import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
94 +import static org.slf4j.LoggerFactory.getLogger;
95 +
96 +/**
97 + * Manages inventory of flow rules using a distributed state management protocol.
98 + */
99 +@Component(immediate = true, enabled = true)
100 +@Service
101 +public class NewDistributedFlowRuleStore
102 + extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
103 + implements FlowRuleStore {
104 +
105 + private final Logger log = getLogger(getClass());
106 +
107 + private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
108 + private static final boolean DEFAULT_BACKUP_ENABLED = true;
109 + private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
110 +
111 + @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
112 + label = "Number of threads in the message handler pool")
113 + private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
114 +
115 + @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
116 + label = "Indicates whether backups are enabled or not")
117 + private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
118 +
119 + private InternalFlowTable flowTable = new InternalFlowTable();
120 +
121 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
122 + protected ReplicaInfoService replicaInfoManager;
123 +
124 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
125 + protected ClusterCommunicationService clusterCommunicator;
126 +
127 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
128 + protected ClusterService clusterService;
129 +
130 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
131 + protected DeviceService deviceService;
132 +
133 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134 + protected CoreService coreService;
135 +
136 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 + protected ComponentConfigService configService;
138 +
139 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140 + protected MastershipService mastershipService;
141 +
142 + private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
143 + private ExecutorService messageHandlingExecutor;
144 +
145 + private final ScheduledExecutorService backupSenderExecutor =
146 + Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
147 +
148 + protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
149 + @Override
150 + protected void setupKryoPool() {
151 + serializerPool = KryoNamespace.newBuilder()
152 + .register(DistributedStoreSerializers.STORE_COMMON)
153 + .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
154 + .register(FlowRuleEvent.class)
155 + .register(FlowRuleEvent.Type.class)
156 + .build();
157 + }
158 + };
159 +
160 + private IdGenerator idGenerator;
161 + private NodeId local;
162 +
163 + @Activate
164 + public void activate(ComponentContext context) {
165 + configService.registerProperties(getClass());
166 +
167 + idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
168 +
169 + local = clusterService.getLocalNode().id();
170 +
171 + messageHandlingExecutor = Executors.newFixedThreadPool(
172 + msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
173 +
174 + registerMessageHandlers(messageHandlingExecutor);
175 +
176 + backupSenderExecutor.scheduleWithFixedDelay(() -> flowTable.backup(), 0, 2000, TimeUnit.MILLISECONDS);
177 +
178 + logConfig("Started");
179 + }
180 +
181 + @Deactivate
182 + public void deactivate(ComponentContext context) {
183 + configService.unregisterProperties(getClass(), false);
184 + unregisterMessageHandlers();
185 + messageHandlingExecutor.shutdownNow();
186 + backupSenderExecutor.shutdownNow();
187 + log.info("Stopped");
188 + }
189 +
190 + @SuppressWarnings("rawtypes")
191 + @Modified
192 + public void modified(ComponentContext context) {
193 + if (context == null) {
194 + backupEnabled = DEFAULT_BACKUP_ENABLED;
195 + logConfig("Default config");
196 + return;
197 + }
198 +
199 + Dictionary properties = context.getProperties();
200 + int newPoolSize;
201 + boolean newBackupEnabled;
202 + try {
203 + String s = get(properties, "msgHandlerPoolSize");
204 + newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
205 +
206 + s = get(properties, "backupEnabled");
207 + newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
208 +
209 + } catch (NumberFormatException | ClassCastException e) {
210 + newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
211 + newBackupEnabled = DEFAULT_BACKUP_ENABLED;
212 + }
213 +
214 + if (newBackupEnabled != backupEnabled) {
215 + backupEnabled = newBackupEnabled;
216 + }
217 + if (newPoolSize != msgHandlerPoolSize) {
218 + msgHandlerPoolSize = newPoolSize;
219 + ExecutorService oldMsgHandler = messageHandlingExecutor;
220 + messageHandlingExecutor = Executors.newFixedThreadPool(
221 + msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
222 +
223 + // replace previously registered handlers.
224 + registerMessageHandlers(messageHandlingExecutor);
225 + oldMsgHandler.shutdown();
226 + }
227 + logConfig("Reconfigured");
228 + }
229 +
230 + private void registerMessageHandlers(ExecutorService executor) {
231 +
232 + clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(), executor);
233 + clusterCommunicator.<FlowRuleBatchEvent>addSubscriber(
234 + REMOTE_APPLY_COMPLETED, SERIALIZER::decode, this::notifyDelegate, executor);
235 + clusterCommunicator.addSubscriber(
236 + GET_FLOW_ENTRY, SERIALIZER::decode, flowTable::getFlowEntry, SERIALIZER::encode, executor);
237 + clusterCommunicator.addSubscriber(
238 + GET_DEVICE_FLOW_ENTRIES, SERIALIZER::decode, flowTable::getFlowEntries, SERIALIZER::encode, executor);
239 + clusterCommunicator.addSubscriber(
240 + REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
241 + clusterCommunicator.addSubscriber(
242 + REMOVE_FLOW_ENTRY, SERIALIZER::decode, this::removeFlowRuleInternal, SERIALIZER::encode, executor);
243 + clusterCommunicator.addSubscriber(
244 + FLOW_TABLE_BACKUP, SERIALIZER::decode, flowTable::onBackupReceipt, executor);
245 + }
246 +
247 + private void unregisterMessageHandlers() {
248 + clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
249 + clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
250 + clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
251 + clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
252 + clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
253 + clusterCommunicator.removeSubscriber(FLOW_TABLE_BACKUP);
254 + }
255 +
256 + private void logConfig(String prefix) {
257 + log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
258 + prefix, msgHandlerPoolSize, backupEnabled);
259 + }
260 +
261 + // This is not an efficient operation on a distributed sharded
262 + // flow store. We need to revisit the need for this operation or at least
263 + // make it device specific.
264 + @Override
265 + public int getFlowRuleCount() {
266 + AtomicInteger sum = new AtomicInteger(0);
267 + deviceService.getDevices().forEach(device -> sum.addAndGet(Iterables.size(getFlowEntries(device.id()))));
268 + return sum.get();
269 + }
270 +
271 + @Override
272 + public FlowEntry getFlowEntry(FlowRule rule) {
273 +
274 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
275 + NodeId master = replicaInfo.master().orNull();
276 +
277 + if (master == null) {
278 + log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
279 + return null;
280 + }
281 +
282 + if (Objects.equal(local, master)) {
283 + return flowTable.getFlowEntry(rule);
284 + }
285 +
286 + log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
287 + master, rule.deviceId());
288 +
289 + return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
290 + FlowStoreMessageSubjects.GET_FLOW_ENTRY,
291 + SERIALIZER::encode,
292 + SERIALIZER::decode,
293 + master),
294 + FLOW_RULE_STORE_TIMEOUT_MILLIS,
295 + TimeUnit.MILLISECONDS,
296 + null);
297 + }
298 +
299 + @Override
300 + public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
301 +
302 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
303 + NodeId master = replicaInfo.master().orNull();
304 +
305 + if (master == null) {
306 + log.warn("Failed to getFlowEntries: No master for {}", deviceId);
307 + return Collections.emptyList();
308 + }
309 +
310 + if (Objects.equal(local, master)) {
311 + return flowTable.getFlowEntries(deviceId);
312 + }
313 +
314 + log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
315 + master, deviceId);
316 +
317 + return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
318 + FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
319 + SERIALIZER::encode,
320 + SERIALIZER::decode,
321 + master),
322 + FLOW_RULE_STORE_TIMEOUT_MILLIS,
323 + TimeUnit.MILLISECONDS,
324 + Collections.emptyList());
325 + }
326 +
327 + @Override
328 + public void storeFlowRule(FlowRule rule) {
329 + storeBatch(new FlowRuleBatchOperation(
330 + Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
331 + rule.deviceId(), idGenerator.getNewId()));
332 + }
333 +
334 + @Override
335 + public void storeBatch(FlowRuleBatchOperation operation) {
336 + if (operation.getOperations().isEmpty()) {
337 + notifyDelegate(FlowRuleBatchEvent.completed(
338 + new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
339 + new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
340 + return;
341 + }
342 +
343 + DeviceId deviceId = operation.deviceId();
344 +
345 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
346 + NodeId master = replicaInfo.master().orNull();
347 +
348 + if (master == null) {
349 + log.warn("No master for {} : flows will be marked for removal", deviceId);
350 +
351 + updateStoreInternal(operation);
352 +
353 + notifyDelegate(FlowRuleBatchEvent.completed(
354 + new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
355 + new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
356 + return;
357 + }
358 +
359 + if (Objects.equal(local, master)) {
360 + storeBatchInternal(operation);
361 + return;
362 + }
363 +
364 + log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
365 + master, deviceId);
366 +
367 + if (!clusterCommunicator.unicast(operation,
368 + APPLY_BATCH_FLOWS,
369 + SERIALIZER::encode,
370 + master)) {
371 + log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
372 +
373 + Set<FlowRule> allFailures = operation.getOperations().stream()
374 + .map(op -> op.target())
375 + .collect(Collectors.toSet());
376 +
377 + notifyDelegate(FlowRuleBatchEvent.completed(
378 + new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
379 + new CompletedBatchOperation(false, allFailures, deviceId)));
380 + return;
381 + }
382 + }
383 +
384 + private void storeBatchInternal(FlowRuleBatchOperation operation) {
385 +
386 + final DeviceId did = operation.deviceId();
387 + //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
388 + Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
389 + if (currentOps.isEmpty()) {
390 + batchOperationComplete(FlowRuleBatchEvent.completed(
391 + new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
392 + new CompletedBatchOperation(true, Collections.emptySet(), did)));
393 + return;
394 + }
395 +
396 + notifyDelegate(FlowRuleBatchEvent.requested(new
397 + FlowRuleBatchRequest(operation.id(),
398 + currentOps), operation.deviceId()));
399 + }
400 +
401 + private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
402 + return operation.getOperations().stream().map(
403 + op -> {
404 + StoredFlowEntry entry;
405 + switch (op.operator()) {
406 + case ADD:
407 + entry = new DefaultFlowEntry(op.target());
408 + // always add requested FlowRule
409 + // Note: 2 equal FlowEntry may have different treatment
410 + flowTable.remove(entry.deviceId(), entry);
411 + flowTable.add(entry);
412 +
413 + return op;
414 + case REMOVE:
415 + entry = flowTable.getFlowEntry(op.target());
416 + if (entry != null) {
417 + entry.setState(FlowEntryState.PENDING_REMOVE);
418 + return op;
419 + }
420 + break;
421 + case MODIFY:
422 + //TODO: figure this out at some point
423 + break;
424 + default:
425 + log.warn("Unknown flow operation operator: {}", op.operator());
426 + }
427 + return null;
428 + }
429 + ).filter(op -> op != null).collect(Collectors.toSet());
430 + }
431 +
432 + @Override
433 + public void deleteFlowRule(FlowRule rule) {
434 + storeBatch(
435 + new FlowRuleBatchOperation(
436 + Arrays.asList(
437 + new FlowRuleBatchEntry(
438 + FlowRuleOperation.REMOVE,
439 + rule)), rule.deviceId(), idGenerator.getNewId()));
440 + }
441 +
442 + @Override
443 + public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
444 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
445 + if (Objects.equal(local, replicaInfo.master().orNull())) {
446 + return addOrUpdateFlowRuleInternal(rule);
447 + }
448 +
449 + log.warn("Tried to update FlowRule {} state,"
450 + + " while the Node was not the master.", rule);
451 + return null;
452 + }
453 +
454 + private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
455 + // check if this new rule is an update to an existing entry
456 + StoredFlowEntry stored = flowTable.getFlowEntry(rule);
457 + if (stored != null) {
458 + stored.setBytes(rule.bytes());
459 + stored.setLife(rule.life());
460 + stored.setPackets(rule.packets());
461 + if (stored.state() == FlowEntryState.PENDING_ADD) {
462 + stored.setState(FlowEntryState.ADDED);
463 + return new FlowRuleEvent(Type.RULE_ADDED, rule);
464 + }
465 + return new FlowRuleEvent(Type.RULE_UPDATED, rule);
466 + }
467 +
468 + // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
469 + // TODO: also update backup if the behavior is correct.
470 + flowTable.add(rule);
471 + return null;
472 + }
473 +
474 + @Override
475 + public FlowRuleEvent removeFlowRule(FlowEntry rule) {
476 + final DeviceId deviceId = rule.deviceId();
477 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
478 + NodeId master = replicaInfo.master().orNull();
479 +
480 + if (Objects.equal(local, master)) {
481 + // bypass and handle it locally
482 + return removeFlowRuleInternal(rule);
483 + }
484 +
485 + if (master == null) {
486 + log.warn("Failed to removeFlowRule: No master for {}", deviceId);
487 + // TODO: revisit if this should be null (="no-op") or Exception
488 + return null;
489 + }
490 +
491 + log.trace("Forwarding removeFlowRule to {}, which is the master for device {}",
492 + master, deviceId);
493 +
494 + return Futures.get(clusterCommunicator.sendAndReceive(
495 + rule,
496 + REMOVE_FLOW_ENTRY,
497 + SERIALIZER::encode,
498 + SERIALIZER::decode,
499 + master),
500 + FLOW_RULE_STORE_TIMEOUT_MILLIS,
501 + TimeUnit.MILLISECONDS,
502 + RuntimeException.class);
503 + }
504 +
505 + private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
506 + final DeviceId deviceId = rule.deviceId();
507 + // This is where one could mark a rule as removed and still keep it in the store.
508 + final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
509 + return removed ? new FlowRuleEvent(RULE_REMOVED, rule) : null;
510 + }
511 +
512 + @Override
513 + public void batchOperationComplete(FlowRuleBatchEvent event) {
514 + //FIXME: need a per device pending response
515 + NodeId nodeId = pendingResponses.remove(event.subject().batchId());
516 + if (nodeId == null) {
517 + notifyDelegate(event);
518 + } else {
519 + // TODO check unicast return value
520 + clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
521 + //error log: log.warn("Failed to respond to peer for batch operation result");
522 + }
523 + }
524 +
525 + private final class OnStoreBatch implements ClusterMessageHandler {
526 +
527 + @Override
528 + public void handle(final ClusterMessage message) {
529 + FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
530 + log.debug("received batch request {}", operation);
531 +
532 + final DeviceId deviceId = operation.deviceId();
533 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
534 + if (!local.equals(replicaInfo.master().orNull())) {
535 +
536 + Set<FlowRule> failures = new HashSet<>(operation.size());
537 + for (FlowRuleBatchEntry op : operation.getOperations()) {
538 + failures.add(op.target());
539 + }
540 + CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
541 + // This node is no longer the master, respond as all failed.
542 + // TODO: we might want to wrap response in envelope
543 + // to distinguish sw programming failure and mastership hand-over;
544 + // it makes sense in the latter case to retry immediately.
545 + message.respond(SERIALIZER.encode(allFailed));
546 + return;
547 + }
548 +
549 + pendingResponses.put(operation.id(), message.sender());
550 + storeBatchInternal(operation);
551 + }
552 + }
553 +
554 + private class InternalFlowTable {
555 +
556 + private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
557 + flowEntries = new ConcurrentHashMap<>();
558 +
559 + private final Map<DeviceId, Long> lastBackupTimes = Maps.newConcurrentMap();
560 + private final Map<DeviceId, Long> lastUpdateTimes = Maps.newConcurrentMap();
561 + private final Map<DeviceId, NodeId> lastBackupNodes = Maps.newConcurrentMap();
562 +
563 + private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
564 + return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
565 + }
566 +
567 + /**
568 + * Returns the flow table for specified device.
569 + *
570 + * @param deviceId identifier of the device
571 + * @return Map representing Flow Table of given device.
572 + */
573 + private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
574 + return createIfAbsentUnchecked(flowEntries, deviceId, lazyEmptyFlowTable());
575 + }
576 +
577 + private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
578 + return getFlowTable(deviceId).computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
579 + }
580 +
581 + private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
582 + Set<StoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.deviceId(), rule.id());
583 + return flowEntries.stream()
584 + .filter(entry -> Objects.equal(entry, rule))
585 + .findAny()
586 + .orElse(null);
587 + }
588 +
589 + private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
590 + Set<FlowEntry> result = Sets.newHashSet();
591 + getFlowTable(deviceId).values().forEach(result::addAll);
592 + return result;
593 + }
594 +
595 + public StoredFlowEntry getFlowEntry(FlowRule rule) {
596 + return getFlowEntryInternal(rule);
597 + }
598 +
599 + public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
600 + return getFlowEntriesInternal(deviceId);
601 + }
602 +
603 + public void add(FlowEntry rule) {
604 + getFlowEntriesInternal(rule.deviceId(), rule.id()).add((StoredFlowEntry) rule);
605 + lastUpdateTimes.put(rule.deviceId(), System.currentTimeMillis());
606 + }
607 +
608 + public boolean remove(DeviceId deviceId, FlowEntry rule) {
609 + try {
610 + return getFlowEntriesInternal(deviceId, rule.id()).remove(rule);
611 + } finally {
612 + lastUpdateTimes.put(deviceId, System.currentTimeMillis());
613 + }
614 + }
615 +
616 + private NodeId getBackupNode(DeviceId deviceId) {
617 + List<NodeId> deviceStandbys = replicaInfoManager.getReplicaInfoFor(deviceId).backups();
618 + // pick the standby which is most likely to become next master
619 + return deviceStandbys.isEmpty() ? null : deviceStandbys.get(0);
620 + }
621 +
622 + private void backup() {
623 + //TODO: Force backup when backups change.
624 + try {
625 + // determine the set of devices that we need to backup during this run.
626 + Set<DeviceId> devicesToBackup = mastershipService.getDevicesOf(local)
627 + .stream()
628 + .filter(deviceId -> {
629 + Long lastBackupTime = lastBackupTimes.get(deviceId);
630 + Long lastUpdateTime = lastUpdateTimes.get(deviceId);
631 + NodeId lastBackupNode = lastBackupNodes.get(deviceId);
632 + return lastBackupTime == null
633 + || !Objects.equal(lastBackupNode, getBackupNode(deviceId))
634 + || (lastUpdateTime != null && lastUpdateTime > lastBackupTime);
635 + })
636 + .collect(Collectors.toSet());
637 +
638 + // compute a mapping from node to the set of devices whose flow entries it should backup
639 + Map<NodeId, Set<DeviceId>> devicesToBackupByNode = Maps.newHashMap();
640 + devicesToBackup.forEach(deviceId -> {
641 + NodeId backupLocation = getBackupNode(deviceId);
642 + if (backupLocation != null) {
643 + devicesToBackupByNode.computeIfAbsent(backupLocation, nodeId -> Sets.newHashSet())
644 + .add(deviceId);
645 + }
646 + });
647 +
648 + // send the device flow entries to their respective backup nodes
649 + devicesToBackupByNode.forEach((nodeId, deviceIds) -> {
650 + Map<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> deviceFlowEntries =
651 + Maps.newConcurrentMap();
652 + flowEntries.forEach((key, value) -> {
653 + if (deviceIds.contains(key)) {
654 + deviceFlowEntries.put(key, value);
655 + }
656 + });
657 + clusterCommunicator.unicast(deviceFlowEntries,
658 + FLOW_TABLE_BACKUP,
659 + SERIALIZER::encode,
660 + nodeId);
661 + });
662 +
663 + // update state for use in subsequent run.
664 + devicesToBackupByNode.forEach((node, devices) -> {
665 + devices.forEach(id -> {
666 + lastBackupTimes.put(id, System.currentTimeMillis());
667 + lastBackupNodes.put(id, node);
668 + });
669 + });
670 + } catch (Exception e) {
671 + log.error("Backup failed.", e);
672 + }
673 + }
674 +
675 + private void onBackupReceipt(Map<DeviceId, Map<FlowId, Set<StoredFlowEntry>>> flowTables) {
676 + Set<DeviceId> managedDevices = mastershipService.getDevicesOf(local);
677 + Maps.filterKeys(flowTables, managedDevices::contains).forEach((deviceId, flowTable) -> {
678 + Map<FlowId, Set<StoredFlowEntry>> deviceFlowTable = getFlowTable(deviceId);
679 + deviceFlowTable.clear();
680 + deviceFlowTable.putAll(flowTable);
681 + });
682 + }
683 + }
684 +}
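The backup loop above is driven by a single-threaded scheduler with a fixed 2-second delay between runs. A small, self-contained illustration of that scheduling pattern in plain Java (ONOS types omitted): scheduleWithFixedDelay waits for the previous run to finish before starting the delay, so a slow backup run can never overlap the next one.

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class PeriodicBackupDemo {
        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService backupSender = Executors.newSingleThreadScheduledExecutor();
            // Runs immediately, then again 2000 ms after each previous run completes.
            backupSender.scheduleWithFixedDelay(
                    () -> System.out.println("backup run at " + System.currentTimeMillis()),
                    0, 2000, TimeUnit.MILLISECONDS);
            Thread.sleep(7000);
            backupSender.shutdownNow();
        }
    }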
@@ -21,6 +21,8 @@ import static org.onosproject.store.flow.ReplicaInfoEvent.Type.MASTER_CHANGED;
 import static org.onosproject.store.flow.ReplicaInfoEvent.Type.BACKUPS_CHANGED;
 
 import java.util.Collections;
+import java.util.List;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -28,6 +30,7 @@ import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.RoleInfo;
 import org.onosproject.event.AbstractListenerRegistry;
 import org.onosproject.event.EventDeliveryService;
 import org.onosproject.mastership.MastershipEvent;
@@ -76,9 +79,7 @@ public class ReplicaInfoManager implements ReplicaInfoService {
 
     @Override
     public ReplicaInfo getReplicaInfoFor(DeviceId deviceId) {
-        // TODO: populate backup List when we reach the point we need them.
-        return new ReplicaInfo(mastershipService.getMasterFor(deviceId),
-                Collections.<NodeId>emptyList());
+        return buildFromRoleInfo(mastershipService.getNodesFor(deviceId));
     }
 
     @Override
@@ -91,13 +92,17 @@ public class ReplicaInfoManager implements ReplicaInfoService {
         listenerRegistry.removeListener(checkNotNull(listener));
     }
 
+    private static ReplicaInfo buildFromRoleInfo(RoleInfo roles) {
+        List<NodeId> backups = roles.backups() == null ?
+                Collections.emptyList() : roles.backups();
+        return new ReplicaInfo(roles.master(), backups);
+    }
+
     final class InternalMastershipListener implements MastershipListener {
 
         @Override
         public void event(MastershipEvent event) {
-            final ReplicaInfo replicaInfo
-                    = new ReplicaInfo(event.roleInfo().master(),
-                            event.roleInfo().backups());
+            final ReplicaInfo replicaInfo = buildFromRoleInfo(event.roleInfo());
 
             switch (event.type()) {
                 case MASTER_CHANGED:
......
@@ -38,7 +38,7 @@ import java.io.InputStream;
 /**
  * Auxiliary bootstrap of distributed store.
  */
-@Component(immediate = true)
+@Component(immediate = false, enabled = false)
 @Service
 public class StoreManager implements StoreService {
 
......
@@ -218,6 +218,8 @@ public class ConsistentDeviceMastershipStore
         NodeId master = null;
         final List<NodeId> standbys = Lists.newLinkedList();
 
+        List<NodeId> candidates = leadershipService.getCandidates(createDeviceMastershipTopic(deviceId));
+
         for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
             if (entry.getValue() == MastershipRole.MASTER) {
                 master = entry.getKey();
@@ -226,7 +228,9 @@ public class ConsistentDeviceMastershipStore
             }
         }
 
-        return new RoleInfo(master, standbys);
+        List<NodeId> sortedStandbyList = candidates.stream().filter(standbys::contains).collect(Collectors.toList());
+
+        return new RoleInfo(master, sortedStandbyList);
 
 
     @Override
......
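The candidate filter above is what gives ReplicaInfo.backups() a meaningful order: standbys are returned in leadership-candidate order rather than roles-map iteration order, so backups().get(0) is the node expected to take over mastership next. A self-contained worked example with hypothetical node names:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public final class StandbyOrderingDemo {
        public static void main(String[] args) {
            // Leadership candidate order (most likely next master first); hypothetical names.
            List<String> candidates = Arrays.asList("node-1", "node-2", "node-3");
            // Standbys as collected from the roles map, in no particular order.
            List<String> standbys = Arrays.asList("node-3", "node-1");

            // Same filter as above: keep candidate order, keep only actual standbys.
            List<String> sorted = candidates.stream()
                    .filter(standbys::contains)
                    .collect(Collectors.toList());

            System.out.println(sorted); // prints [node-1, node-3]
        }
    }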
@@ -153,6 +153,11 @@ public class ReplicaInfoManagerTest {
         }
 
         @Override
+        public RoleInfo getNodesFor(DeviceId deviceId) {
+            return new RoleInfo(masters.get(deviceId), Collections.emptyList());
+        }
+
+        @Override
         public void addListener(MastershipListener listener) {
             mastershipListenerRegistry.addListener(listener);
         }
......
@@ -18,6 +18,7 @@ package org.onosproject.store.serializers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+
 import org.onlab.packet.ChassisId;
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.Ip4Prefix;
@@ -170,6 +171,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 public final class KryoNamespaces {
 
@@ -191,6 +194,8 @@ public final class KryoNamespaces {
                     ImmutableMap.of("a", 1).getClass(),
                     ImmutableMap.of("R", 2, "D", 2).getClass())
             .register(HashMap.class)
+            .register(ConcurrentHashMap.class)
+            .register(CopyOnWriteArraySet.class)
             .register(ArrayList.class,
                       LinkedList.class,
                       HashSet.class
......
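ConcurrentHashMap and CopyOnWriteArraySet are registered because the backup payload sent on FLOW_TABLE_BACKUP uses exactly those concrete types for the per-device flow table, and Kryo needs them registered to serialize the message. A rough round-trip sketch, assuming the usual KryoNamespace serialize/deserialize helpers (strings stand in for FlowId and StoredFlowEntry for brevity):

    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArraySet;
    import org.onlab.util.KryoNamespace;

    public final class BackupPayloadSerializationDemo {
        public static void main(String[] args) {
            KryoNamespace kryo = KryoNamespace.newBuilder()
                    .register(ConcurrentHashMap.class)
                    .register(CopyOnWriteArraySet.class)
                    .build();

            // Shape of one device's flow table in the backup message.
            Map<String, Set<String>> table = new ConcurrentHashMap<>();
            table.computeIfAbsent("flowId-1", k -> new CopyOnWriteArraySet<>()).add("entry-1");

            byte[] bytes = kryo.serialize(table);
            Map<String, Set<String>> copy = kryo.deserialize(bytes);
            System.out.println(copy);
        }
    }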