Thomas Vachuska
Committed by Ray Milkey

Fixing various issues and re-tuning.

Change-Id: I8822fcf77cfa507788241c5bda98ef4741b284b4
......@@ -23,13 +23,15 @@ import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.mastership.MastershipAdminService;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.device.DeviceService;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import static com.google.common.collect.Lists.newArrayList;
import static org.onlab.onos.net.MastershipRole.MASTER;
/**
* Forces device mastership rebalancing.
......@@ -50,73 +52,62 @@ public class BalanceMastersCommand extends AbstractShellCommand {
// Create buckets reflecting current ownership.
for (ControllerNode node : nodes) {
controllerDevices.putAll(node, mastershipService.getDevicesOf(node.id()));
Set<DeviceId> devicesOf = mastershipService.getDevicesOf(node.id());
controllerDevices.putAll(node, devicesOf);
print("Node %s has %d devices.", node.id(), devicesOf.size());
}
int bucketCount = nodes.size();
for (int i = 0; i < bucketCount / 2; i++) {
int rounds = nodes.size();
for (int i = 0; i < rounds; i++) {
// Iterate over the buckets and find the smallest and the largest.
ControllerNode smallest = findSmallestBucket(controllerDevices);
ControllerNode largest = findLargestBucket(controllerDevices);
balanceBuckets(smallest, largest, controllerDevices,
mastershipService, adminService);
ControllerNode smallest = findBucket(true, nodes, controllerDevices);
ControllerNode largest = findBucket(false, nodes, controllerDevices);
balanceBuckets(smallest, largest, controllerDevices, adminService);
}
}
/**
 * Locates the controller node whose bucket holds the fewest devices.
 *
 * @param controllerDevices devices grouped under the controller node they are bucketed with
 * @return the first key encountered with the smallest bucket, or null when
 *         the multimap has no keys
 */
private ControllerNode findSmallestBucket(Multimap<ControllerNode, DeviceId> controllerDevices) {
    ControllerNode candidate = null;
    int fewest = Integer.MAX_VALUE;
    for (ControllerNode current : controllerDevices.keySet()) {
        int count = controllerDevices.get(current).size();
        // Ties keep the earlier node: only a strictly smaller bucket wins.
        if (count >= fewest) {
            continue;
        }
        fewest = count;
        candidate = current;
    }
    return candidate;
}
private ControllerNode findLargestBucket(Multimap<ControllerNode, DeviceId> controllerDevices) {
int maxSize = -1;
ControllerNode maxNode = null;
for (ControllerNode node : controllerDevices.keySet()) {
private ControllerNode findBucket(boolean min, Collection<ControllerNode> nodes,
Multimap<ControllerNode, DeviceId> controllerDevices) {
int xSize = min ? Integer.MAX_VALUE : -1;
ControllerNode xNode = null;
for (ControllerNode node : nodes) {
int size = controllerDevices.get(node).size();
if (size >= maxSize) {
maxSize = size;
maxNode = node;
if ((min && size < xSize) || (!min && size > xSize)) {
xSize = size;
xNode = node;
}
}
return maxNode;
return xNode;
}
// FIXME: enhance to better handle cases where smallest cannot take any of the devices from largest
private void balanceBuckets(ControllerNode smallest, ControllerNode largest,
Multimap<ControllerNode, DeviceId> controllerDevices,
MastershipService mastershipService,
MastershipAdminService adminService) {
Collection<DeviceId> minBucket = controllerDevices.get(smallest);
Collection<DeviceId> maxBucket = controllerDevices.get(largest);
int bucketCount = controllerDevices.keySet().size();
int deviceCount = get(DeviceService.class).getDeviceCount();
int delta = (maxBucket.size() - minBucket.size()) / 2;
print("Attempting to move %d nodes from %s to %s...",
delta, largest.id(), smallest.id());
int i = 0;
Iterator<DeviceId> it = maxBucket.iterator();
while (it.hasNext() && i < delta) {
DeviceId deviceId = it.next();
// Check that the transfer can happen for the current element.
if (mastershipService.getNodesFor(deviceId).backups().contains(smallest.id())) {
print("Setting %s as the new master for %s", smallest.id(), deviceId);
adminService.setRole(smallest.id(), deviceId, MastershipRole.MASTER);
delta = Math.min(deviceCount / bucketCount, delta);
if (delta > 0) {
print("Attempting to move %d nodes from %s to %s...",
delta, largest.id(), smallest.id());
int i = 0;
Iterator<DeviceId> it = maxBucket.iterator();
while (it.hasNext() && i < delta) {
DeviceId deviceId = it.next();
print("Setting %s as the master for %s", smallest.id(), deviceId);
adminService.setRole(smallest.id(), deviceId, MASTER);
controllerDevices.put(smallest, deviceId);
it.remove();
i++;
}
}
controllerDevices.removeAll(smallest);
}
}
......
......@@ -16,6 +16,8 @@
package org.onlab.onos.event;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Timer;
......@@ -31,6 +33,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public abstract class AbstractEventAccumulator implements EventAccumulator {
private Logger log = LoggerFactory.getLogger(AbstractEventAccumulator.class);
private final Timer timer;
private final int maxEvents;
private final int maxBatchMillis;
......@@ -104,9 +108,13 @@ public abstract class AbstractEventAccumulator implements EventAccumulator {
/**
 * Timer task that cancels any pending idle/max tasks, closes out the
 * current batch and hands it to {@code processEvents}.
 */
private class ProcessorTask extends TimerTask {
    /**
     * Processes the current batch exactly once. All work happens inside the
     * try block so that a failure cannot escape the task and terminate the
     * timer thread that runs it.
     */
    @Override
    public void run() {
        try {
            idleTask = cancelIfActive(idleTask);
            maxTask = cancelIfActive(maxTask);
            processEvents(finalizeCurrentBatch());
        } catch (Exception e) {
            // Pass the exception itself as the trailing argument so the
            // stack trace is logged, not just the message.
            log.warn("Unable to process batch due to {}", e.getMessage(), e);
        }
    }
}
......
......@@ -66,13 +66,13 @@ public class DefaultTopologyProvider extends AbstractProvider
implements TopologyProvider {
private static final int MAX_THREADS = 8;
private static final int DEFAULT_MAX_EVENTS = 100;
private static final int DEFAULT_MAX_BATCH_MS = 50;
private static final int DEFAULT_MAX_IDLE_MS = 5;
private static final int DEFAULT_MAX_EVENTS = 200;
private static final int DEFAULT_MAX_BATCH_MS = 60;
private static final int DEFAULT_MAX_IDLE_MS = 30;
// FIXME: Replace with a system-wide timer instance;
// TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
private static final Timer TIMER = new Timer();
private static final Timer TIMER = new Timer("topo-event-batching");
@Property(name = "maxEvents", intValue = DEFAULT_MAX_EVENTS,
label = "Maximum number of events to accumulate")
......@@ -122,6 +122,9 @@ public class DefaultTopologyProvider extends AbstractProvider
deviceService.addListener(deviceListener);
linkService.addListener(linkListener);
log.info("Configured with maxEvents = {}; maxBatchMs = {}; maxIdleMs = {}",
maxEvents, maxBatchMs, maxIdleMs);
isStarted = true;
triggerRecompute();
log.info("Started");
......
#!/bin/bash
# -----------------------------------------------------------------------------
# ONOS topology configuration uploader.
# -----------------------------------------------------------------------------
[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
. "$ONOS_ROOT/tools/build/envDefaults"

# Collect the values of the OC1, OC2, ... environment variables. The pattern is
# anchored on the variable name so that unrelated variables whose name or value
# merely contains "OC<digits>" are not picked up.
nodes=$(env | sort | grep -E '^OC[0-9]+=' | cut -d= -f2-)

# Run onos-topo-cfg against every node, forwarding this script's first
# argument when one was supplied.
for node in $nodes; do
    # Print the node as data, never as the printf format string.
    printf '%s...' "$node"
    onos-topo-cfg "$node" ${1:+"$1"}
done
printf "\n"
......@@ -24,6 +24,9 @@ import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
import org.onlab.onos.event.AbstractEventAccumulator;
import org.onlab.onos.event.Event;
import org.onlab.onos.event.EventAccumulator;
import org.onlab.onos.mastership.MastershipEvent;
import org.onlab.onos.mastership.MastershipListener;
import org.onlab.onos.net.ConnectPoint;
......@@ -36,6 +39,8 @@ import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.host.HostEvent;
......@@ -84,8 +89,7 @@ public class TopologyViewWebSocket
private static final String APP_ID = "org.onlab.onos.gui";
private static final long SUMMARY_FREQUENCY_SEC = 3000;
private static final long TRAFFIC_FREQUENCY_SEC = 1500;
private static final long TRAFFIC_FREQUENCY_SEC = 2000;
private static final Comparator<? super ControllerNode> NODE_COMPARATOR =
new Comparator<ControllerNode>() {
......@@ -95,6 +99,13 @@ public class TopologyViewWebSocket
}
};
private final Timer timer = new Timer("topology-view");
private static final int MAX_EVENTS = 500;
private static final int MAX_BATCH_MS = 1000;
private static final int MAX_IDLE_MS = 500;
private final ApplicationId appId;
private Connection connection;
......@@ -106,16 +117,14 @@ public class TopologyViewWebSocket
private final LinkListener linkListener = new InternalLinkListener();
private final HostListener hostListener = new InternalHostListener();
private final IntentListener intentListener = new InternalIntentListener();
private final FlowRuleListener flowListener = new InternalFlowListener();
// Timers and objects being monitored
private final Timer timer = new Timer("topology-view");
private final EventAccumulator eventAccummulator = new InternalEventAccummulator();
private boolean summaryEnabled = true;
private TimerTask trafficTask;
private ObjectNode trafficEvent;
private TimerTask summaryTask;
private ObjectNode summaryEvent;
private long lastActive = System.currentTimeMillis();
private boolean listenersRemoved = false;
......@@ -128,7 +137,6 @@ public class TopologyViewWebSocket
*/
public TopologyViewWebSocket(ServiceDirectory directory) {
super(directory);
intentFilter = new TopologyViewIntentFilter(intentService, deviceService,
hostService, linkService);
appId = directory.get(CoreService.class).registerApplication(APP_ID);
......@@ -431,21 +439,13 @@ public class TopologyViewWebSocket
// Subscribes for summary messages: schedules the recurring summary task if
// none is active, marks summaries as enabled, and immediately sends a summary
// message built from the "sid" value of the request event.
private synchronized void requestSummary(ObjectNode event) {
// Only one summary task is kept at a time; a later request leaves it in place.
if (summaryTask == null) {
summaryEvent = event;
summaryTask = new SummaryMonitor();
// NOTE(review): if timer is a java.util.Timer, delay and period are in
// milliseconds despite the _SEC suffix of the constant - confirm.
timer.schedule(summaryTask, SUMMARY_FREQUENCY_SEC, SUMMARY_FREQUENCY_SEC);
}
summaryEnabled = true;
sendMessage(summmaryMessage(number(event, "sid")));
}
// Cancels sending summary messages: cancels and forgets the summary task and
// its remembered request event, if any, then marks summaries as disabled.
// The event argument is not read by this method.
private synchronized void cancelSummary(ObjectNode event) {
if (summaryTask != null) {
summaryTask.cancel();
summaryTask = null;
summaryEvent = null;
}
summaryEnabled = false;
}
......@@ -457,6 +457,7 @@ public class TopologyViewWebSocket
linkService.addListener(linkListener);
hostService.addListener(hostListener);
intentService.addListener(intentListener);
flowService.addListener(flowListener);
}
// Removes all internal listeners.
......@@ -469,6 +470,7 @@ public class TopologyViewWebSocket
linkService.removeListener(linkListener);
hostService.removeListener(hostListener);
intentService.removeListener(intentListener);
flowService.removeListener(flowListener);
}
}
......@@ -495,6 +497,7 @@ public class TopologyViewWebSocket
@Override
public void event(DeviceEvent event) {
sendMessage(deviceMessage(event));
eventAccummulator.add(event);
}
}
......@@ -503,6 +506,7 @@ public class TopologyViewWebSocket
@Override
public void event(LinkEvent event) {
sendMessage(linkMessage(event));
eventAccummulator.add(event);
}
}
......@@ -511,6 +515,7 @@ public class TopologyViewWebSocket
@Override
public void event(HostEvent event) {
sendMessage(hostMessage(event));
eventAccummulator.add(event);
}
}
......@@ -521,33 +526,55 @@ public class TopologyViewWebSocket
if (trafficEvent != null) {
requestTraffic(trafficEvent);
}
eventAccummulator.add(event);
}
}
// Flow rule event listener; hands every flow rule event to the event accumulator.
private class InternalFlowListener implements FlowRuleListener {
@Override
public void event(FlowRuleEvent event) {
eventAccummulator.add(event);
}
}
private class TrafficMonitor extends TimerTask {
@Override
public void run() {
if (trafficEvent != null) {
String type = string(trafficEvent, "event", "unknown");
if (type.equals("requestAllTraffic")) {
requestAllTraffic(trafficEvent);
} else if (type.equals("requestDeviceLinkFlows")) {
requestDeviceLinkFlows(trafficEvent);
} else {
requestTraffic(trafficEvent);
try {
if (trafficEvent != null) {
String type = string(trafficEvent, "event", "unknown");
if (type.equals("requestAllTraffic")) {
requestAllTraffic(trafficEvent);
} else if (type.equals("requestDeviceLinkFlows")) {
requestDeviceLinkFlows(trafficEvent);
} else {
requestTraffic(trafficEvent);
}
}
} catch (Exception e) {
log.warn("Unable to handle traffic request due to {}", e.getMessage());
}
}
}
private class SummaryMonitor extends TimerTask {
// Accumulates events to drive methodical updates of the summary pane.
private class InternalEventAccummulator extends AbstractEventAccumulator {
protected InternalEventAccummulator() {
super(new Timer("topo-summary"), MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
}
@Override
public void run() {
if (summaryEvent != null) {
requestSummary(summaryEvent);
public void processEvents(List<Event> events) {
try {
if (summaryEnabled) {
sendMessage(summmaryMessage(0));
}
} catch (Exception e) {
log.warn("Unable to handle summary request due to {}", e.getMessage());
}
}
}
}
......
......@@ -356,6 +356,9 @@
hideInstances();
} else if (summaryPane.isVisible()) {
cancelSummary();
stopAntTimer();
} else {
hoverMode = hoverModeFlows;
}
}
......