Thomas Vachuska
Committed by Gerrit Code Review

Made time limit for event processing configurable; cleaned-up duplicate code.

Change-Id: I08e7f1c9f4cdbd6404f1eb5e3544989e7a728d92
......@@ -20,4 +20,19 @@ package org.onosproject.event;
* then dispatching them to the appropriate event sink.
*/
public interface EventDeliveryService extends EventDispatcher, EventSinkRegistry {
/**
* Sets the number of millis that an event sink has to process an event.
*
* @param millis number of millis allowed per sink per event
*/
void setDispatchTimeLimit(long millis);
/**
* Returns the number of millis that an event sink has to process an event.
*
* @return number of millis allowed per sink per event
*/
long getDispatchTimeLimit();
}
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.event.impl;
package org.onosproject.common.event.impl;
import org.onosproject.event.DefaultEventSinkRegistry;
import org.onosproject.event.Event;
......@@ -37,4 +37,12 @@ public class TestEventDispatcher extends DefaultEventSinkRegistry
sink.process(event);
}
@Override
public void setDispatchTimeLimit(long millis) {
}
@Override
public long getDispatchTimeLimit() {
return 0;
}
}
......
......@@ -17,12 +17,12 @@ package org.onosproject.core.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
......@@ -32,9 +32,11 @@ import org.onosproject.core.CoreService;
import org.onosproject.core.IdBlockStore;
import org.onosproject.core.IdGenerator;
import org.onosproject.core.Version;
import org.onosproject.event.EventDeliveryService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Dictionary;
import java.util.List;
......@@ -64,9 +66,18 @@ public class CoreManager implements CoreService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService cfgService;
@Property(name = "sharedThreadPoolSize", intValue = 30,
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDeliveryService;
private static final int DEFAULT_POOL_SIZE = 30;
@Property(name = "sharedThreadPoolSize", intValue = DEFAULT_POOL_SIZE,
label = "Configure shared pool maximum size ")
private int sharedThreadPoolSize = 30;
private int sharedThreadPoolSize = DEFAULT_POOL_SIZE;
private static final int DEFAULT_EVENT_TIME = 2000;
@Property(name = "maxEventTimeLimit", intValue = DEFAULT_EVENT_TIME,
label = "Maximum number of millis an event sink has to process an event")
private int maxEventTimeLimit = DEFAULT_EVENT_TIME;
@Activate
public void activate() {
......@@ -121,30 +132,33 @@ public class CoreManager implements CoreService {
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context.getProperties();
Integer sharedThreadPoolSizeConfig =
getIntegerProperty(properties, "sharedThreadPoolSize");
if (sharedThreadPoolSizeConfig == null) {
log.info("Shared Pool Size is not configured, default value is {}",
sharedThreadPoolSize);
} else {
if (sharedThreadPoolSizeConfig > 0) {
sharedThreadPoolSize = sharedThreadPoolSizeConfig;
SharedExecutors.setPoolSize(sharedThreadPoolSize);
log.info("Configured. Shared Pool Size is configured to {}",
sharedThreadPoolSize);
} else {
log.warn("Shared Pool Size size must be greater than 0");
}
Integer poolSize = getIntegerProperty(properties, "sharedThreadPoolSize");
if (poolSize != null && poolSize > 1) {
sharedThreadPoolSize = poolSize;
SharedExecutors.setPoolSize(sharedThreadPoolSize);
} else if (poolSize != null) {
log.warn("sharedThreadPoolSize must be greater than 1");
}
}
Integer timeLimit = getIntegerProperty(properties, "maxEventTimeLimit");
if (timeLimit != null && timeLimit > 1) {
maxEventTimeLimit = timeLimit;
eventDeliveryService.setDispatchTimeLimit(maxEventTimeLimit);
} else if (timeLimit != null) {
log.warn("maxEventTimeLimit must be greater than 1");
}
log.info("Settings: sharedThreadPoolSize={}, maxEventTimeLimit={}",
sharedThreadPoolSize, maxEventTimeLimit);
}
/**
* Get Integer property from the propertyName
* Return null if propertyName is not found.
*
* @param properties properties to be looked up
* @param properties properties to be looked up
* @param propertyName the name of the property to look up
* @return value when the propertyName is defined or return null
*/
......
......@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -45,12 +46,12 @@ import static org.slf4j.LoggerFactory.getLogger;
public class CoreEventDispatcher extends DefaultEventSinkRegistry
implements EventDeliveryService {
// Maximum number of millis a sink can take to process an event.
private static final long MAX_EXECUTE_MS = 1_000;
private static final long WATCHDOG_MS = MAX_EXECUTE_MS / 4;
private final Logger log = getLogger(getClass());
// Default number of millis a sink can take to process an event.
private static final long DEFAULT_EXECUTE_MS = 2_000; // ms
private static final long WATCHDOG_MS = 250; // ms
private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
private final ExecutorService executor =
......@@ -61,6 +62,7 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
};
private DispatchLoop dispatchLoop;
private long maxProcessMillis = DEFAULT_EXECUTE_MS;
// Means to detect long-running sinks
private TimerTask watchdog;
......@@ -92,6 +94,18 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
log.info("Stopped");
}
@Override
public void setDispatchTimeLimit(long millis) {
checkArgument(millis >= WATCHDOG_MS,
"Time limit must be greater than %s", WATCHDOG_MS);
maxProcessMillis = millis;
}
@Override
public long getDispatchTimeLimit() {
return maxProcessMillis;
}
// Auxiliary event dispatching loop that feeds off the events queue.
private class DispatchLoop implements Runnable {
private volatile boolean stopped;
......@@ -126,7 +140,7 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
lastStart = 0;
} else {
log.warn("No sink registered for event class {}",
event.getClass());
event.getClass().getName());
}
}
......@@ -140,7 +154,7 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
@Override
public void run() {
long delta = System.currentTimeMillis() - lastStart;
if (lastStart > 0 && delta > MAX_EXECUTE_MS) {
if (lastStart > 0 && delta > maxProcessMillis) {
log.error("Event sink {} exceeded execution time limit: {} ms",
lastSink.getClass().getName(), delta);
......
......@@ -28,7 +28,7 @@ import org.onosproject.core.Application;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.DefaultApplication;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.common.event.impl.TestEventDispatcher;
import java.io.InputStream;
import java.net.URI;
......
......@@ -26,7 +26,7 @@ import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.mastership.MastershipService;
import org.onosproject.mastership.MastershipStore;
import org.onosproject.mastership.MastershipTermService;
......
......@@ -31,7 +31,7 @@ import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Event;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.mastership.MastershipTermService;
......
......@@ -29,7 +29,7 @@ import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreServiceAdapter;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.IdGenerator;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.DefaultDevice;
import org.onosproject.net.Device;
import org.onosproject.net.Device.Type;
......
......@@ -35,7 +35,7 @@ import org.onosproject.core.ApplicationId;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.core.DefaultGroupId;
import org.onosproject.core.GroupId;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.impl.DeviceManager;
......
......@@ -36,7 +36,7 @@ import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onosproject.event.Event;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
......
......@@ -30,7 +30,7 @@ import org.onosproject.TestApplicationId;
import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.impl.TestCoreManager;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.NetworkResource;
import org.onosproject.net.intent.FlowRuleIntent;
import org.onosproject.net.intent.Intent;
......
......@@ -37,7 +37,7 @@ import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.link.LinkService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.device.impl.DeviceManager;
import org.onosproject.store.trivial.impl.SimpleLinkStore;
......
......@@ -22,7 +22,7 @@ import org.junit.Before;
import org.junit.Test;
import org.onosproject.cfg.ComponentConfigAdapter;
import org.onosproject.event.Event;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.Device;
import org.onosproject.net.Link;
import org.onosproject.net.device.DeviceEvent;
......
......@@ -19,7 +19,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.event.Event;
import org.onosproject.event.impl.TestEventDispatcher;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.Link;
......
......@@ -22,10 +22,7 @@ import org.junit.Before;
import org.junit.Test;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
import org.onosproject.event.DefaultEventSinkRegistry;
import org.onosproject.event.Event;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.EventSink;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipEvent.Type;
......@@ -44,7 +41,6 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
......@@ -168,21 +164,4 @@ public class ReplicaInfoManagerTest {
}
}
// code clone
/**
* Implements event delivery system that delivers events synchronously, or
* in-line with the post method invocation.
*/
private static class TestEventDispatcher extends DefaultEventSinkRegistry
implements EventDeliveryService {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void post(Event event) {
EventSink sink = getSink(event.getClass());
checkState(sink != null, "No sink for event %s", event);
sink.process(event);
}
}
}
......