Brian O'Connor
Committed by Gerrit Code Review

Implementation of new Flow Subsystem:

The subsystem no longer returns futures for tracking completion of work.
Notifications are explicitely returned via a call back mechanism. Also, the
subsystem is now asynchronous.

Change-Id: I1a4cef931c24820f9ae9ed9a5398f163f05dfbc9

more flowservice improvements

Change-Id: I5c9c1b6be4b2ebfa523b64f6f52e7634b7d3e05f

more flowservice impl

Change-Id: I05f6774460effb53ced8c36844bcda2f8f6c096f

Manager to store functional (at least i believe it)

Change-Id: I09b04989bd1004c98fe0bafed4c76714b9155d53

flow subsystem functional: need to fix unit tests

Change-Id: I1667f25b91320f625a03e5e1d5e92823184d9de0

flow subsystem functional

Change-Id: I429b3335c16d4fc16f5d55f233dd37c4d1d6111d

finished refactor of flow subsystem

Change-Id: I1899abc6ff6a974a2018d936cc555049c70a6804

fix for null flow provider to use new api

Change-Id: If2fd9bd5baf74d9c61c5c8085cef8bc2d204cbdc
Showing 34 changed files with 1324 additions and 1043 deletions
......@@ -19,29 +19,30 @@ package org.onosproject.cli.net;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.packet.MacAddress;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.Device;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleOperationsContext;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onlab.packet.MacAddress;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Installs many many flows.
......@@ -50,6 +51,8 @@ import java.util.concurrent.Future;
description = "Installs a number of test flow rules - for testing only")
public class AddFlowsCommand extends AbstractShellCommand {
private CountDownLatch latch;
@Argument(index = 0, name = "flowPerDevice", description = "Number of flows to add per device",
required = true, multiValued = false)
String flows = null;
......@@ -63,6 +66,9 @@ public class AddFlowsCommand extends AbstractShellCommand {
FlowRuleService flowService = get(FlowRuleService.class);
DeviceService deviceService = get(DeviceService.class);
CoreService coreService = get(CoreService.class);
ApplicationId appId = coreService.registerApplication("onos.test.flow.installer");
int flowsPerDevice = Integer.parseInt(flows);
int num = Integer.parseInt(numOfRuns);
......@@ -70,49 +76,73 @@ public class AddFlowsCommand extends AbstractShellCommand {
ArrayList<Long> results = Lists.newArrayList();
Iterable<Device> devices = deviceService.getDevices();
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.setOutput(PortNumber.portNumber(1)).build();
.setOutput(PortNumber.portNumber(RandomUtils.nextInt())).build();
TrafficSelector.Builder sbuilder;
Set<FlowRuleBatchEntry> rules = Sets.newHashSet();
Set<FlowRuleBatchEntry> remove = Sets.newHashSet();
FlowRuleOperations.Builder rules = FlowRuleOperations.builder();
FlowRuleOperations.Builder remove = FlowRuleOperations.builder();
for (Device d : devices) {
for (int i = 0; i < flowsPerDevice; i++) {
sbuilder = DefaultTrafficSelector.builder();
sbuilder.matchEthSrc(MacAddress.valueOf(i))
.matchEthDst(MacAddress.valueOf(Integer.MAX_VALUE - i));
rules.add(new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.ADD,
new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
100, (long) 0, 10, false)));
remove.add(new FlowRuleBatchEntry(FlowRuleBatchEntry.FlowRuleOperation.REMOVE,
new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
100, (long) 0, 10, false)));
sbuilder.matchEthSrc(MacAddress.valueOf(RandomUtils.nextInt() * i))
.matchEthDst(MacAddress.valueOf((Integer.MAX_VALUE - i) * RandomUtils.nextInt()));
int randomPriority = RandomUtils.nextInt();
rules.add(new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
randomPriority, appId, 10, false));
remove.remove(new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
randomPriority, appId, 10, false));
}
}
boolean isSuccess = true;
for (int i = 0; i < num; i++) {
long startTime = System.currentTimeMillis();
Future<CompletedBatchOperation> op = flowService.applyBatch(
new FlowRuleBatchOperation(rules));
latch = new CountDownLatch(2);
flowService.apply(rules.build(new FlowRuleOperationsContext() {
private final Stopwatch timer = Stopwatch.createStarted();
@Override
public void onSuccess(FlowRuleOperations ops) {
timer.stop();
results.add(timer.elapsed(TimeUnit.MILLISECONDS));
if (results.size() == num) {
if (outputJson()) {
print("%s", json(new ObjectMapper(), true, results));
} else {
printTime(true, results);
}
}
latch.countDown();
}
}));
flowService.apply(remove.build(new FlowRuleOperationsContext() {
@Override
public void onSuccess(FlowRuleOperations ops) {
latch.countDown();
}
}));
try {
isSuccess &= op.get().isSuccess();
} catch (InterruptedException | ExecutionException e) {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
results.add(endTime - startTime);
flowService.applyBatch(
new FlowRuleBatchOperation(remove));
}
if (outputJson()) {
print("%s", json(new ObjectMapper(), isSuccess, results));
} else {
printTime(isSuccess, results);
}
}
private Object json(ObjectMapper mapper, boolean isSuccess, ArrayList<Long> elapsed) {
ObjectNode result = mapper.createObjectNode();
result.put("Success", isSuccess);
......
......@@ -21,6 +21,7 @@ import java.util.Set;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import org.onosproject.net.DeviceId;
/**
* Representation of a completed flow rule batch operation.
......@@ -30,19 +31,22 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowRule> {
private final boolean success;
private final Set<FlowRule> failures;
private final Set<Long> failedIds;
private final DeviceId deviceId;
/**
* Creates a new batch completion result.
*
* @param success indicates whether the completion is successful.
* @param success indicates whether the completion is successful
* @param failures set of any failures encountered
* @param failedIds (optional) set of failed operation ids
* @param deviceId the device this operation completed for
*/
public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
Set<Long> failedIds) {
Set<Long> failedIds, DeviceId deviceId) {
this.success = success;
this.failures = ImmutableSet.copyOf(failures);
this.failedIds = ImmutableSet.copyOf(failedIds);
this.deviceId = deviceId;
}
/**
......@@ -51,10 +55,12 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowRule> {
* @param success indicates whether the completion is successful.
* @param failures set of any failures encountered
*/
public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures) {
public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
DeviceId deviceId) {
this.success = success;
this.failures = ImmutableSet.copyOf(failures);
this.failedIds = Collections.emptySet();
this.deviceId = deviceId;
}
......@@ -73,12 +79,17 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowRule> {
return failedIds;
}
public DeviceId deviceId() {
return this.deviceId;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("success?", success)
.add("failedItems", failures)
.add("failedIds", failedIds)
.add("deviceId", deviceId)
.toString();
}
}
......
......@@ -16,12 +16,14 @@
package org.onosproject.net.flow;
import org.onosproject.event.AbstractEvent;
import org.onosproject.net.DeviceId;
/**
* Describes flow rule batch event.
*/
public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.Type, FlowRuleBatchRequest> {
/**
* Type of flow rule events.
*/
......@@ -42,14 +44,17 @@ public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.T
}
private final CompletedBatchOperation result;
private final DeviceId deviceId;
/**
* Constructs a new FlowRuleBatchEvent.
* @param request batch operation request.
*
* @param request batch operation request
* @param deviceId the device this batch will be processed on
* @return event.
*/
public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request) {
FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
public static FlowRuleBatchEvent requested(FlowRuleBatchRequest request, DeviceId deviceId) {
FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, deviceId);
return event;
}
......@@ -73,13 +78,36 @@ public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.T
}
/**
* Returns the deviceId for this batch.
* @return device id
*/
public DeviceId deviceId() {
return deviceId;
}
/**
* Creates an event of a given type and for the specified flow rule batch.
*
* @param type flow rule batch event type
* @param batch event flow rule batch subject
* @param request event flow rule batch subject
* @param result the result of the batch operation
*/
private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, CompletedBatchOperation result) {
super(type, request);
this.result = result;
this.deviceId = result.deviceId();
}
/**
* Creates an event of a given type and for the specified flow rule batch.
*
* @param type flow rule batch event type
* @param request event flow rule batch subject
* @param deviceId the device id for this batch
*/
private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, DeviceId deviceId) {
super(type, request);
this.result = null;
this.deviceId = deviceId;
}
}
......
......@@ -15,12 +15,37 @@
*/
package org.onosproject.net.flow;
import org.onosproject.net.DeviceId;
import java.util.Collection;
/**
* Class used with the flow subsystem to process per device
* batches.
*/
public class FlowRuleBatchOperation
extends BatchOperation<FlowRuleBatchEntry> {
public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations) {
/**
* This id is used to cary to id of the original
* FlowOperations and track where this batch operation
* came from. The id is unique cluster wide.
*/
private final long id;
private final DeviceId deviceId;
public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations,
DeviceId deviceId, long flowOperationId) {
super(operations);
this.id = flowOperationId;
this.deviceId = deviceId;
}
public DeviceId deviceId() {
return this.deviceId;
}
public long id() {
return id;
}
}
......
......@@ -15,59 +15,43 @@
*/
package org.onosproject.net.flow;
import com.google.common.collect.Lists;
import org.onosproject.net.DeviceId;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
public class FlowRuleBatchRequest {
/**
* This id is used to cary to id of the original
* FlowOperations and track where this batch operation
* came from. The id is unique cluster wide.
*/
private final long batchId;
private final Set<FlowRuleBatchEntry> ops;
import com.google.common.collect.Lists;
public class FlowRuleBatchRequest {
private final int batchId;
private final List<FlowRuleBatchEntry> toAdd;
private final List<FlowRuleBatchEntry> toRemove;
public FlowRuleBatchRequest(int batchId, List<FlowRuleBatchEntry> toAdd,
List<FlowRuleBatchEntry> toRemove) {
public FlowRuleBatchRequest(long batchId, Set<FlowRuleBatchEntry> ops) {
this.batchId = batchId;
this.toAdd = Collections.unmodifiableList(toAdd);
this.toRemove = Collections.unmodifiableList(toRemove);
}
this.ops = Collections.unmodifiableSet(ops);
public List<FlowRule> toAdd() {
return FluentIterable.from(toAdd).transform(
new Function<FlowRuleBatchEntry, FlowRule>() {
@Override
public FlowRule apply(FlowRuleBatchEntry input) {
return input.target();
}
}).toList();
}
public List<FlowRule> toRemove() {
return FluentIterable.from(toRemove).transform(
new Function<FlowRuleBatchEntry, FlowRule>() {
@Override
public FlowRule apply(FlowRuleBatchEntry input) {
return input.target();
}
}).toList();
public Set<FlowRuleBatchEntry> ops() {
return ops;
}
public FlowRuleBatchOperation asBatchOperation() {
public FlowRuleBatchOperation asBatchOperation(DeviceId deviceId) {
List<FlowRuleBatchEntry> entries = Lists.newArrayList();
entries.addAll(toAdd);
entries.addAll(toRemove);
return new FlowRuleBatchOperation(entries);
entries.addAll(ops);
return new FlowRuleBatchOperation(entries, deviceId, batchId);
}
public int batchId() {
public long batchId() {
return batchId;
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.flow;
import com.google.common.base.MoreObjects;
/**
* Representation of an operation on a flow rule table.
*/
public class FlowRuleOperation {
/**
* Type of flow table operations.
*/
public enum Type {
ADD,
MODIFY,
REMOVE
}
private final FlowRule rule;
private final Type type;
public FlowRuleOperation(FlowRule rule, Type type) {
this.rule = rule;
this.type = type;
}
/**
* Returns the type of operation.
*
* @return type
*/
public Type type() {
return type;
}
/**
* Returns the flow rule.
*
* @return flow rule
*/
public FlowRule rule() {
return rule;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("rule", rule)
.add("type", type)
.toString();
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.flow;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Set;
import static org.onosproject.net.flow.FlowRuleOperation.Type.*;
/**
* A batch of flow rule operations that are broken into stages.
* TODO move this up to parent's package
*/
public class FlowRuleOperations {
private final List<Set<FlowRuleOperation>> stages;
private final FlowRuleOperationsContext callback; // TODO consider Optional
private FlowRuleOperations(List<Set<FlowRuleOperation>> stages,
FlowRuleOperationsContext cb) {
this.stages = stages;
this.callback = cb;
}
// kryo-constructor
protected FlowRuleOperations() {
this.stages = Lists.newArrayList();
this.callback = null;
}
/**
* Returns the flow rule operations as sets of stages that should be
* executed sequentially.
*
* @return flow rule stages
*/
public List<Set<FlowRuleOperation>> stages() {
return stages;
}
/**
* Returns the callback for this batch of operations.
*
* @return callback
*/
public FlowRuleOperationsContext callback() {
return callback;
}
/**
* Returns a new builder.
*
* @return new builder
*/
public static Builder builder() {
return new Builder();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("stages", stages)
.toString();
}
/**
* A builder for constructing flow rule operations.
*/
public static final class Builder {
private final ImmutableList.Builder<Set<FlowRuleOperation>> listBuilder = ImmutableList.builder();
private ImmutableSet.Builder<FlowRuleOperation> currentStage = ImmutableSet.builder();
// prevent use of the default constructor outside of this file; use the above method
private Builder() {}
/**
* Appends a flow rule add to the current stage.
*
* @param flowRule flow rule
* @return this
*/
public Builder add(FlowRule flowRule) {
currentStage.add(new FlowRuleOperation(flowRule, ADD));
return this;
}
/**
* Appends a flow rule modify to the current stage.
*
* @param flowRule flow rule
* @return this
*/
public Builder modify(FlowRule flowRule) {
currentStage.add(new FlowRuleOperation(flowRule, MODIFY));
return this;
}
/**
* Appends a flow rule remove to the current stage.
*
* @param flowRule flow rule
* @return this
*/
// FIXME this is confusing, consider renaming
public Builder remove(FlowRule flowRule) {
currentStage.add(new FlowRuleOperation(flowRule, REMOVE));
return this;
}
/**
* Closes the current stage.
*/
private void closeStage() {
ImmutableSet<FlowRuleOperation> stage = currentStage.build();
if (!stage.isEmpty()) {
listBuilder.add(stage);
}
}
/**
* Closes the current stage and starts a new one.
*
* @return this
*/
public Builder newStage() {
closeStage();
currentStage = ImmutableSet.builder();
return this;
}
/**
* Builds the immutable flow rule operations.
*
* @return flow rule operations
*/
public FlowRuleOperations build() {
return build(null);
}
/**
* Builds the immutable flow rule operations.
*
* @param cb the callback to call when this operation completes
* @return flow rule operations
*/
public FlowRuleOperations build(FlowRuleOperationsContext cb) {
closeStage();
return new FlowRuleOperations(listBuilder.build(), cb);
}
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.flow;
/**
* The context of a flow rule operations that will become the subject of
* the notification.
*
* Implementations of this class must be serializable.
*/
public interface FlowRuleOperationsContext {
// TODO we might also want to execute a method on behalf of the app
default void onSuccess(FlowRuleOperations ops){}
default void onError(FlowRuleOperations ops){}
}
......@@ -18,8 +18,6 @@ package org.onosproject.net.flow;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.provider.Provider;
import java.util.concurrent.Future;
/**
* Abstraction of a flow rule provider.
*/
......@@ -56,8 +54,7 @@ public interface FlowRuleProvider extends Provider {
* Installs a batch of flow rules. Each flowrule is associated to an
* operation which results in either addition, removal or modification.
* @param batch a batch of flow rules
* @return a future indicating the status of this execution
*/
Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
void executeBatch(FlowRuleBatchOperation batch);
}
......
......@@ -40,4 +40,13 @@ public interface FlowRuleProviderService extends ProviderService<FlowRuleProvide
*/
void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries);
/**
* Indicates to the core that the requested batch operation has
* been completed.
*
* @param batchId the batch which was processed
* @param operation the resulting outcome of the operation
*/
void batchOperationCompleted(long batchId, CompletedBatchOperation operation);
}
......
......@@ -15,11 +15,11 @@
*/
package org.onosproject.net.flow;
import java.util.concurrent.Future;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import java.util.concurrent.Future;
/**
* Service for injecting flow rules into the environment and for obtaining
* information about flow rules already in the environment. This implements
......@@ -30,6 +30,11 @@ import org.onosproject.net.DeviceId;
public interface FlowRuleService {
/**
* The topic used for obtaining globally unique ids.
*/
static String FLOW_OP_TOPIC = "flow-ops-ids";
/**
* Returns the number of flow rules in the system.
*
* @return flow rule count
......@@ -96,11 +101,20 @@ public interface FlowRuleService {
* Applies a batch operation of FlowRules.
*
* @param batch batch operation to apply
* @return future indicating the state of the batch operation
* @return future indicating the state of the batch operation, due to the
* deprecation of this api the future will immediately return
*/
@Deprecated
Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch);
/**
* Applies a batch operation of FlowRules.
*
* @param ops batch operation to apply
*/
void apply(FlowRuleOperations ops);
/**
* Adds the specified flow rule listener.
*
* @param listener flow rule listener
......
......@@ -15,8 +15,6 @@
*/
package org.onosproject.net.flow;
import java.util.concurrent.Future;
import org.onosproject.net.DeviceId;
import org.onosproject.store.Store;
......@@ -54,6 +52,7 @@ public interface FlowRuleStore extends Store<FlowRuleBatchEvent, FlowRuleStoreDe
*
* @param rule the flow rule to add
*/
@Deprecated
void storeFlowRule(FlowRule rule);
/**
......@@ -61,10 +60,9 @@ public interface FlowRuleStore extends Store<FlowRuleBatchEvent, FlowRuleStoreDe
*
* @param batchOperation batch of flow rules.
* A batch can contain flow rules for a single device only.
* @return Future response indicating success/failure of the batch operation
* all the way down to the device.
*
*/
Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation batchOperation);
void storeBatch(FlowRuleBatchOperation batchOperation);
/**
* Invoked on the completion of a storeBatch operation.
......
......@@ -46,10 +46,10 @@ public class FlowRuleBatchOperationTest {
final LinkedList<FlowRuleBatchEntry> ops3 = new LinkedList<>();
ops3.add(entry3);
final FlowRuleBatchOperation operation1 = new FlowRuleBatchOperation(ops1);
final FlowRuleBatchOperation sameAsOperation1 = new FlowRuleBatchOperation(ops1);
final FlowRuleBatchOperation operation2 = new FlowRuleBatchOperation(ops2);
final FlowRuleBatchOperation operation3 = new FlowRuleBatchOperation(ops3);
final FlowRuleBatchOperation operation1 = new FlowRuleBatchOperation(ops1, null, 0);
final FlowRuleBatchOperation sameAsOperation1 = new FlowRuleBatchOperation(ops1, null, 0);
final FlowRuleBatchOperation operation2 = new FlowRuleBatchOperation(ops2, null, 0);
final FlowRuleBatchOperation operation3 = new FlowRuleBatchOperation(ops3, null, 0);
new EqualsTester()
.addEqualityGroup(operation1, sameAsOperation1)
......
......@@ -15,17 +15,18 @@
*/
package org.onosproject.net.flow;
import java.util.LinkedList;
import java.util.List;
import org.junit.Test;
import org.onosproject.net.intent.IntentTestsMocks;
import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.*;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.ADD;
import static org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation.REMOVE;
/**
* Unit tests for the FlowRuleBatchRequest class.
......@@ -40,22 +41,19 @@ public class FlowRuleBatchRequestTest {
public void testConstruction() {
final FlowRule rule1 = new IntentTestsMocks.MockFlowRule(1);
final FlowRule rule2 = new IntentTestsMocks.MockFlowRule(2);
final List<FlowRuleBatchEntry> toAdd = new LinkedList<>();
toAdd.add(new FlowRuleBatchEntry(ADD, rule1));
final List<FlowRuleBatchEntry> toRemove = new LinkedList<>();
toRemove.add(new FlowRuleBatchEntry(REMOVE, rule2));
final Set<FlowRuleBatchEntry> batch = new HashSet<>();
batch.add(new FlowRuleBatchEntry(ADD, rule1));
batch.add(new FlowRuleBatchEntry(REMOVE, rule2));
final FlowRuleBatchRequest request =
new FlowRuleBatchRequest(1, toAdd, toRemove);
new FlowRuleBatchRequest(1, batch);
assertThat(request.toAdd(), hasSize(1));
assertThat(request.toAdd().get(0), is(rule1));
assertThat(request.toRemove(), hasSize(1));
assertThat(request.toRemove().get(0), is(rule2));
assertThat(request.batchId(), is(1));
assertThat(request.ops(), hasSize(2));
assertThat(request.batchId(), is(1L));
final FlowRuleBatchOperation op = request.asBatchOperation();
final FlowRuleBatchOperation op = request.asBatchOperation(rule1.deviceId());
assertThat(op.size(), is(2));
final List<FlowRuleBatchEntry> ops = op.getOperations();
......
......@@ -66,6 +66,11 @@ public class FlowRuleServiceAdapter implements FlowRuleService {
}
@Override
public void apply(FlowRuleOperations ops) {
}
@Override
public void addListener(FlowRuleListener listener) {
}
......
......@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -29,22 +29,25 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onosproject.net.flow.FlowRuleBatchEvent;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleListener;
import org.onosproject.net.flow.FlowRuleOperation;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleOperationsContext;
import org.onosproject.net.flow.FlowRuleProvider;
import org.onosproject.net.flow.FlowRuleProviderRegistry;
import org.onosproject.net.flow.FlowRuleProviderService;
......@@ -55,18 +58,16 @@ import org.onosproject.net.provider.AbstractProviderRegistry;
import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger;
import java.util.HashSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -90,7 +91,16 @@ public class FlowRuleManager
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
private ExecutorService futureService;
protected ExecutorService deviceInstallers =
Executors.newCachedThreadPool(namedThreads("onos-device-installer-%d"));
protected ExecutorService operationsService =
Executors.newFixedThreadPool(32, namedThreads("onos-flowservice-operations-%d"));
private IdGenerator idGenerator;
private Map<Long, FlowOperationsProcessor> pendingFlowOperations = new
ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleStore store;
......@@ -101,10 +111,15 @@ public class FlowRuleManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Activate
public void activate() {
futureService =
Executors.newFixedThreadPool(32, namedThreads("onos-provider-future-listeners-%d"));
idGenerator = coreService.getIdGenerator(FLOW_OP_TOPIC);
store.setDelegate(delegate);
eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
log.info("Started");
......@@ -112,8 +127,8 @@ public class FlowRuleManager
@Deactivate
public void deactivate() {
futureService.shutdownNow();
deviceInstallers.shutdownNow();
operationsService.shutdownNow();
store.unsetDelegate(delegate);
eventDispatcher.removeSink(FlowRuleEvent.class);
log.info("Stopped");
......@@ -131,20 +146,20 @@ public class FlowRuleManager
@Override
public void applyFlowRules(FlowRule... flowRules) {
Set<FlowRuleBatchEntry> toAddBatchEntries = Sets.newHashSet();
FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
for (int i = 0; i < flowRules.length; i++) {
toAddBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, flowRules[i]));
builder.add(flowRules[i]);
}
applyBatch(new FlowRuleBatchOperation(toAddBatchEntries));
apply(builder.build());
}
@Override
public void removeFlowRules(FlowRule... flowRules) {
Set<FlowRuleBatchEntry> toRemoveBatchEntries = Sets.newHashSet();
FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
for (int i = 0; i < flowRules.length; i++) {
toRemoveBatchEntries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, flowRules[i]));
builder.remove(flowRules[i]);
}
applyBatch(new FlowRuleBatchOperation(toRemoveBatchEntries));
apply(builder.build());
}
@Override
......@@ -180,23 +195,38 @@ public class FlowRuleManager
}
@Override
public Future<CompletedBatchOperation> applyBatch(
FlowRuleBatchOperation batch) {
Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
ArrayListMultimap.create();
List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
final FlowRule f = fbe.target();
perDeviceBatches.put(f.deviceId(), fbe);
}
public Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch) {
FlowRuleOperations.Builder fopsBuilder = FlowRuleOperations.builder();
batch.getOperations().stream().forEach(op -> {
switch (op.getOperator()) {
case ADD:
fopsBuilder.add(op.getTarget());
break;
case REMOVE:
fopsBuilder.remove(op.getTarget());
break;
case MODIFY:
fopsBuilder.modify(op.getTarget());
break;
default:
log.warn("Unknown flow operation operator: {}", op.getOperator());
for (DeviceId deviceId : perDeviceBatches.keySet()) {
FlowRuleBatchOperation b =
new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
Future<CompletedBatchOperation> future = store.storeBatch(b);
futures.add(future);
}
return new FlowRuleBatchFuture(futures, perDeviceBatches);
}
}
);
apply(fopsBuilder.build());
return Futures.immediateFuture(
new CompletedBatchOperation(true,
Collections.emptySet(), null));
}
@Override
public void apply(FlowRuleOperations ops) {
operationsService.submit(new FlowOperationsProcessor(ops));
}
@Override
......@@ -373,13 +403,19 @@ public class FlowRuleManager
}
}
@Override
public void batchOperationCompleted(long batchId, CompletedBatchOperation operation) {
store.batchOperationComplete(FlowRuleBatchEvent.completed(
new FlowRuleBatchRequest(batchId, Collections.emptySet()),
operation
));
}
}
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
// FIXME set appropriate default and make it configurable
private static final int TIMEOUT_PER_OP = 500; // ms
// TODO: Right now we only dispatch events at individual flowEntry level.
// It may be more efficient for also dispatch events as a batch.
......@@ -389,47 +425,55 @@ public class FlowRuleManager
switch (event.type()) {
case BATCH_OPERATION_REQUESTED:
// Request has been forwarded to MASTER Node, and was
for (FlowRule entry : request.toAdd()) {
eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
}
for (FlowRule entry : request.toRemove()) {
eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
}
// FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
request.ops().stream().forEach(
op -> {
switch (op.getOperator()) {
case ADD:
eventDispatcher.post(
new FlowRuleEvent(
FlowRuleEvent.Type.RULE_ADD_REQUESTED,
op.getTarget()));
break;
case REMOVE:
eventDispatcher.post(
new FlowRuleEvent(
FlowRuleEvent.Type.RULE_REMOVE_REQUESTED,
op.getTarget()));
break;
case MODIFY:
//TODO: do something here when the time comes.
break;
default:
log.warn("Unknown flow operation operator: {}", op.getOperator());
}
}
);
FlowRuleBatchOperation batchOperation = request.asBatchOperation();
DeviceId deviceId = event.deviceId();
FlowRuleBatchOperation batchOperation =
request.asBatchOperation(deviceId);
FlowRuleProvider flowRuleProvider =
getProvider(batchOperation.getOperations().get(0).target().deviceId());
final Future<CompletedBatchOperation> result =
flowRuleProvider.executeBatch(batchOperation);
futureService.submit(new Runnable() {
@Override
public void run() {
CompletedBatchOperation res;
try {
res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
} catch (TimeoutException | InterruptedException | ExecutionException e) {
log.warn("Something went wrong with the batch operation {}",
request.batchId(), e);
Set<FlowRule> failures = new HashSet<>(batchOperation.size());
for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
failures.add(op.target());
}
res = new CompletedBatchOperation(false, failures);
store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
}
}
});
getProvider(deviceId);
flowRuleProvider.executeBatch(batchOperation);
break;
case BATCH_OPERATION_COMPLETED:
// MASTER Node has pushed the batch down to the Device
// Note: RULE_ADDED will be posted
// when Flow was actually confirmed by stats reply.
FlowOperationsProcessor fops = pendingFlowOperations.remove(
event.subject().batchId());
if (event.result().isSuccess()) {
if (fops != null) {
fops.satisfy(event.deviceId());
}
} else {
fops.fail(event.deviceId(), event.result().failedItems());
}
break;
default:
......@@ -438,141 +482,100 @@ public class FlowRuleManager
}
}
private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
private class FlowOperationsProcessor implements Runnable {
private final List<Future<CompletedBatchOperation>> futures;
private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
private final AtomicReference<BatchState> state;
private CompletedBatchOperation overall;
private final List<Set<FlowRuleOperation>> stages;
private final FlowRuleOperationsContext context;
private final FlowRuleOperations fops;
private final AtomicBoolean hasFailed = new AtomicBoolean(false);
public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Multimap<DeviceId, FlowRuleBatchEntry> batches) {
this.futures = futures;
this.batches = batches;
this.state = new AtomicReference<>(BatchState.STARTED);
}
private Set<DeviceId> pendingDevices;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (state.get() == BatchState.FINISHED) {
return false;
}
if (log.isDebugEnabled()) {
log.debug("Cancelling FlowRuleBatchFuture",
new RuntimeException("Just printing backtrace"));
}
if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
return false;
}
cleanUpBatch();
for (Future<CompletedBatchOperation> f : futures) {
f.cancel(mayInterruptIfRunning);
}
return true;
}
public FlowOperationsProcessor(FlowRuleOperations ops) {
@Override
public boolean isCancelled() {
return state.get() == BatchState.CANCELLED;
}
this.stages = Lists.newArrayList(ops.stages());
this.context = ops.callback();
this.fops = ops;
pendingDevices = Sets.newConcurrentHashSet();
@Override
public boolean isDone() {
return state.get() == BatchState.FINISHED;
}
}
@Override
public CompletedBatchOperation get() throws InterruptedException,
ExecutionException {
if (isDone()) {
return overall;
public void run() {
if (stages.size() > 0) {
process(stages.remove(0));
} else if (!hasFailed.get() && context != null) {
context.onSuccess(fops);
}
}
boolean success = true;
Set<FlowRule> failed = Sets.newHashSet();
Set<Long> failedIds = Sets.newHashSet();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get();
success = validateBatchOperation(failed, failedIds, completed);
private void process(Set<FlowRuleOperation> ops) {
Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
ArrayListMultimap.create();
FlowRuleBatchEntry fbe;
for (FlowRuleOperation flowRuleOperation : ops) {
switch (flowRuleOperation.type()) {
// FIXME: Brian needs imagination when creating class names.
case ADD:
fbe = new FlowRuleBatchEntry(
FlowRuleBatchEntry.FlowRuleOperation.ADD, flowRuleOperation.rule());
break;
case MODIFY:
fbe = new FlowRuleBatchEntry(
FlowRuleBatchEntry.FlowRuleOperation.MODIFY, flowRuleOperation.rule());
break;
case REMOVE:
fbe = new FlowRuleBatchEntry(
FlowRuleBatchEntry.FlowRuleOperation.REMOVE, flowRuleOperation.rule());
break;
default:
throw new UnsupportedOperationException("Unknown flow rule type " + flowRuleOperation.type());
}
pendingDevices.add(flowRuleOperation.rule().deviceId());
perDeviceBatches.put(flowRuleOperation.rule().deviceId(), fbe);
}
return finalizeBatchOperation(success, failed, failedIds);
for (DeviceId deviceId : perDeviceBatches.keySet()) {
Long id = idGenerator.getNewId();
final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
deviceId, id);
pendingFlowOperations.put(id, this);
deviceInstallers.submit(new Runnable() {
@Override
public void run() {
store.storeBatch(b);
}
});
}
}
@Override
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
if (isDone()) {
return overall;
}
boolean success = true;
Set<FlowRule> failed = Sets.newHashSet();
Set<Long> failedIds = Sets.newHashSet();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get(timeout, unit);
success = validateBatchOperation(failed, failedIds, completed);
public void satisfy(DeviceId devId) {
pendingDevices.remove(devId);
if (pendingDevices.isEmpty()) {
operationsService.submit(this);
}
return finalizeBatchOperation(success, failed, failedIds);
}
private boolean validateBatchOperation(Set<FlowRule> failed,
Set<Long> failedIds,
CompletedBatchOperation completed) {
if (isCancelled()) {
throw new CancellationException();
}
if (!completed.isSuccess()) {
log.warn("FlowRuleBatch failed: {}", completed);
failed.addAll(completed.failedItems());
failedIds.addAll(completed.failedIds());
cleanUpBatch();
cancelAllSubBatches();
return false;
}
return true;
}
private void cancelAllSubBatches() {
for (Future<CompletedBatchOperation> f : futures) {
f.cancel(true);
public void fail(DeviceId devId, Set<? extends FlowRule> failures) {
hasFailed.set(true);
pendingDevices.remove(devId);
if (pendingDevices.isEmpty()) {
operationsService.submit(this);
}
}
private CompletedBatchOperation finalizeBatchOperation(boolean success,
Set<FlowRule> failed,
Set<Long> failedIds) {
synchronized (this) {
if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
if (state.get() == BatchState.FINISHED) {
return overall;
}
throw new CancellationException();
}
overall = new CompletedBatchOperation(success, failed, failedIds);
return overall;
}
}
if (context != null) {
final FlowRuleOperations.Builder failedOpsBuilder =
FlowRuleOperations.builder();
failures.stream().forEach(failedOpsBuilder::add);
private void cleanUpBatch() {
log.debug("cleaning up batch");
// TODO convert these into a batch?
for (FlowRuleBatchEntry fbe : batches.values()) {
if (fbe.operator() == FlowRuleOperation.ADD ||
fbe.operator() == FlowRuleOperation.MODIFY) {
store.deleteFlowRule(fbe.target());
} else if (fbe.operator() == FlowRuleOperation.REMOVE) {
store.removeFlowRule(new DefaultFlowEntry(fbe.target()));
store.storeFlowRule(fbe.target());
}
context.onError(failedOpsBuilder.build());
}
}
}
}
......
......@@ -1151,6 +1151,7 @@ public class IntentManager
*/
protected Future<CompletedBatchOperation> applyNextBatch(List<CompletedIntentUpdate> updates) {
//TODO test this. (also, maybe save this batch)
FlowRuleBatchOperation batch = createFlowRuleBatchOperation(updates);
if (batch.size() > 0) {
//FIXME apply batch might throw an exception
......@@ -1165,7 +1166,7 @@ public class IntentManager
}
private FlowRuleBatchOperation createFlowRuleBatchOperation(List<CompletedIntentUpdate> intentUpdates) {
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList());
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList(), null, 0);
for (CompletedIntentUpdate update : intentUpdates) {
FlowRuleBatchOperation currentBatch = update.currentBatch();
if (currentBatch != null) {
......
......@@ -98,6 +98,7 @@ public class LinkCollectionIntentInstaller
outputPorts.put(egressPoint.deviceId(), egressPoint.port());
}
//FIXME change to new api
FlowRuleBatchOperation batchOperation =
new FlowRuleBatchOperation(outputPorts
.keys()
......@@ -105,7 +106,7 @@ public class LinkCollectionIntentInstaller
.map(deviceId -> createBatchEntry(operation,
intent, deviceId,
outputPorts.get(deviceId)))
.collect(Collectors.toList()));
.collect(Collectors.toList()), null, 0);
return Collections.singletonList(batchOperation);
}
......
......@@ -181,6 +181,7 @@ public class OpticalPathIntentInstaller implements IntentInstaller<OpticalPathIn
true);
rules.add(new FlowRuleBatchEntry(operation, rule));
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
//FIXME change to new api
return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
}
}
......
......@@ -108,7 +108,8 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
intent.id().fingerprint()));
prev = link.dst();
}
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
//FIXME this should change to new api.
return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
}
@Override
......@@ -138,7 +139,8 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
intent.id().fingerprint()));
prev = link.dst();
}
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
// FIXME this should change to new api
return Lists.newArrayList(new FlowRuleBatchOperation(rules, null, 0));
}
@Override
......
......@@ -31,7 +31,7 @@ public class TestEventDispatcher extends DefaultEventSinkRegistry
@Override
@SuppressWarnings("unchecked")
public void post(Event event) {
public synchronized void post(Event event) {
EventSink sink = getSink(event.getClass());
checkState(sink != null, "No sink for event %s", event);
sink.process(event);
......
......@@ -20,12 +20,15 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.IdGenerator;
import org.onosproject.core.Version;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.net.DefaultDevice;
import org.onosproject.net.Device;
......@@ -36,7 +39,6 @@ import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceServiceAdapter;
import org.onosproject.net.flow.BatchOperation;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.DefaultFlowRule;
......@@ -72,6 +74,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.*;
import static org.onosproject.net.flow.FlowRuleEvent.Type.*;
......@@ -97,12 +100,16 @@ public class FlowRuleManagerTest {
protected TestListener listener = new TestListener();
private ApplicationId appId;
@Before
public void setUp() {
mgr = new FlowRuleManager();
mgr.store = new SimpleFlowRuleStore();
mgr.eventDispatcher = new TestEventDispatcher();
mgr.deviceService = new TestDeviceService();
mgr.coreService = new TestCoreService();
mgr.operationsService = MoreExecutors.newDirectExecutorService();
mgr.deviceInstallers = MoreExecutors.newDirectExecutorService();
service = mgr;
registry = mgr;
......@@ -246,14 +253,23 @@ public class FlowRuleManagerTest {
@Test
public void flowRemoved() {
FlowRule f1 = addFlowRule(1);
FlowRule f2 = addFlowRule(2);
StoredFlowEntry fe1 = new DefaultFlowEntry(f1);
FlowEntry fe2 = new DefaultFlowEntry(f2);
providerService.pushFlowMetrics(DID, ImmutableList.of(fe1, fe2));
service.removeFlowRules(f1);
fe1.setState(FlowEntryState.REMOVED);
providerService.flowRemoved(fe1);
validateEvents(RULE_ADD_REQUESTED, RULE_ADD_REQUESTED, RULE_ADDED,
RULE_ADDED, RULE_REMOVE_REQUESTED, RULE_REMOVED);
......@@ -263,11 +279,13 @@ public class FlowRuleManagerTest {
FlowRule f3 = flowRule(3, 3);
FlowEntry fe3 = new DefaultFlowEntry(f3);
service.applyFlowRules(f3);
providerService.pushFlowMetrics(DID, Collections.singletonList(fe3));
validateEvents(RULE_ADD_REQUESTED, RULE_ADDED);
providerService.flowRemoved(fe3);
validateEvents();
}
@Test
......@@ -281,7 +299,6 @@ public class FlowRuleManagerTest {
FlowEntry fe1 = new DefaultFlowEntry(f1);
FlowEntry fe2 = new DefaultFlowEntry(f2);
//FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED);
//FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED);
......@@ -388,7 +405,7 @@ public class FlowRuleManagerTest {
FlowRuleBatchEntry.FlowRuleOperation.ADD, f2);
FlowRuleBatchOperation fbo = new FlowRuleBatchOperation(
Lists.newArrayList(fbe1, fbe2));
Lists.newArrayList(fbe1, fbe2), null, 0);
Future<CompletedBatchOperation> future = mgr.applyBatch(fbo);
assertTrue("Entries in wrong state",
validateState(ImmutableMap.of(
......@@ -406,53 +423,6 @@ public class FlowRuleManagerTest {
}
@Test
public void cancelBatch() {
FlowRule f1 = flowRule(1, 1);
FlowRule f2 = flowRule(2, 2);
mgr.applyFlowRules(f1);
assertTrue("Entries in wrong state",
validateState(ImmutableMap.of(
f1, FlowEntryState.PENDING_ADD)));
FlowEntry fe1 = new DefaultFlowEntry(f1);
providerService.pushFlowMetrics(DID, Collections.<FlowEntry>singletonList(fe1));
assertTrue("Entries in wrong state",
validateState(ImmutableMap.of(
f1, FlowEntryState.ADDED)));
FlowRuleBatchEntry fbe1 = new FlowRuleBatchEntry(
FlowRuleBatchEntry.FlowRuleOperation.REMOVE, f1);
FlowRuleBatchEntry fbe2 = new FlowRuleBatchEntry(
FlowRuleBatchEntry.FlowRuleOperation.ADD, f2);
FlowRuleBatchOperation fbo = new FlowRuleBatchOperation(
Lists.newArrayList(fbe1, fbe2));
Future<CompletedBatchOperation> future = mgr.applyBatch(fbo);
future.cancel(true);
assertTrue(flowCount() == 2);
/*
* Rule f1 should be re-added to the list and therefore be in a pending add
* state.
*/
assertTrue("Entries in wrong state",
validateState(ImmutableMap.of(
f2, FlowEntryState.PENDING_REMOVE,
f1, FlowEntryState.PENDING_ADD)));
}
private static class TestListener implements FlowRuleListener {
final List<FlowRuleEvent> events = new ArrayList<>();
......@@ -528,9 +498,8 @@ public class FlowRuleManagerTest {
}
@Override
public ListenableFuture<CompletedBatchOperation> executeBatch(
BatchOperation<FlowRuleBatchEntry> batch) {
return new TestInstallationFuture();
public void executeBatch(FlowRuleBatchOperation batch) {
// TODO: need to call batchOperationComplete
}
private class TestInstallationFuture
......@@ -554,14 +523,14 @@ public class FlowRuleManagerTest {
@Override
public CompletedBatchOperation get()
throws InterruptedException, ExecutionException {
return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet(), null);
}
@Override
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException,
ExecutionException, TimeoutException {
return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet(), null);
}
@Override
......@@ -644,4 +613,37 @@ public class FlowRuleManagerTest {
}
}
private class TestCoreService implements CoreService {
@Override
public Version version() {
return null;
}
@Override
public Set<ApplicationId> getAppIds() {
return null;
}
@Override
public ApplicationId getAppId(Short id) {
return null;
}
@Override
public ApplicationId registerApplication(String identifier) {
return null;
}
@Override
public IdGenerator getIdGenerator(String topic) {
return new IdGenerator() {
private AtomicLong counter = new AtomicLong(0);
@Override
public long getNewId() {
return counter.getAndIncrement();
}
};
}
}
}
......
......@@ -201,7 +201,7 @@ public class IntentManagerTest {
FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr));
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
}
@Override
......@@ -209,7 +209,7 @@ public class IntentManagerTest {
FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
}
@Override
......@@ -219,7 +219,7 @@ public class IntentManagerTest {
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr2));
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
return Lists.newArrayList(new FlowRuleBatchOperation(rules, fr.deviceId(), 0));
}
}
......
......@@ -27,6 +27,7 @@ import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleListener;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleService;
import com.google.common.collect.ImmutableSet;
......@@ -45,11 +46,11 @@ public class MockFlowRuleService implements FlowRuleService {
public void setFuture(boolean success, long intentId) {
if (success) {
future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet()));
future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet(), null));
} else {
final Set<Long> failedIds = ImmutableSet.of(intentId);
future = Futures.immediateFuture(
new CompletedBatchOperation(false, flows, failedIds));
new CompletedBatchOperation(false, flows, failedIds, null));
}
}
......@@ -74,6 +75,11 @@ public class MockFlowRuleService implements FlowRuleService {
}
@Override
public void apply(FlowRuleOperations ops) {
}
@Override
public int getFlowRuleCount() {
return flows.size();
}
......
......@@ -15,33 +15,15 @@
*/
package org.onosproject.store.flow.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.APPLY_BATCH_FLOWS;
import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES;
import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.GET_FLOW_ENTRY;
import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.REMOVE_FLOW_ENTRY;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.hazelcast.core.IMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -49,8 +31,11 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
......@@ -67,6 +52,7 @@ import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleEvent.Type;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
......@@ -79,27 +65,37 @@ import org.onosproject.store.flow.ReplicaInfoEventListener;
import org.onosproject.store.flow.ReplicaInfoService;
import org.onosproject.store.hz.AbstractHazelcastStore;
import org.onosproject.store.hz.SMap;
import org.onosproject.store.serializers.DecodeTo;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
import org.slf4j.Logger;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.hazelcast.core.IMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of flow rules using a distributed state management protocol.
......@@ -112,12 +108,10 @@ public class DistributedFlowRuleStore
private final Logger log = getLogger(getClass());
// primary data:
// read/write needs to be locked
private final ReentrantReadWriteLock flowEntriesLock = new ReentrantReadWriteLock();
// store entries as a pile of rules, no info about device tables
private final Multimap<DeviceId, StoredFlowEntry> flowEntries
= ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
private InternalFlowTable flowTable = new InternalFlowTable();
/*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
flowEntries = new ConcurrentHashMap<>();*/
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ReplicaInfoService replicaInfoManager;
......@@ -131,23 +125,15 @@ public class DistributedFlowRuleStore
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
private final AtomicInteger localBatchIdGen = new AtomicInteger();
private int pendingFutureTimeoutMinutes = 5;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
CacheBuilder.newBuilder()
.expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
.removalListener(new TimeoutFuture())
.build();
private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
// Cache of SMaps used for backup data. each SMap contain device flow table
private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
private final ExecutorService futureListeners =
Executors.newCachedThreadPool(namedThreads("onos-flowstore-peer-responders"));
private final ExecutorService backupExecutors =
Executors.newSingleThreadExecutor(namedThreads("onos-async-backups"));
......@@ -169,6 +155,8 @@ public class DistributedFlowRuleStore
private ReplicaInfoEventListener replicaInfoEventListener;
private IdGenerator idGenerator;
@Override
@Activate
public void activate() {
......@@ -176,22 +164,33 @@ public class DistributedFlowRuleStore
super.serializer = SERIALIZER;
super.theInstance = storeService.getHazelcastInstance();
idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
// Cache to create SMap on demand
smaps = CacheBuilder.newBuilder()
.softValues()
.build(new SMapLoader());
.softValues()
.build(new SMapLoader());
final NodeId local = clusterService.getLocalNode().id();
clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
@Override
public void handle(ClusterMessage message) {
FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
log.trace("received completed notification for {}", event);
notifyDelegate(event);
}
});
clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
@Override
public void handle(ClusterMessage message) {
FlowRule rule = SERIALIZER.decode(message.payload());
log.trace("received get flow entry request for {}", rule);
FlowEntry flowEntry = getFlowEntryInternal(rule);
FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
try {
message.respond(SERIALIZER.encode(flowEntry));
} catch (IOException e) {
......@@ -206,7 +205,7 @@ public class DistributedFlowRuleStore
public void handle(ClusterMessage message) {
DeviceId deviceId = SERIALIZER.decode(message.payload());
log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
try {
message.respond(SERIALIZER.encode(flowEntries));
} catch (IOException e) {
......@@ -272,11 +271,11 @@ public class DistributedFlowRuleStore
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getFlowEntryInternal(rule);
return flowTable.getFlowEntry(rule);
}
log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), rule.deviceId());
replicaInfo.master().orNull(), rule.deviceId());
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
......@@ -292,19 +291,7 @@ public class DistributedFlowRuleStore
return null;
}
private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
flowEntriesLock.readLock().lock();
try {
for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
return f;
}
}
} finally {
flowEntriesLock.readLock().unlock();
}
return null;
}
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
......@@ -317,11 +304,11 @@ public class DistributedFlowRuleStore
}
if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
return getFlowEntriesInternal(deviceId);
return flowTable.getFlowEntries(deviceId);
}
log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
......@@ -337,30 +324,26 @@ public class DistributedFlowRuleStore
return Collections.emptyList();
}
private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
flowEntriesLock.readLock().lock();
try {
Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
if (rules == null) {
return Collections.emptySet();
}
return ImmutableSet.copyOf(rules);
} finally {
flowEntriesLock.readLock().unlock();
}
}
@Override
public void storeFlowRule(FlowRule rule) {
storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
storeBatch(new FlowRuleBatchOperation(
Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
rule.deviceId(), idGenerator.getNewId()));
}
@Override
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
public void storeBatch(FlowRuleBatchOperation operation) {
if (operation.getOperations().isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true,
Collections.<FlowRule>emptySet()));
notifyDelegate(FlowRuleBatchEvent.completed(
new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
new CompletedBatchOperation(true, Collections.emptySet(),
operation.deviceId())));
return;
}
DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
......@@ -369,110 +352,129 @@ public class DistributedFlowRuleStore
if (!replicaInfo.master().isPresent()) {
log.warn("Failed to storeBatch: No master for {}", deviceId);
// TODO: revisit if this should be "success" from Future point of view
// with every FlowEntry failed
return Futures.immediateFailedFuture(new IOException("Failed to storeBatch: No master for " + deviceId));
Set<FlowRule> allFailures = operation.getOperations().stream()
.map(op -> op.getTarget())
.collect(Collectors.toSet());
notifyDelegate(FlowRuleBatchEvent.completed(
new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
new CompletedBatchOperation(false, allFailures, operation.deviceId())));
return;
}
final NodeId local = clusterService.getLocalNode().id();
if (replicaInfo.master().get().equals(local)) {
return storeBatchInternal(operation);
storeBatchInternal(operation);
return;
}
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
replicaInfo.master().orNull(), deviceId);
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
local,
APPLY_BATCH_FLOWS,
SERIALIZER.encode(operation));
//CompletedBatchOperation response;
try {
ListenableFuture<byte[]> responseFuture =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
}
/*response =
Futures.transform(responseFuture,
new DecodeTo<CompletedBatchOperation>(SERIALIZER))
.get(500 * operation.size(), TimeUnit.MILLISECONDS);
private ListenableFuture<CompletedBatchOperation>
storeBatchInternal(FlowRuleBatchOperation operation) {
notifyDelegate(FlowRuleBatchEvent.completed(
new FlowRuleBatchRequest(operation.id(), Collections.emptySet()), response));*/
final List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
final List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
DeviceId did = null;
} catch (IOException /*| InterruptedException | ExecutionException | TimeoutException*/ e) {
log.warn("Failed to storeBatch: {}", e.getMessage());
Set<FlowRule> allFailures = operation.getOperations().stream()
.map(op -> op.getTarget())
.collect(Collectors.toSet());
flowEntriesLock.writeLock().lock();
try {
for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
FlowRule flowRule = batchEntry.target();
FlowRuleOperation op = batchEntry.operator();
if (did == null) {
did = flowRule.deviceId();
}
if (op.equals(FlowRuleOperation.REMOVE)) {
StoredFlowEntry entry = getFlowEntryInternal(flowRule);
if (entry != null) {
entry.setState(FlowEntryState.PENDING_REMOVE);
toRemove.add(batchEntry);
notifyDelegate(FlowRuleBatchEvent.completed(
new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
new CompletedBatchOperation(false, allFailures, deviceId)));
return;
}
}
private void storeBatchInternal(FlowRuleBatchOperation operation) {
final DeviceId did = operation.deviceId();
//final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
Set<FlowRuleBatchEntry> currentOps;
currentOps = operation.getOperations().stream().map(
op -> {
StoredFlowEntry entry;
switch (op.getOperator()) {
case ADD:
entry = new DefaultFlowEntry(op.getTarget());
// always add requested FlowRule

// Note: 2 equal FlowEntry may have different treatment
flowTable.remove(entry.deviceId(), entry);
flowTable.add(entry);
return op;
case REMOVE:
entry = flowTable.getFlowEntry(op.target());
if (entry != null) {
entry.setState(FlowEntryState.PENDING_REMOVE);
return op;
}
break;
case MODIFY:
//TODO: figure this out at some point
break;
default:
log.warn("Unknown flow operation operator: {}", op.getOperator());
}
} else if (op.equals(FlowRuleOperation.ADD)) {
StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
DeviceId deviceId = flowRule.deviceId();
Collection<StoredFlowEntry> ft = flowEntries.get(deviceId);
// always add requested FlowRule
// Note: 2 equal FlowEntry may have different treatment
ft.remove(flowEntry);
ft.add(flowEntry);
toAdd.add(batchEntry);
return null;
}
}
if (toAdd.isEmpty() && toRemove.isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
}
// create remote backup copies
updateBackup(did, toAdd, toRemove);
} finally {
flowEntriesLock.writeLock().unlock();
).filter(op -> op != null).collect(Collectors.toSet());
if (currentOps.isEmpty()) {
batchOperationComplete(FlowRuleBatchEvent.completed(
new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
new CompletedBatchOperation(true, Collections.emptySet(), did)));
return;
}
updateBackup(did, currentOps);
SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
final int batchId = localBatchIdGen.incrementAndGet();
pendingFutures.put(batchId, r);
notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
notifyDelegate(FlowRuleBatchEvent.requested(new
FlowRuleBatchRequest(operation.id(), currentOps), operation.deviceId()));
return r;
}
private void updateBackup(final DeviceId deviceId,
final List<FlowRuleBatchEntry> toAdd,
final List<FlowRuleBatchEntry> list) {
}
Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
if (syncBackup) {
// wait for backup to complete
try {
submit.get();
backup.get();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to create backups", e);
}
}
}
private void updateBackup(DeviceId deviceId, List<FlowRuleBatchEntry> toAdd) {
updateBackup(deviceId, toAdd, Collections.<FlowRuleBatchEntry>emptyList());
}
@Override
public void deleteFlowRule(FlowRule rule) {
storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
storeBatch(
new FlowRuleBatchOperation(
Arrays.asList(
new FlowRuleBatchEntry(
FlowRuleOperation.REMOVE,
rule)), rule.deviceId(), idGenerator.getNewId()));
}
@Override
......@@ -484,37 +486,35 @@ public class DistributedFlowRuleStore
}
log.warn("Tried to update FlowRule {} state,"
+ " while the Node was not the master.", rule);
+ " while the Node was not the master.", rule);
return null;
}
private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
final DeviceId did = rule.deviceId();
flowEntriesLock.writeLock().lock();
try {
// check if this new rule is an update to an existing entry
StoredFlowEntry stored = getFlowEntryInternal(rule);
if (stored != null) {
stored.setBytes(rule.bytes());
stored.setLife(rule.life());
stored.setPackets(rule.packets());
if (stored.state() == FlowEntryState.PENDING_ADD) {
stored.setState(FlowEntryState.ADDED);
FlowRuleBatchEntry entry =
new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
updateBackup(did, Arrays.asList(entry));
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
// TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
// TODO: also update backup if the behavior is correct.
flowEntries.put(did, new DefaultFlowEntry(rule));
} finally {
flowEntriesLock.writeLock().unlock();
// check if this new rule is an update to an existing entry
StoredFlowEntry stored = flowTable.getFlowEntry(rule);
if (stored != null) {
stored.setBytes(rule.bytes());
stored.setLife(rule.life());
stored.setPackets(rule.packets());
if (stored.state() == FlowEntryState.PENDING_ADD) {
stored.setState(FlowEntryState.ADDED);
FlowRuleBatchEntry entry =
new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
updateBackup(did, Sets.newHashSet(entry));
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
// TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
// TODO: also update backup if the behavior is correct.
flowTable.add(rule);
return null;
}
......@@ -540,9 +540,9 @@ public class DistributedFlowRuleStore
replicaInfo.master().orNull(), deviceId);
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
REMOVE_FLOW_ENTRY,
SERIALIZER.encode(rule));
clusterService.getLocalNode().id(),
REMOVE_FLOW_ENTRY,
SERIALIZER.encode(rule));
try {
Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
......@@ -555,38 +555,42 @@ public class DistributedFlowRuleStore
private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
final DeviceId deviceId = rule.deviceId();
flowEntriesLock.writeLock().lock();
try {
// This is where one could mark a rule as removed and still keep it in the store.
final boolean removed = flowEntries.remove(deviceId, rule);
FlowRuleBatchEntry entry =
new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
updateBackup(deviceId, Collections.<FlowRuleBatchEntry>emptyList(), Arrays.asList(entry));
if (removed) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
return null;
}
} finally {
flowEntriesLock.writeLock().unlock();
// This is where one could mark a rule as removed and still keep it in the store.
final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
FlowRuleBatchEntry entry =
new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
updateBackup(deviceId, Sets.newHashSet(entry));
if (removed) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
return null;
}
}
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
final Integer batchId = event.subject().batchId();
SettableFuture<CompletedBatchOperation> future
= pendingFutures.getIfPresent(batchId);
if (future != null) {
future.set(event.result());
pendingFutures.invalidate(batchId);
}
notifyDelegate(event);
//FIXME: need a per device pending response
NodeId nodeId = pendingResponses.remove(event.subject().batchId());
if (nodeId == null) {
notifyDelegate(event);
} else {
try {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
REMOTE_APPLY_COMPLETED,
SERIALIZER.encode(event));
clusterCommunicator.sendAndReceive(message, nodeId);
} catch (IOException e) {
log.warn("Failed to respond to peer for batch operation result");
}
}
}
private void loadFromBackup(final DeviceId did) {
flowEntriesLock.writeLock().lock();
try {
log.debug("Loading FlowRules for {} from backups", did);
SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
......@@ -595,38 +599,21 @@ public class DistributedFlowRuleStore
log.trace("loading {}", e.getValue());
for (StoredFlowEntry entry : e.getValue()) {
flowEntries.remove(did, entry);
flowEntries.put(did, entry);
flowTable.getFlowEntriesById(entry).remove(entry);
flowTable.getFlowEntriesById(entry).add(entry);
}
}
} catch (ExecutionException e) {
log.error("Failed to load backup flowtable for {}", did, e);
} finally {
flowEntriesLock.writeLock().unlock();
}
}
private void removeFromPrimary(final DeviceId did) {
Collection<StoredFlowEntry> removed = null;
flowEntriesLock.writeLock().lock();
try {
removed = flowEntries.removeAll(did);
} finally {
flowEntriesLock.writeLock().unlock();
}
log.trace("removedFromPrimary {}", removed);
flowTable.clearDevice(did);
}
private static final class TimeoutFuture
implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
@Override
public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
// wrapping in ExecutionException to support Future.get
notification.getValue()
.setException(new ExecutionException("Timed out",
new TimeoutException()));
}
}
private final class OnStoreBatch implements ClusterMessageHandler {
private final NodeId local;
......@@ -640,7 +627,7 @@ public class DistributedFlowRuleStore
FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
log.debug("received batch request {}", operation);
final DeviceId deviceId = operation.getOperations().get(0).target().deviceId();
final DeviceId deviceId = operation.deviceId();
ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
if (!local.equals(replicaInfo.master().orNull())) {
......@@ -648,7 +635,7 @@ public class DistributedFlowRuleStore
for (FlowRuleBatchEntry op : operation.getOperations()) {
failures.add(op.target());
}
CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
// This node is no longer the master, respond as all failed.
// TODO: we might want to wrap response in envelope
// to distinguish sw programming failure and hand over
......@@ -661,36 +648,15 @@ public class DistributedFlowRuleStore
return;
}
final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
f.addListener(new Runnable() {
@Override
public void run() {
CompletedBatchOperation result;
try {
result = f.get();
} catch (InterruptedException | ExecutionException e) {
log.error("Batch operation failed", e);
// create everything failed response
Set<FlowRule> failures = new HashSet<>(operation.size());
for (FlowRuleBatchEntry op : operation.getOperations()) {
failures.add(op.target());
}
result = new CompletedBatchOperation(false, failures);
}
try {
message.respond(SERIALIZER.encode(result));
} catch (IOException e) {
log.error("Failed to respond back", e);
}
}
}, futureListeners);
pendingResponses.put(operation.id(), message.sender());
storeBatchInternal(operation);
}
}
private final class SMapLoader
extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
@Override
public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
......@@ -701,7 +667,7 @@ public class DistributedFlowRuleStore
}
private final class InternalReplicaInfoEventListener
implements ReplicaInfoEventListener {
implements ReplicaInfoEventListener {
@Override
public void event(ReplicaInfoEvent event) {
......@@ -710,98 +676,166 @@ public class DistributedFlowRuleStore
final ReplicaInfo rInfo = event.replicaInfo();
switch (event.type()) {
case MASTER_CHANGED:
if (local.equals(rInfo.master().orNull())) {
// This node is the new master, populate local structure
// from backup
loadFromBackup(did);
} else {
// This node is no longer the master holder,
// clean local structure
removeFromPrimary(did);
// TODO: probably should stop pending backup activities in
// executors to avoid overwriting with old value
}
break;
default:
break;
case MASTER_CHANGED:
if (local.equals(rInfo.master().orNull())) {
// This node is the new master, populate local structure
// from backup
loadFromBackup(did);
} else {
// This node is no longer the master holder,
// clean local structure
removeFromPrimary(did);
// TODO: probably should stop pending backup activities in
// executors to avoid overwriting with old value
}
break;
default:
break;
}
}
}
// Task to update FlowEntries in backup HZ store
// TODO: Should be refactored to contain only one list and not
// toAdd and toRemove
private final class UpdateBackup implements Runnable {
private final DeviceId deviceId;
private final List<FlowRuleBatchEntry> toAdd;
private final List<FlowRuleBatchEntry> toRemove;
private final Set<FlowRuleBatchEntry> ops;
public UpdateBackup(DeviceId deviceId,
List<FlowRuleBatchEntry> toAdd,
List<FlowRuleBatchEntry> list) {
Set<FlowRuleBatchEntry> ops) {
this.deviceId = checkNotNull(deviceId);
this.toAdd = checkNotNull(toAdd);
this.toRemove = checkNotNull(list);
this.ops = checkNotNull(ops);
}
@Override
public void run() {
try {
log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
log.trace("update backup {} {}", deviceId, ops
);
final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
// Following should be rewritten using async APIs
for (FlowRuleBatchEntry bEntry : toAdd) {
final FlowRule entry = bEntry.target();
final FlowId id = entry.id();
ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
List<StoredFlowEntry> list = new ArrayList<>();
if (original != null) {
list.addAll(original);
}
list.remove(bEntry.target());
list.add((StoredFlowEntry) entry);
ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
boolean success;
if (original == null) {
success = (backupFlowTable.putIfAbsent(id, newValue) == null);
} else {
success = backupFlowTable.replace(id, original, newValue);
}
if (!success) {
log.error("Updating backup failed.");
}
}
for (FlowRuleBatchEntry bEntry : toRemove) {
final FlowRule entry = bEntry.target();
final FlowId id = entry.id();
ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
List<StoredFlowEntry> list = new ArrayList<>();
if (original != null) {
list.addAll(original);
}
ops.stream().forEach(
op -> {
final FlowRule entry = op.getTarget();
final FlowId id = entry.id();
ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
List<StoredFlowEntry> list = new ArrayList<>();
if (original != null) {
list.addAll(original);
}
list.remove(op.getTarget());
if (op.getOperator() == FlowRuleOperation.ADD) {
list.add((StoredFlowEntry) entry);
}
ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
boolean success;
if (original == null) {
success = (backupFlowTable.putIfAbsent(id, newValue) == null);
} else {
success = backupFlowTable.replace(id, original, newValue);
}
if (!success) {
log.error("Updating backup failed.");
}
list.remove(bEntry.target());
ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
boolean success;
if (original == null) {
success = (backupFlowTable.putIfAbsent(id, newValue) == null);
} else {
success = backupFlowTable.replace(id, original, newValue);
}
if (!success) {
log.error("Updating backup failed.");
}
}
}
);
} catch (ExecutionException e) {
log.error("Failed to write to backups", e);
}
}
}
private class InternalFlowTable {
/*
TODO: This needs to be cleaned up. Perhaps using the eventually consistent
map when it supports distributed to a sequence of instances.
*/
private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
flowEntries = new ConcurrentHashMap<>();
private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
}
/**
* Returns the flow table for specified device.
*
* @param deviceId identifier of the device
* @return Map representing Flow Table of given device.
*/
private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
return createIfAbsentUnchecked(flowEntries,
deviceId, lazyEmptyFlowTable());
}
private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
Set<StoredFlowEntry> r = flowTable.get(flowId);
if (r == null) {
final Set<StoredFlowEntry> concurrentlyAdded;
r = new CopyOnWriteArraySet<>();
concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
if (concurrentlyAdded != null) {
return concurrentlyAdded;
}
}
return r;
}
private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
if (f.equals(rule)) {
return f;
}
}
return null;
}
private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
return getFlowTable(deviceId).values().stream()
.flatMap((list -> list.stream())).collect(Collectors.toSet());
}
public StoredFlowEntry getFlowEntry(FlowRule rule) {
return getFlowEntryInternal(rule);
}
public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
return getFlowEntriesInternal(deviceId);
}
public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
return getFlowEntriesInternal(entry.deviceId(), entry.id());
}
public void add(FlowEntry rule) {
((CopyOnWriteArraySet)
getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
}
public boolean remove(DeviceId deviceId, FlowEntry rule) {
return ((CopyOnWriteArraySet)
getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
//return flowEntries.remove(deviceId, rule);
}
public void clearDevice(DeviceId did) {
flowEntries.remove(did);
}
}
}
......
......@@ -34,4 +34,7 @@ public final class FlowStoreMessageSubjects {
public static final MessageSubject REMOVE_FLOW_ENTRY
= new MessageSubject("peer-forward-remove-flow-entry");
public static final MessageSubject REMOTE_APPLY_COMPLETED
= new MessageSubject("peer-apply-completed");
}
......
......@@ -59,15 +59,17 @@ import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchEvent;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleBatchRequest;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flow.criteria.Criterion;
......@@ -162,6 +164,7 @@ public final class KryoNamespaces {
.register(Collections.emptySet().getClass())
.register(Optional.class)
.register(Collections.emptyList().getClass())
.register(Collections.unmodifiableSet(Collections.emptySet()).getClass())
.build();
/**
......@@ -255,6 +258,9 @@ public final class KryoNamespaces {
L3ModificationInstruction.L3SubType.class,
L3ModificationInstruction.ModIPInstruction.class,
RoleInfo.class,
FlowRuleBatchEvent.class,
FlowRuleBatchEvent.Type.class,
FlowRuleBatchRequest.class,
FlowRuleBatchOperation.class,
CompletedBatchOperation.class,
FlowRuleBatchEntry.class,
......
......@@ -21,13 +21,13 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.FluentIterable;
import com.google.common.util.concurrent.Futures;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.NewConcurrentHashMap;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
......@@ -46,7 +46,6 @@ import org.onosproject.net.flow.FlowRuleStore;
import org.onosproject.net.flow.FlowRuleStoreDelegate;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.store.AbstractStore;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.util.ArrayList;
......@@ -56,7 +55,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -261,13 +259,14 @@ public class SimpleFlowRuleStore
}
@Override
public Future<CompletedBatchOperation> storeBatch(
FlowRuleBatchOperation batchOperation) {
public void storeBatch(
FlowRuleBatchOperation operation) {
List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
final FlowRule flowRule = entry.target();
if (entry.operator().equals(FlowRuleOperation.ADD)) {
for (FlowRuleBatchEntry entry : operation.getOperations()) {
final FlowRule flowRule = entry.getTarget();
if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
storeFlowRule(flowRule);
toAdd.add(entry);
......@@ -283,21 +282,27 @@ public class SimpleFlowRuleStore
}
if (toAdd.isEmpty() && toRemove.isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
notifyDelegate(FlowRuleBatchEvent.completed(
new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
new CompletedBatchOperation(true, Collections.emptySet(),
operation.deviceId())));
return;
}
SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
final int batchId = localBatchIdGen.incrementAndGet();
pendingFutures.put(batchId, r);
notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
return r;
toAdd.addAll(toRemove);
notifyDelegate(FlowRuleBatchEvent.requested(
new FlowRuleBatchRequest(batchId, Sets.newHashSet(toAdd)), operation.deviceId()));
}
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
final Integer batchId = event.subject().batchId();
final Long batchId = event.subject().batchId();
SettableFuture<CompletedBatchOperation> future
= pendingFutures.getIfPresent(batchId);
if (future != null) {
......
......@@ -116,7 +116,7 @@ public class NullDeviceProvider extends AbstractProvider implements DeviceProvid
@Activate
public void activate(ComponentContext context) {
providerService = providerRegistry.register(this);
if (modified(context)) {
if (!modified(context)) {
deviceBuilder.submit(new DeviceCreator(true));
}
log.info("Started");
......@@ -173,6 +173,9 @@ public class NullDeviceProvider extends AbstractProvider implements DeviceProvid
chgd |= true;
}
log.info("Using settings numDevices={}, numPorts={}", numDevices, numPorts);
if (chgd) {
deviceBuilder.submit(new DeviceCreator(true));
}
return chgd;
}
......
......@@ -15,9 +15,7 @@
*/
package org.onosproject.provider.nil.flow.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -29,12 +27,12 @@ import org.jboss.netty.util.TimerTask;
import org.onlab.util.Timer;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.BatchOperation;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleProvider;
import org.onosproject.net.flow.FlowRuleProviderRegistry;
import org.onosproject.net.flow.FlowRuleProviderService;
......@@ -43,7 +41,9 @@ import org.onosproject.net.provider.ProviderId;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.concurrent.Future;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -59,7 +59,7 @@ public class NullFlowRuleProvider extends AbstractProvider implements FlowRulePr
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleProviderRegistry providerRegistry;
private Multimap<DeviceId, FlowEntry> flowTable = HashMultimap.create();
private ConcurrentMap<DeviceId, Set<FlowEntry>> flowTable = new ConcurrentHashMap<>();
private FlowRuleProviderService providerService;
......@@ -88,18 +88,10 @@ public class NullFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
@Override
public void applyFlowRule(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
flowTable.put(flowRules[i].deviceId(), new DefaultFlowEntry(flowRules[i]));
}
}
public void applyFlowRule(FlowRule... flowRules) {}
@Override
public void removeFlowRule(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
flowTable.remove(flowRules[i].deviceId(), flowRules[i]);
}
}
public void removeFlowRule(FlowRule... flowRules) {}
@Override
public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
......@@ -107,26 +99,32 @@ public class NullFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
@Override
public Future<CompletedBatchOperation> executeBatch(
BatchOperation<FlowRuleBatchEntry> batch) {
public void executeBatch(
FlowRuleBatchOperation batch) {
Set<FlowEntry> flowRules = flowTable.getOrDefault(batch.deviceId(), Sets.newConcurrentHashSet());
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
switch (fbe.operator()) {
case ADD:
applyFlowRule(fbe.target());
flowRules.add(new DefaultFlowEntry(fbe.target()));
break;
case REMOVE:
removeFlowRule(fbe.target());
flowRules.remove(new DefaultFlowEntry(fbe.target()));
break;
case MODIFY:
removeFlowRule(fbe.target());
applyFlowRule(fbe.target());
FlowEntry entry = new DefaultFlowEntry(fbe.target());
flowRules.remove(entry);
flowRules.add(entry);
break;
default:
log.error("Unknown flow operation: {}", fbe);
}
}
return Futures.immediateFuture(
new CompletedBatchOperation(true, Collections.emptySet()));
flowTable.put(batch.deviceId(), flowRules);
providerService.batchOperationCompleted(batch.id(),
new CompletedBatchOperation(
true,
Collections.emptySet(),
batch.deviceId()));
}
private class StatisticTask implements TimerTask {
......@@ -134,10 +132,11 @@ public class NullFlowRuleProvider extends AbstractProvider implements FlowRulePr
@Override
public void run(Timeout to) throws Exception {
for (DeviceId devId : flowTable.keySet()) {
providerService.pushFlowMetrics(devId, flowTable.get(devId));
providerService.pushFlowMetrics(devId,
flowTable.getOrDefault(devId, Collections.emptySet()));
}
timeout = timer.newTimeout(to.getTask(), 5, TimeUnit.SECONDS);
}
}
}
......
......@@ -35,6 +35,7 @@ import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.openflow.controller.Dpid;
import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFInstructionType;
......@@ -74,13 +75,16 @@ public class FlowEntryBuilder {
private final OFFlowStatsEntry stat;
private final OFFlowRemoved removed;
private final OFFlowMod flowMod;
private final Match match;
private final List<OFAction> actions;
private final Dpid dpid;
private final boolean addedRule;
public enum FlowType { STAT, REMOVED, MOD }
private final FlowType type;
public FlowEntryBuilder(Dpid dpid, OFFlowStatsEntry entry) {
......@@ -89,7 +93,8 @@ public class FlowEntryBuilder {
this.actions = getActions(entry);
this.dpid = dpid;
this.removed = null;
this.addedRule = true;
this.flowMod = null;
this.type = FlowType.STAT;
}
public FlowEntryBuilder(Dpid dpid, OFFlowRemoved removed) {
......@@ -99,26 +104,48 @@ public class FlowEntryBuilder {
this.dpid = dpid;
this.actions = null;
this.stat = null;
this.addedRule = false;
this.flowMod = null;
this.type = FlowType.REMOVED;
}
public FlowEntry build() {
if (addedRule) {
FlowRule rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
buildSelector(), buildTreatment(), stat.getPriority(),
stat.getCookie().getValue(), stat.getIdleTimeout(), false);
return new DefaultFlowEntry(rule, FlowEntryState.ADDED,
stat.getDurationSec(), stat.getPacketCount().getValue(),
stat.getByteCount().getValue());
public FlowEntryBuilder(Dpid dpid, OFFlowMod fm) {
this.match = fm.getMatch();
this.dpid = dpid;
this.actions = fm.getActions();
this.type = FlowType.MOD;
this.flowMod = fm;
this.stat = null;
this.removed = null;
}
} else {
FlowRule rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
buildSelector(), null, removed.getPriority(),
removed.getCookie().getValue(), removed.getIdleTimeout(), false);
return new DefaultFlowEntry(rule, FlowEntryState.REMOVED, removed.getDurationSec(),
removed.getPacketCount().getValue(), removed.getByteCount().getValue());
public FlowEntry build(FlowEntryState... state) {
FlowRule rule;
switch (this.type) {
case STAT:
rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
buildSelector(), buildTreatment(), stat.getPriority(),
stat.getCookie().getValue(), stat.getIdleTimeout(), false);
return new DefaultFlowEntry(rule, FlowEntryState.ADDED,
stat.getDurationSec(), stat.getPacketCount().getValue(),
stat.getByteCount().getValue());
case REMOVED:
rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
buildSelector(), null, removed.getPriority(),
removed.getCookie().getValue(), removed.getIdleTimeout(), false);
return new DefaultFlowEntry(rule, FlowEntryState.REMOVED, removed.getDurationSec(),
removed.getPacketCount().getValue(), removed.getByteCount().getValue());
case MOD:
FlowEntryState flowState = state.length > 0 ? state[0] : FlowEntryState.FAILED;
rule = new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
buildSelector(), buildTreatment(), flowMod.getPriority(),
flowMod.getCookie().getValue(), flowMod.getIdleTimeout(), false);
return new DefaultFlowEntry(rule, flowState, 0, 0, 0);
default:
log.error("Unknown flow type : {}", this.type);
return null;
}
}
private List<OFAction> getActions(OFFlowStatsEntry entry) {
......
......@@ -15,25 +15,13 @@
*/
package org.onosproject.provider.of.flow.impl;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -41,19 +29,16 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.BatchOperation;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleProvider;
import org.onosproject.net.flow.FlowRuleProviderRegistry;
import org.onosproject.net.flow.FlowRuleProviderService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.topology.TopologyService;
import org.onosproject.openflow.controller.Dpid;
import org.onosproject.openflow.controller.OpenFlowController;
import org.onosproject.openflow.controller.OpenFlowEventListener;
......@@ -63,6 +48,7 @@ import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFActionType;
import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFErrorType;
import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
......@@ -75,21 +61,22 @@ import org.projectfloodlight.openflow.protocol.OFStatsType;
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.protocol.action.OFAction;
import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadActionErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadInstructionErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadMatchErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Provider which uses an OpenFlow controller to detect network
......@@ -98,8 +85,6 @@ import com.google.common.collect.Sets;
@Component(immediate = true)
public class OpenFlowRuleProvider extends AbstractProvider implements FlowRuleProvider {
enum BatchState { STARTED, FINISHED, CANCELLED }
private static final int LOWEST_PRIORITY = 0;
private final Logger log = getLogger(getClass());
......@@ -110,22 +95,15 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenFlowController controller;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
private FlowRuleProviderService providerService;
private final InternalFlowProvider listener = new InternalFlowProvider();
// FIXME: This should be an expiring map to ensure futures that don't have
// a future eventually get garbage collected.
private final Map<Long, InstallationFuture> pendingFutures = new ConcurrentHashMap<>();
private final Map<Long, InstallationFuture> pendingFMs = new ConcurrentHashMap<>();
private Cache<Long, InternalCacheEntry> pendingBatches;
private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
private final AtomicLong xidCounter = new AtomicLong(1);
/**
* Creates an OpenFlow host provider.
......@@ -140,6 +118,16 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
controller.addListener(listener);
controller.addEventListener(listener);
pendingBatches = CacheBuilder.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
.removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
if (notification.getCause() == RemovalCause.EXPIRED) {
providerService.batchOperationCompleted(notification.getKey(),
notification.getValue().failedCompletion());
}
}).build();
for (OpenFlowSwitch sw : controller.getSwitches()) {
FlowStatsCollector fsc = new FlowStatsCollector(sw, POLL_INTERVAL);
fsc.start();
......@@ -160,8 +148,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
@Override
public void applyFlowRule(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
applyRule(flowRules[i]);
for (FlowRule flowRule : flowRules) {
applyRule(flowRule);
}
}
......@@ -179,8 +167,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
@Override
public void removeFlowRule(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
removeRule(flowRules[i]);
for (FlowRule flowRule : flowRules) {
removeRule(flowRule);
}
}
......@@ -203,36 +191,20 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
@Override
public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
final Set<Dpid> sws = Sets.newConcurrentHashSet();
final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<>();
/*
* Use identity hash map for reference equality as we could have equal
* flow mods for different switches.
*/
Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap();
Map<OFFlowMod, OpenFlowSwitch.TableType> modTypes = Maps.newIdentityHashMap();
public void executeBatch(FlowRuleBatchOperation batch) {
pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId().uri()));
OFFlowMod mod;
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
FlowRule flowRule = fbe.target();
final Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
OpenFlowSwitch sw = controller.getSwitch(dpid);
if (sw == null) {
/*
* if a switch we are supposed to install to is gone then
* cancel (ie. rollback) the work that has been done so far
* and return the associated future.
*/
InstallationFuture failed = new InstallationFuture(sws, fmXids);
failed.cancel(true);
return failed;
}
sws.add(dpid);
final Long flowModXid = xidCounter.getAndIncrement();
FlowModBuilder builder =
FlowModBuilder.builder(flowRule, sw.factory(),
Optional.of(flowModXid));
OFFlowMod mod = null;
FlowModBuilder.builder(fbe.target(), sw.factory(),
Optional.of(batch.id()));
switch (fbe.operator()) {
case ADD:
mod = builder.buildFlowAdd();
......@@ -244,34 +216,16 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
mod = builder.buildFlowMod();
break;
default:
log.error("Unsupported batch operation {}", fbe.operator());
}
if (mod != null) {
mods.put(mod, sw);
modTypes.put(mod, getTableType(flowRule.type()));
fmXids.put(flowModXid, fbe);
} else {
log.error("Conversion of flowrule {} failed.", flowRule);
}
}
InstallationFuture installation = new InstallationFuture(sws, fmXids);
for (Long xid : fmXids.keySet()) {
pendingFMs.put(xid, installation);
}
pendingFutures.put(installation.xid(), installation);
for (Map.Entry<OFFlowMod, OpenFlowSwitch> entry : mods.entrySet()) {
OpenFlowSwitch sw = entry.getValue();
OFFlowMod mod = entry.getKey();
OpenFlowSwitch.TableType tableType = modTypes.get(mod);
if (tableType == OpenFlowSwitch.TableType.NONE) {
sw.sendMsg(mod);
} else {
sw.sendMsg(mod, tableType);
}
log.error("Unsupported batch operation {}; skipping flowmod {}",
fbe.operator(), fbe);
continue;
}
sw.sendMsg(mod);
}
installation.verify();
return installation;
OFBarrierRequest.Builder builder = sw.factory()
.buildBarrierRequest()
.setXid(batch.id());
sw.sendMsg(builder.build());
}
private OpenFlowSwitch.TableType getTableType(FlowRule.Type type) {
......@@ -287,12 +241,11 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
}
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
private final Multimap<DeviceId, FlowEntry> completeEntries =
ArrayListMultimap.create();
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
@Override
public void switchAdded(Dpid dpid) {
......@@ -320,7 +273,6 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
OFFlowRemoved removed = (OFFlowRemoved) msg;
......@@ -334,22 +286,42 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
break;
case BARRIER_REPLY:
future = pendingFutures.get(msg.getXid());
if (future != null) {
future.satisfyRequirement(dpid);
} else {
log.warn("Received unknown Barrier Reply: {}", msg.getXid());
try {
InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
if (entry != null) {
providerService.batchOperationCompleted(msg.getXid(), entry.completed());
} else {
log.warn("Received unknown Barrier Reply: {}", msg.getXid());
}
} finally {
pendingBatches.invalidate(msg.getXid());
}
break;
case ERROR:
log.warn("received Error message {} from {}", msg, dpid);
future = pendingFMs.get(msg.getXid());
if (future != null) {
future.fail((OFErrorMsg) msg, dpid);
OFErrorMsg error = (OFErrorMsg) msg;
if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
if (fmFailed.getData().getParsedMessage().isPresent()) {
OFMessage m = fmFailed.getData().getParsedMessage().get();
OFFlowMod fm = (OFFlowMod) m;
InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
if (entry != null) {
entry.appendFailure(new FlowEntryBuilder(dpid, fm).build());
} else {
log.error("No matching batch for this error: {}", error);
}
} else {
//FIXME: Potentially add flowtracking to avoid this message.
log.error("Flow installation failed but switch didn't" +
" tell us which one.");
}
} else {
log.warn("Received unknown Error Reply: {} {}", msg.getXid(), msg);
log.warn("Received error {}", error);
}
break;
default:
log.debug("Unhandled message type: {}", msg.getType());
}
......@@ -402,198 +374,50 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
private class InstallationFuture implements Future<CompletedBatchOperation> {
// barrier xid
private final Long xid;
// waiting for barrier reply from...
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
// FlowMod xid ->
private final Map<Long, FlowRuleBatchEntry> fms;
private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
// Failed batch operation id
private Long failedId;
private final CountDownLatch countDownLatch;
private BatchState state;
public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
this.xid = xidCounter.getAndIncrement();
this.state = BatchState.STARTED;
this.sws = sws;
this.fms = fmXids;
countDownLatch = new CountDownLatch(sws.size());
}
public Long xid() {
return xid;
}
public void fail(OFErrorMsg msg, Dpid dpid) {
ok.set(false);
FlowEntry fe = null;
FlowRuleBatchEntry fbe = fms.get(msg.getXid());
failedId = fbe.id();
FlowRule offending = fbe.target();
//TODO handle specific error msgs
switch (msg.getErrType()) {
case BAD_ACTION:
OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
bad.getCode().ordinal());
break;
case BAD_INSTRUCTION:
OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
badins.getCode().ordinal());
break;
case BAD_MATCH:
OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
badMatch.getCode().ordinal());
break;
case BAD_REQUEST:
OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
badReq.getCode().ordinal());
break;
case FLOW_MOD_FAILED:
OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
fmFail.getCode().ordinal());
break;
case EXPERIMENTER:
case GROUP_MOD_FAILED:
case HELLO_FAILED:
case METER_MOD_FAILED:
case PORT_MOD_FAILED:
case QUEUE_OP_FAILED:
case ROLE_REQUEST_FAILED:
case SWITCH_CONFIG_FAILED:
case TABLE_FEATURES_FAILED:
case TABLE_MOD_FAILED:
fe = new DefaultFlowEntry(offending, msg.getErrType().ordinal(), 0);
break;
default:
log.error("Unknown error type {}", msg.getErrType());
}
offendingFlowMods.add(fe);
removeRequirement(dpid);
}
public void satisfyRequirement(Dpid dpid) {
log.debug("Satisfaction from switch {}", dpid);
removeRequirement(dpid);
}
public void verify() {
checkState(!sws.isEmpty());
for (Dpid dpid : sws) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
OFBarrierRequest.Builder builder = sw.factory()
.buildBarrierRequest()
.setXid(xid);
sw.sendMsg(builder.build());
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (isDone()) {
return false;
}
ok.set(false);
this.state = BatchState.CANCELLED;
cleanUp();
for (FlowRuleBatchEntry fbe : fms.values()) {
if (fbe.operator() == FlowRuleOperation.ADD ||
fbe.operator() == FlowRuleOperation.MODIFY) {
removeFlowRule(fbe.target());
} else if (fbe.operator() == FlowRuleOperation.REMOVE) {
applyRule(fbe.target());
}
}
return true;
}
@Override
public boolean isCancelled() {
return this.state == BatchState.CANCELLED;
}
/**
* The internal cache entry holding the original request as well
* as accumulating the any failures along the way.
*
* If this entry is evicted from the cache then the entire operation
* is considered failed. Otherwise, only the failures reported by the device
* will be propagated up.
*/
private class InternalCacheEntry {
@Override
public boolean isDone() {
return this.state == BatchState.FINISHED || isCancelled();
}
private final FlowRuleBatchOperation operation;
private final Set<FlowRule> failures = Sets.newConcurrentHashSet();
@Override
public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
countDownLatch.await();
this.state = BatchState.FINISHED;
Set<Long> failedIds = (failedId != null) ? Sets.newHashSet(failedId) : Collections.emptySet();
CompletedBatchOperation result =
new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
//FIXME do cleanup here (moved by BOC)
cleanUp();
return result;
public InternalCacheEntry(FlowRuleBatchOperation operation) {
this.operation = operation;
}
@Override
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
if (countDownLatch.await(timeout, unit)) {
this.state = BatchState.FINISHED;
Set<Long> failedIds = (failedId != null) ? Sets.newHashSet(failedId) : Collections.emptySet();
CompletedBatchOperation result =
new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
// FIXME do cleanup here (moved by BOC)
cleanUp();
return result;
}
throw new TimeoutException(this.toString());
/**
* Appends a failed rule to the set of failed items.
* @param rule the failed rule
*/
public void appendFailure(FlowRule rule) {
failures.add(rule);
}
private void cleanUp() {
if (isDone() || isCancelled()) {
pendingFutures.remove(xid);
for (Long xid : fms.keySet()) {
pendingFMs.remove(xid);
}
}
/**
* Fails the entire batch and returns the failed operation.
* @return the failed operation
*/
public CompletedBatchOperation failedCompletion() {
Set<FlowRule> fails = operation.getOperations().stream()
.map(op -> op.target()).collect(Collectors.toSet());
return new CompletedBatchOperation(false, Collections.unmodifiableSet(fails), operation.deviceId());
}
private void removeRequirement(Dpid dpid) {
countDownLatch.countDown();
sws.remove(dpid);
//FIXME don't do cleanup here (moved by BOC)
//cleanUp();
/**
* Returns the completed operation and whether the batch suceeded.
* @return the completed operation
*/
public CompletedBatchOperation completed() {
return new CompletedBatchOperation(failures.isEmpty(),
Collections.unmodifiableSet(failures), operation.deviceId());
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("xid", xid)
.add("pending devices", sws)
.add("devices in batch",
fms.values().stream()
.map((fbe) -> fbe.target().deviceId())
.distinct().collect(Collectors.toList()))
.add("failedId", failedId)
.add("latchCount", countDownLatch.getCount())
.add("state", state)
.add("no error?", ok.get())
.toString();
}
}
}
......
......@@ -2,7 +2,7 @@
# Instance-specific configurations, in this case, the number of
# devices per node.
#
devConfigs = 192.168.56.30:5,192.168.56.40:7
devConfigs = 192.168.97.132:5,192.168.97.131:5
#
# Number of ports per device. This is global to all devices
......
......@@ -9,4 +9,4 @@
#
# Set order of islands to chain together, in a line.
#
neighbors = 192.168.56.20,192.168.56.30,192.168.56.40
neighbors = 192.168.97.132,192.168.97.131
......