Madan Jampani
Committed by Gerrit Code Review

stc work queue test improvements

Change-Id: I8b9335b0bbfdc8a447c5955bf3621962ff112cb2
......@@ -46,7 +46,7 @@ public class WorkQueueTestCommand extends AbstractShellCommand {
@Argument(index = 1, name = "operation",
description = "operation name. One of {add, addMutiple, "
+ "takeAndComplete, totalPending, totalInProgress, totalCompleted}",
+ "takeAndComplete, totalPending, totalInProgress, totalCompleted, destroy}",
required = true, multiValued = false)
String operation = null;
......@@ -95,9 +95,11 @@ public class WorkQueueTestCommand extends AbstractShellCommand {
} else if (operation.equals("totalCompleted")) {
WorkQueueStats stats = get(queue.stats());
print("%d", stats.totalCompleted());
} else if (operation.equals("destroy")) {
get(queue.destroy());
} else {
print("Invalid operation name. Valid operations names are:"
+ " [add, addMultiple takeAndComplete, totalPending, totalInProgress, totalCompleted]");
+ " [add, addMultiple takeAndComplete, totalPending, totalInProgress, totalCompleted, destroy]");
}
}
......
......@@ -29,10 +29,10 @@ import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import org.onosproject.utils.MeteringAgent;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import org.onosproject.utils.MeteringAgent;
/**
* Default implementation of a {@code AsyncAtomicValue}.
......@@ -56,6 +56,7 @@ public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
private static final String ADD_LISTENER = "addListener";
private static final String REMOVE_LISTENER = "removeListener";
private static final String NOTIFY_LISTENER = "notifyListener";
private static final String DESTROY = "destroy";
public DefaultAsyncAtomicValue(String name, Serializer serializer, AsyncConsistentMap<String, byte[]> backingMap) {
this.name = checkNotNull(name, "name must not be null");
......@@ -70,6 +71,14 @@ public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
}
@Override
public CompletableFuture<Void> destroy() {
final MeteringAgent.Context newTimer = monitor.startTimer(DESTROY);
return backingMap.remove(name)
.whenComplete((r, e) -> newTimer.stop(e))
.thenApply(v -> null);
}
@Override
public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
return backingMap.replace(name, serializer.encode(expect), serializer.encode(update))
......
......@@ -7,9 +7,9 @@ import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
import com.google.common.collect.Collections2;
......@@ -73,4 +73,9 @@ public class DefaultDistributedWorkQueue<E> implements WorkQueue<E> {
public CompletableFuture<Void> stopProcessing() {
return backingQueue.stopProcessing();
}
@Override
public CompletableFuture<Void> destroy() {
return backingQueue.destroy();
}
}
......
......@@ -98,14 +98,18 @@ public class AtomixWorkQueueState extends ResourceStateMachine implements Sessi
}
protected void clear(Commit<? extends Clear> commit) {
unassignedTasks.forEach(TaskHolder::complete);
unassignedTasks.clear();
assignments.values().forEach(TaskAssignment::markComplete);
assignments.clear();
registeredWorkers.values().forEach(Commit::close);
registeredWorkers.clear();
activeTasksPerSession.clear();
totalCompleted.set(0);
try {
unassignedTasks.forEach(TaskHolder::complete);
unassignedTasks.clear();
assignments.values().forEach(TaskAssignment::markComplete);
assignments.clear();
registeredWorkers.values().forEach(Commit::close);
registeredWorkers.clear();
activeTasksPerSession.clear();
totalCompleted.set(0);
} finally {
commit.close();
}
}
protected void register(Commit<? extends Register> commit) {
......
......@@ -30,6 +30,9 @@
<import file="${ONOS_SCENARIOS}/dist-leader.xml"/>
<dependency name="Distributed-Primitive-Leader" requires="Distributed-Primitive-Counter"/>
<import file="${ONOS_SCENARIOS}/dist-work-queue.xml"/>
<dependency name="Distributed-Primitive-WorkQueue" requires="Distributed-Primitive-Leader"/>
</group>
</scenario>
......
......@@ -30,6 +30,9 @@
<step name="Distributed-Primitive-WorkQueue.Test-Queue-Check-InProgress-1" requires="^"
exec="onos-cluster-execute-expect work-queue-test stc-test-work-queue totalInProgress --expect 0"/>
<step name="Distributed-Primitive-WorkQueue.Test-Queue-Check-TotalCompleted-1" requires="^"
exec="onos-cluster-execute-expect work-queue-test stc-test-work-queue totalCompleted --expect 0"/>
<step name="Distributed-Primitive-WorkQueue.Test-Queue-AddMultiple" requires="^"
exec="onos-execute-expect ${OCI} work-queue-test stc-test-work-queue addMultiple bar car --expect Done"/>
......@@ -42,7 +45,11 @@
<step name="Distributed-Primitive-WorkQueue.Test-Queue-Check-InProgress-2" requires="^"
exec="onos-cluster-execute-expect work-queue-test stc-test-work-queue totalInProgress --expect 0"/>
<!-- Since totalCompleted is a additive quantity, testing its value breaks when the test is run in a loop -->
<step name="Distributed-Primitive-WorkQueue.Test-Queue-Check-TotalCompleted-2" requires="^"
exec="onos-cluster-execute-expect work-queue-test stc-test-work-queue totalCompleted --expect 3"/>
<step name="Distributed-Primitive-WorkQueue.Test-Queue-Destroy" requires="^"
exec="onos ${OCI} work-queue-test stc-test-work-queue destroy"/>
<!--Check with check logs-->
<step name="Distributed-Primitive-WorkQueue.Check-Log-Exceptions" requires="^"
......