Thomas Vachuska

ONOS-4604 Fixed flow objective installation

Removed context from objective toString methods.
Removed duplicate flow objective delegate notifications in the store for next objectives.
Synchronized queueing of forwarding objectives for pending next objectives to avoid notifications race.
Changed logging for better readability.

Change-Id: Ic2bd411a891ea035a2c5513b24dea5fbd48f187d
......@@ -160,7 +160,6 @@ public final class DefaultFilteringObjective implements FilteringObjective {
.add("appId", appId())
.add("permanent", permanent())
.add("timeout", timeout())
.add("context", context())
.toString();
}
......
......@@ -169,7 +169,6 @@ public final class DefaultForwardingObjective implements ForwardingObjective {
.add("appId", appId())
.add("permanent", permanent())
.add("timeout", timeout())
.add("context", context())
.toString();
}
......
......@@ -138,7 +138,6 @@ public final class DefaultNextObjective implements NextObjective {
.add("appId", appId())
.add("permanent", permanent())
.add("timeout", timeout())
.add("context", context())
.toString();
}
......
......@@ -16,6 +16,7 @@
package org.onosproject.net.flowobjective.impl;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -55,21 +56,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.*;
import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
/**
* Provides implementation of the flow objective programming service.
......@@ -124,7 +121,7 @@ public class FlowObjectiveManager implements FlowObjectiveService {
protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
private Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
private final Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
// local store to track which nextObjectives were sent to which device
// for debugging purposes
......@@ -237,21 +234,33 @@ public class FlowObjectiveManager implements FlowObjectiveService {
public void initPolicy(String policy) {}
private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
if (fwd.nextId() != null &&
flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
log.debug("Queuing forwarding objective {} for nextId {} meant for device {}",
fwd.id(), fwd.nextId(), deviceId);
// TODO: change to computeIfAbsent
Set<PendingNext> newset = Collections.newSetFromMap(
new ConcurrentHashMap<PendingNext, Boolean>());
newset.add(new PendingNext(deviceId, fwd));
Set<PendingNext> pnext = pendingForwards.putIfAbsent(fwd.nextId(), newset);
if (pnext != null) {
pnext.add(new PendingNext(deviceId, fwd));
if (fwd.nextId() == null ||
flowObjectiveStore.getNextGroup(fwd.nextId()) != null) {
// fast path
return false;
}
return true;
boolean queued = false;
synchronized (pendingForwards) {
// double check the flow objective store, because this block could run
// after a notification arrives
if (flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
pendingForwards.compute(fwd.nextId(), (id, pending) -> {
PendingNext next = new PendingNext(deviceId, fwd);
if (pending == null) {
return Sets.newHashSet(next);
} else {
pending.add(next);
return pending;
}
return false;
});
queued = true;
}
}
if (queued) {
log.debug("Queued forwarding objective {} for nextId {} meant for device {}",
fwd.id(), fwd.nextId(), deviceId);
}
return queued;
}
// Retrieves the device pipeline behaviour from the cache.
......@@ -396,7 +405,11 @@ public class FlowObjectiveManager implements FlowObjectiveService {
public void notify(ObjectiveEvent event) {
if (event.type() == Type.ADD) {
log.debug("Received notification of obj event {}", event);
Set<PendingNext> pending = pendingForwards.remove(event.subject());
Set<PendingNext> pending;
synchronized (pendingForwards) {
// needs to be synchronized for queueObjective lookup
pending = pendingForwards.remove(event.subject());
}
if (pending == null) {
log.debug("Nothing pending for this obj event {}", event);
......
......@@ -120,7 +120,7 @@ class IntentInstaller {
// if toInstall was cause of error, then recompile (manage/increment counter, when exceeded -> CORRUPT)
if (toInstall.isPresent()) {
IntentData installData = toInstall.get();
log.warn("Failed installation: {} {} on {}",
log.warn("Failed installation: {} {} due to {}",
installData.key(), installData.intent(), ctx.error());
installData.setState(CORRUPT);
installData.incrementErrorCount();
......@@ -129,7 +129,7 @@ class IntentInstaller {
// if toUninstall was cause of error, then CORRUPT (another job will clean this up)
if (toUninstall.isPresent()) {
IntentData uninstallData = toUninstall.get();
log.warn("Failed withdrawal: {} {} on {}",
log.warn("Failed withdrawal: {} {} due to {}",
uninstallData.key(), uninstallData.intent(), ctx.error());
uninstallData.setState(CORRUPT);
uninstallData.incrementErrorCount();
......@@ -355,6 +355,7 @@ class IntentInstaller {
private class FlowObjectiveInstallationContext implements ObjectiveContext {
Objective objective;
DeviceId deviceId;
ObjectiveError error;
void setObjective(Objective objective, DeviceId deviceId) {
this.objective = objective;
......@@ -368,6 +369,7 @@ class IntentInstaller {
@Override
public void onError(Objective objective, ObjectiveError error) {
this.error = error;
errorContexts.add(this);
finish();
}
......@@ -387,7 +389,7 @@ class IntentInstaller {
@Override
public String toString() {
return String.format("(%s, %s)", deviceId, objective);
return String.format("(%s on %s for %s)", error, deviceId, objective);
}
}
}
......
......@@ -97,7 +97,6 @@ public class DistributedFlowObjectiveStore
@Override
public void putNextGroup(Integer nextId, NextGroup group) {
nextGroups.put(nextId, group.data());
notifyDelegate(new ObjectiveEvent(ObjectiveEvent.Type.ADD, nextId));
}
@Override
......@@ -113,7 +112,6 @@ public class DistributedFlowObjectiveStore
public NextGroup removeNextGroup(Integer nextId) {
Versioned<byte[]> versionGroup = nextGroups.remove(nextId);
if (versionGroup != null) {
notifyDelegate(new ObjectiveEvent(ObjectiveEvent.Type.REMOVE, nextId));
return new DefaultNextGroup(versionGroup.value());
}
return null;
......