Yuta HIGUCHI
Committed by Gerrit Code Review

DistributedFlowRuleStore: support remote removeFlowRule needed for cancelling Batch

Change-Id: I40f8dd8c2008e93c5ac7393295374726f83353c7
...@@ -213,6 +213,21 @@ public class DistributedFlowRuleStore ...@@ -213,6 +213,21 @@ public class DistributedFlowRuleStore
213 } 213 }
214 }); 214 });
215 215
216 + clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
217 +
218 + @Override
219 + public void handle(ClusterMessage message) {
220 + FlowEntry rule = SERIALIZER.decode(message.payload());
221 + log.trace("received get flow entry request for {}", rule);
222 + FlowRuleEvent event = removeFlowRuleInternal(rule);
223 + try {
224 + message.respond(SERIALIZER.encode(event));
225 + } catch (IOException e) {
226 + log.error("Failed to respond back", e);
227 + }
228 + }
229 + });
230 +
216 replicaInfoEventListener = new InternalReplicaInfoEventListener(); 231 replicaInfoEventListener = new InternalReplicaInfoEventListener();
217 232
218 replicaInfoManager.addListener(replicaInfoEventListener); 233 replicaInfoManager.addListener(replicaInfoEventListener);
...@@ -222,6 +237,10 @@ public class DistributedFlowRuleStore ...@@ -222,6 +237,10 @@ public class DistributedFlowRuleStore
222 237
223 @Deactivate 238 @Deactivate
224 public void deactivate() { 239 public void deactivate() {
240 + clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
241 + clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
242 + clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
243 + clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
225 replicaInfoManager.removeListener(replicaInfoEventListener); 244 replicaInfoManager.removeListener(replicaInfoEventListener);
226 log.info("Stopped"); 245 log.info("Stopped");
227 } 246 }
...@@ -507,9 +526,21 @@ public class DistributedFlowRuleStore ...@@ -507,9 +526,21 @@ public class DistributedFlowRuleStore
507 return removeFlowRuleInternal(rule); 526 return removeFlowRuleInternal(rule);
508 } 527 }
509 528
510 - log.warn("Tried to remove FlowRule {}," 529 + log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
511 - + " while the Node was not the master.", rule); 530 + replicaInfo.master().orNull(), rule.deviceId());
512 - return null; 531 +
532 + ClusterMessage message = new ClusterMessage(
533 + clusterService.getLocalNode().id(),
534 + REMOVE_FLOW_ENTRY,
535 + SERIALIZER.encode(rule));
536 +
537 + try {
538 + Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
539 + return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
540 + } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
541 + // FIXME: throw a FlowStoreException
542 + throw new RuntimeException(e);
543 + }
513 } 544 }
514 545
515 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) { 546 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
......
...@@ -31,4 +31,7 @@ public final class FlowStoreMessageSubjects { ...@@ -31,4 +31,7 @@ public final class FlowStoreMessageSubjects {
31 31
32 public static final MessageSubject GET_DEVICE_FLOW_ENTRIES 32 public static final MessageSubject GET_DEVICE_FLOW_ENTRIES
33 = new MessageSubject("peer-forward-get-device-flow-entries"); 33 = new MessageSubject("peer-forward-get-device-flow-entries");
34 +
35 + public static final MessageSubject REMOVE_FLOW_ENTRY
36 + = new MessageSubject("peer-forward-remove-flow-entry");
34 } 37 }
......