Yuta HIGUCHI

DistributedFlowRuleStore: remote batch support

Change-Id: I373a942697624440e025a8022a13394396058a71
......@@ -30,7 +30,7 @@ public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.T
* @param request batch operation request.
* @return event.
*/
public static FlowRuleBatchEvent create(FlowRuleBatchRequest request) {
public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request) {
FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
return event;
}
......@@ -41,7 +41,7 @@ public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.T
* @param result completed batch operation result.
* @return event.
*/
public static FlowRuleBatchEvent create(FlowRuleBatchRequest request, CompletedBatchOperation result) {
public static FlowRuleBatchEvent completed(FlowRuleBatchRequest request, CompletedBatchOperation result) {
FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_COMPLETED, request, result);
return event;
}
......
......@@ -9,10 +9,12 @@ import com.google.common.collect.Lists;
public class FlowRuleBatchRequest {
private final int batchId;
private final List<FlowEntry> toAdd;
private final List<FlowEntry> toRemove;
public FlowRuleBatchRequest(List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
public FlowRuleBatchRequest(int batchId, List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
this.batchId = batchId;
this.toAdd = Collections.unmodifiableList(toAdd);
this.toRemove = Collections.unmodifiableList(toRemove);
}
......@@ -35,4 +37,8 @@ public class FlowRuleBatchRequest {
}
return new FlowRuleBatchOperation(entries);
}
public int batchId() {
return batchId;
}
}
......
......@@ -2,12 +2,14 @@ package org.onlab.onos.net.flow.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onlab.util.Tools.namedThreads;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
......@@ -74,6 +76,8 @@ public class FlowRuleManager
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
private final ExecutorService futureListeners = Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleStore store;
......@@ -92,6 +96,8 @@ public class FlowRuleManager
@Deactivate
public void deactivate() {
futureListeners.shutdownNow();
store.unsetDelegate(delegate);
eventDispatcher.removeSink(FlowRuleEvent.class);
log.info("Stopped");
......@@ -398,9 +404,9 @@ public class FlowRuleManager
result.addListener(new Runnable() {
@Override
public void run() {
store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
store.batchOperationComplete(FlowRuleBatchEvent.completed(request, Futures.getUnchecked(result)));
}
}, Executors.newCachedThreadPool());
}, futureListeners);
break;
case BATCH_OPERATION_COMPLETED:
......
......@@ -3,15 +3,20 @@ package org.onlab.onos.store.flow.impl;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
import static org.onlab.util.Tools.namedThreads;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import org.apache.felix.scr.annotations.Activate;
......@@ -22,7 +27,9 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
......@@ -52,8 +59,12 @@ import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
/**
* Manages inventory of flow rules using a distributed state management protocol.
......@@ -74,13 +85,26 @@ public class DistributedFlowRuleStore
ArrayListMultimap.<Short, FlowRule>create();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ReplicaInfoService replicaInfoManager;
protected ReplicaInfoService replicaInfoManager;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicator;
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
private final AtomicInteger localBatchIdGen = new AtomicInteger();
// FIXME switch to expiraing map/Cache?
private Map<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = Maps.newConcurrentMap();
private final ExecutorService futureListeners =
Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
......@@ -97,36 +121,26 @@ public class DistributedFlowRuleStore
@Activate
public void activate() {
clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
@Override
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.info("received add request for {}", rule);
storeFlowRule(rule);
// FIXME what to respond.
try {
message.respond(SERIALIZER.encode("ACK"));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
}
});
public void handle(final ClusterMessage message) {
FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
log.info("received batch request {}", operation);
final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
f.addListener(new Runnable() {
@Override
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.info("received delete request for {}", rule);
deleteFlowRule(rule);
// FIXME what to respond.
public void run() {
CompletedBatchOperation result = Futures.getUnchecked(f);
try {
message.respond(SERIALIZER.encode("ACK"));
message.respond(SERIALIZER.encode(result));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
}
}, futureListeners);
}
});
......@@ -159,7 +173,13 @@ public class DistributedFlowRuleStore
// make it device specific.
@Override
public int getFlowRuleCount() {
return flowEntries.size();
// implementing in-efficient operation for debugging purpose.
int sum = 0;
for (Device device : deviceService.getDevices()) {
final DeviceId did = device.id();
sum += Iterables.size(getFlowEntries(did));
}
return sum;
}
@Override
......@@ -218,6 +238,7 @@ public class DistributedFlowRuleStore
storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
}
@Override
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
if (operation.getOperations().isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
......@@ -236,7 +257,7 @@ public class DistributedFlowRuleStore
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
FlowStoreMessageSubjects.STORE_FLOW_RULE,
APPLY_BATCH_FLOWS,
SERIALIZER.encode(operation));
try {
......@@ -250,7 +271,7 @@ public class DistributedFlowRuleStore
return null;
}
private Future<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
List<FlowEntry> toRemove = new ArrayList<>();
List<FlowEntry> toAdd = new ArrayList<>();
// TODO: backup changes to hazelcast map
......@@ -261,8 +282,8 @@ public class DistributedFlowRuleStore
StoredFlowEntry entry = getFlowEntryInternal(flowRule);
if (entry != null) {
entry.setState(FlowEntryState.PENDING_REMOVE);
}
toRemove.add(entry);
}
} else if (op.equals(FlowRuleOperation.ADD)) {
StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
DeviceId deviceId = flowRule.deviceId();
......@@ -276,9 +297,13 @@ public class DistributedFlowRuleStore
if (toAdd.isEmpty() && toRemove.isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
}
notifyDelegate(FlowRuleBatchEvent.create(new FlowRuleBatchRequest(toAdd, toRemove)));
// TODO: imlpement this.
return Futures.immediateFailedFuture(new RuntimeException("Implement this."));
SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
final int batchId = localBatchIdGen.incrementAndGet();
pendingFutures.put(batchId, r);
notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
return r;
}
@Override
......@@ -293,18 +318,9 @@ public class DistributedFlowRuleStore
return addOrUpdateFlowRuleInternal(rule);
}
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
SERIALIZER.encode(rule));
try {
ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
} catch (IOException | TimeoutException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
log.error("Tried to update FlowRule {} state,"
+ " while the Node was not the master.", rule);
return null;
}
private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
......@@ -338,18 +354,9 @@ public class DistributedFlowRuleStore
return removeFlowRuleInternal(rule);
}
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
SERIALIZER.encode(rule));
try {
ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
} catch (IOException | TimeoutException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
log.error("Tried to remove FlowRule {},"
+ " while the Node was not the master.", rule);
return null;
}
private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
......@@ -364,6 +371,11 @@ public class DistributedFlowRuleStore
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
SettableFuture<CompletedBatchOperation> future
= pendingFutures.get(event.subject().batchId());
if (future != null) {
future.set(event.result());
}
notifyDelegate(event);
}
}
......
......@@ -7,10 +7,10 @@ import org.onlab.onos.store.cluster.messaging.MessageSubject;
*/
public final class FlowStoreMessageSubjects {
private FlowStoreMessageSubjects() {}
public static final MessageSubject STORE_FLOW_RULE = new MessageSubject("peer-forward-store-flow-rule");
public static final MessageSubject DELETE_FLOW_RULE = new MessageSubject("peer-forward-delete-flow-rule");
public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
new MessageSubject("peer-forward-add-or-update-flow-rule");
public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule");
public static final MessageSubject GET_FLOW_ENTRY = new MessageSubject("peer-forward-get-flow-entry");
public static final MessageSubject APPLY_BATCH_FLOWS
= new MessageSubject("peer-forward-apply-batch");
public static final MessageSubject GET_FLOW_ENTRY
= new MessageSubject("peer-forward-get-flow-entry");
}
......
......@@ -58,7 +58,6 @@ implements MastershipStore {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
@Activate
public void activate() {
......@@ -76,9 +75,9 @@ implements MastershipStore {
}
};
roleMap = new SMap(theInstance.getMap("nodeRoles"), this.serializer);
roleMap = new SMap<>(theInstance.<byte[], byte[]>getMap("nodeRoles"), this.serializer);
roleMap.addEntryListener((new RemoteMasterShipEventHandler()), true);
terms = new SMap(theInstance.getMap("terms"), this.serializer);
terms = new SMap<>(theInstance.<byte[], byte[]>getMap("terms"), this.serializer);
clusterSize = theInstance.getAtomicLong("clustersize");
log.info("Started");
......
......@@ -5,6 +5,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
......@@ -27,12 +28,15 @@ import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowId;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.StoredFlowEntry;
import org.onlab.onos.net.flow.criteria.Criteria;
import org.onlab.onos.net.flow.criteria.Criterion;
......@@ -80,6 +84,7 @@ public final class KryoNamespaces {
Arrays.asList().getClass(),
HashMap.class,
HashSet.class,
LinkedList.class,
//
//
ControllerNode.State.class,
......@@ -118,7 +123,11 @@ public final class KryoNamespaces {
DefaultTrafficTreatment.class,
Instructions.DropInstruction.class,
Instructions.OutputInstruction.class,
RoleInfo.class
RoleInfo.class,
FlowRuleBatchOperation.class,
CompletedBatchOperation.class,
FlowRuleBatchEntry.class,
FlowRuleBatchEntry.FlowRuleOperation.class
)
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
......
......@@ -176,8 +176,8 @@ public class SimpleFlowRuleStore
}
// new flow rule added
existing.add(f);
notifyDelegate(FlowRuleBatchEvent.create(
new FlowRuleBatchRequest(
notifyDelegate(FlowRuleBatchEvent.requested(
new FlowRuleBatchRequest( 1, /* FIXME generate something */
Arrays.<FlowEntry>asList(f),
Collections.<FlowEntry>emptyList())));
}
......@@ -194,8 +194,8 @@ public class SimpleFlowRuleStore
synchronized (entry) {
entry.setState(FlowEntryState.PENDING_REMOVE);
// TODO: Should we notify only if it's "remote" event?
notifyDelegate(FlowRuleBatchEvent.create(
new FlowRuleBatchRequest(
notifyDelegate(FlowRuleBatchEvent.requested(
new FlowRuleBatchRequest(1, /* FIXME generate something */
Collections.<FlowEntry>emptyList(),
Arrays.<FlowEntry>asList(entry))));
}
......