alshabib
Committed by Gerrit Code Review

allow flow to be marked for removal from flow store when device is disconnected

Change-Id: I0f60ff4f010d0d149be31272b9e592c5d812bef9
...@@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; ...@@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList;
22 import com.google.common.collect.Iterables; 22 import com.google.common.collect.Iterables;
23 import com.google.common.collect.Maps; 23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets; 24 import com.google.common.collect.Sets;
25 -import com.google.common.util.concurrent.ListenableFuture;
26 import com.hazelcast.core.IMap; 25 import com.hazelcast.core.IMap;
27 import org.apache.felix.scr.annotations.Activate; 26 import org.apache.felix.scr.annotations.Activate;
28 import org.apache.felix.scr.annotations.Component; 27 import org.apache.felix.scr.annotations.Component;
...@@ -242,6 +241,7 @@ public class DistributedFlowRuleStore ...@@ -242,6 +241,7 @@ public class DistributedFlowRuleStore
242 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES); 241 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
243 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY); 242 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
244 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS); 243 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
244 + clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
245 replicaInfoManager.removeListener(replicaInfoEventListener); 245 replicaInfoManager.removeListener(replicaInfoEventListener);
246 log.info("Stopped"); 246 log.info("Stopped");
247 } 247 }
...@@ -346,20 +346,18 @@ public class DistributedFlowRuleStore ...@@ -346,20 +346,18 @@ public class DistributedFlowRuleStore
346 return; 346 return;
347 } 347 }
348 348
349 - DeviceId deviceId = operation.getOperations().get(0).target().deviceId(); 349 + DeviceId deviceId = operation.deviceId();
350 350
351 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId); 351 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
352 352
353 if (!replicaInfo.master().isPresent()) { 353 if (!replicaInfo.master().isPresent()) {
354 - log.warn("Failed to storeBatch: No master for {}", deviceId); 354 + log.warn("No master for {} : flows will be marked for removal", deviceId);
355 355
356 - Set<FlowRule> allFailures = operation.getOperations().stream() 356 + updateStoreInternal(operation);
357 - .map(op -> op.getTarget())
358 - .collect(Collectors.toSet());
359 357
360 notifyDelegate(FlowRuleBatchEvent.completed( 358 notifyDelegate(FlowRuleBatchEvent.completed(
361 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), 359 new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
362 - new CompletedBatchOperation(false, allFailures, operation.deviceId()))); 360 + new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
363 return; 361 return;
364 } 362 }
365 363
...@@ -377,19 +375,12 @@ public class DistributedFlowRuleStore ...@@ -377,19 +375,12 @@ public class DistributedFlowRuleStore
377 APPLY_BATCH_FLOWS, 375 APPLY_BATCH_FLOWS,
378 SERIALIZER.encode(operation)); 376 SERIALIZER.encode(operation));
379 377
380 - //CompletedBatchOperation response; 378 +
381 try { 379 try {
382 - ListenableFuture<byte[]> responseFuture =
383 - clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
384 - /*response =
385 - Futures.transform(responseFuture,
386 - new DecodeTo<CompletedBatchOperation>(SERIALIZER))
387 - .get(500 * operation.size(), TimeUnit.MILLISECONDS);
388 380
389 - notifyDelegate(FlowRuleBatchEvent.completed( 381 + clusterCommunicator.unicast(message, replicaInfo.master().get());
390 - new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), response));*/
391 382
392 - } catch (IOException /*| InterruptedException | ExecutionException | TimeoutException*/ e) { 383 + } catch (IOException e) {
393 log.warn("Failed to storeBatch: {}", e.getMessage()); 384 log.warn("Failed to storeBatch: {}", e.getMessage());
394 385
395 Set<FlowRule> allFailures = operation.getOperations().stream() 386 Set<FlowRule> allFailures = operation.getOperations().stream()
...@@ -408,10 +399,23 @@ public class DistributedFlowRuleStore ...@@ -408,10 +399,23 @@ public class DistributedFlowRuleStore
408 399
409 final DeviceId did = operation.deviceId(); 400 final DeviceId did = operation.deviceId();
410 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did); 401 //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
411 - Set<FlowRuleBatchEntry> currentOps; 402 + Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
403 + if (currentOps.isEmpty()) {
404 + batchOperationComplete(FlowRuleBatchEvent.completed(
405 + new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
406 + new CompletedBatchOperation(true, Collections.emptySet(), did)));
407 + return;
408 + }
409 + updateBackup(did, currentOps);
410 +
411 + notifyDelegate(FlowRuleBatchEvent.requested(new
412 + FlowRuleBatchRequest(operation.id(),
413 + currentOps), operation.deviceId()));
412 414
415 + }
413 416
414 - currentOps = operation.getOperations().stream().map( 417 + private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
418 + return operation.getOperations().stream().map(
415 op -> { 419 op -> {
416 StoredFlowEntry entry; 420 StoredFlowEntry entry;
417 switch (op.getOperator()) { 421 switch (op.getOperator()) {
...@@ -439,19 +443,6 @@ public class DistributedFlowRuleStore ...@@ -439,19 +443,6 @@ public class DistributedFlowRuleStore
439 return null; 443 return null;
440 } 444 }
441 ).filter(op -> op != null).collect(Collectors.toSet()); 445 ).filter(op -> op != null).collect(Collectors.toSet());
442 - if (currentOps.isEmpty()) {
443 - batchOperationComplete(FlowRuleBatchEvent.completed(
444 - new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
445 - new CompletedBatchOperation(true, Collections.emptySet(), did)));
446 - return;
447 - }
448 - updateBackup(did, currentOps);
449 -
450 -
451 - notifyDelegate(FlowRuleBatchEvent.requested(new
452 - FlowRuleBatchRequest(operation.id(), currentOps), operation.deviceId()));
453 -
454 -
455 } 446 }
456 447
457 private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) { 448 private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
...@@ -581,7 +572,7 @@ public class DistributedFlowRuleStore ...@@ -581,7 +572,7 @@ public class DistributedFlowRuleStore
581 clusterService.getLocalNode().id(), 572 clusterService.getLocalNode().id(),
582 REMOTE_APPLY_COMPLETED, 573 REMOTE_APPLY_COMPLETED,
583 SERIALIZER.encode(event)); 574 SERIALIZER.encode(event));
584 - clusterCommunicator.sendAndReceive(message, nodeId); 575 + clusterCommunicator.unicast(message, nodeId);
585 } catch (IOException e) { 576 } catch (IOException e) {
586 log.warn("Failed to respond to peer for batch operation result"); 577 log.warn("Failed to respond to peer for batch operation result");
587 } 578 }
......