HIGUCHI Yuta
Committed by Gerrit Code Review

log uncaught Exception

- Use execute instead of submit so that uncaught Exception will
  be dealt and logged by ExecutorService's handler.
- Use component's own logger

Change-Id: I761264aea00748980929b5048e111756776dd2f6
Showing 28 changed files with 53 additions and 50 deletions
......@@ -128,7 +128,7 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
availableDeviceIdSet = Sets.newConcurrentHashSet();
messageHandlingExecutor = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/app/cpman", "message-handlers"));
groupedThreads("onos/app/cpman", "message-handlers", log));
communicationService.addSubscriber(CONTROL_STATS,
SERIALIZER::decode, this::handleRequest, messageHandlingExecutor);
......
......@@ -72,7 +72,7 @@ public class HostMobility {
@Activate
public void activate() {
appId = coreService.registerApplication("org.onosproject.mobility");
eventHandler = newSingleThreadScheduledExecutor(groupedThreads("onos/app-mobility", "event-handler"));
eventHandler = newSingleThreadScheduledExecutor(groupedThreads("onos/app-mobility", "event-handler", log));
hostService.addListener(new InternalHostListener());
log.info("Started with Application ID {}", appId.id());
}
......
......@@ -143,7 +143,7 @@ public class OpenstackNodeManager implements OpenstackNodeService {
};
private final ExecutorService eventExecutor =
newSingleThreadScheduledExecutor(groupedThreads("onos/openstacknode", "event-handler"));
newSingleThreadScheduledExecutor(groupedThreads("onos/openstacknode", "event-handler", log));
private final DeviceListener deviceListener = new InternalDeviceListener();
......
......@@ -120,7 +120,7 @@ public class IntentSynchronizer implements IntentSynchronizationService,
* @return executor service
*/
protected ExecutorService createExecutor() {
return newSingleThreadExecutor(groupedThreads("onos/" + appId, "sync"));
return newSingleThreadExecutor(groupedThreads("onos/" + appId, "sync", log));
}
@Override
......
......@@ -57,7 +57,7 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
private final ExecutorService executor =
newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d"));
newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d", log));
@SuppressWarnings("unchecked")
private static final Event KILL_PILL = new AbstractEvent(null, 0) {
......
......@@ -125,7 +125,7 @@ public class FlowRuleManager
Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d", log));
protected ExecutorService operationsService =
Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d, log"));
Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d", log));
private IdGenerator idGenerator;
......@@ -294,7 +294,7 @@ public class FlowRuleManager
@Override
public void apply(FlowRuleOperations ops) {
checkPermission(FLOWRULE_WRITE);
operationsService.submit(new FlowOperationsProcessor(ops));
operationsService.execute(new FlowOperationsProcessor(ops));
}
@Override
......@@ -623,14 +623,14 @@ public class FlowRuleManager
final FlowRuleBatchOperation b = new FlowRuleBatchOperation(perDeviceBatches.get(deviceId),
deviceId, id);
pendingFlowOperations.put(id, this);
deviceInstallers.submit(() -> store.storeBatch(b));
deviceInstallers.execute(() -> store.storeBatch(b));
}
}
public void satisfy(DeviceId devId) {
pendingDevices.remove(devId);
if (pendingDevices.isEmpty()) {
operationsService.submit(this);
operationsService.execute(this);
}
}
......@@ -640,7 +640,7 @@ public class FlowRuleManager
hasFailed.set(true);
pendingDevices.remove(devId);
if (pendingDevices.isEmpty()) {
operationsService.submit(this);
operationsService.execute(this);
}
if (context != null) {
......
......@@ -134,7 +134,7 @@ public class FlowObjectiveManager implements FlowObjectiveService {
@Activate
protected void activate() {
executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d"));
executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d", log));
flowObjectiveStore.setDelegate(delegate);
mastershipService.addListener(mastershipListener);
deviceService.addListener(deviceListener);
......@@ -191,7 +191,7 @@ public class FlowObjectiveManager implements FlowObjectiveService {
//Attempts to check if pipeliner is null for retry attempts
} else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
Thread.sleep(INSTALL_RETRY_INTERVAL);
executorService.submit(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
} else {
// Otherwise we've tried a few times and failed, report an
// error back to the user.
......@@ -208,7 +208,7 @@ public class FlowObjectiveManager implements FlowObjectiveService {
@Override
public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
checkPermission(FLOWRULE_WRITE);
executorService.submit(new ObjectiveInstaller(deviceId, filteringObjective));
executorService.execute(new ObjectiveInstaller(deviceId, filteringObjective));
}
@Override
......@@ -217,14 +217,14 @@ public class FlowObjectiveManager implements FlowObjectiveService {
if (queueObjective(deviceId, forwardingObjective)) {
return;
}
executorService.submit(new ObjectiveInstaller(deviceId, forwardingObjective));
executorService.execute(new ObjectiveInstaller(deviceId, forwardingObjective));
}
@Override
public void next(DeviceId deviceId, NextObjective nextObjective) {
checkPermission(FLOWRULE_WRITE);
nextToDevice.put(nextObjective.id(), deviceId);
executorService.submit(new ObjectiveInstaller(deviceId, nextObjective));
executorService.execute(new ObjectiveInstaller(deviceId, nextObjective));
}
@Override
......
......@@ -201,7 +201,7 @@ public class FlowObjectiveCompositionManager implements FlowObjectiveService {
}
} else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
Thread.sleep(INSTALL_RETRY_INTERVAL);
executorService.submit(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
executorService.execute(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
} else {
// Otherwise we've tried a few times and failed, report an
// error back to the user.
......@@ -221,7 +221,7 @@ public class FlowObjectiveCompositionManager implements FlowObjectiveService {
List<FilteringObjective> filteringObjectives
= this.deviceCompositionTreeMap.get(deviceId).updateFilter(filteringObjective);
for (FilteringObjective tmp : filteringObjectives) {
executorService.submit(new ObjectiveInstaller(deviceId, tmp));
executorService.execute(new ObjectiveInstaller(deviceId, tmp));
}
}
......@@ -235,7 +235,7 @@ public class FlowObjectiveCompositionManager implements FlowObjectiveService {
List<ForwardingObjective> forwardingObjectives
= this.deviceCompositionTreeMap.get(deviceId).updateForward(forwardingObjective);
for (ForwardingObjective tmp : forwardingObjectives) {
executorService.submit(new ObjectiveInstaller(deviceId, tmp));
executorService.execute(new ObjectiveInstaller(deviceId, tmp));
}
}
......@@ -245,7 +245,7 @@ public class FlowObjectiveCompositionManager implements FlowObjectiveService {
List<NextObjective> nextObjectives = this.deviceCompositionTreeMap.get(deviceId).updateNext(nextObjective);
for (NextObjective tmp : nextObjectives) {
executorService.submit(new ObjectiveInstaller(deviceId, tmp));
executorService.execute(new ObjectiveInstaller(deviceId, tmp));
}
}
......
......@@ -90,7 +90,7 @@ public class IntentCleanup implements Runnable, IntentListener {
@Activate
public void activate() {
cfgService.registerProperties(getClass());
executor = newSingleThreadExecutor(groupedThreads("onos/intent", "cleanup"));
executor = newSingleThreadExecutor(groupedThreads("onos/intent", "cleanup", log));
timer = new Timer("onos-intent-cleanup-timer");
service.addListener(this);
adjustRate();
......@@ -149,7 +149,7 @@ public class IntentCleanup implements Runnable, IntentListener {
timerTask = new TimerTask() {
@Override
public void run() {
executor.submit(IntentCleanup.this);
executor.execute(IntentCleanup.this);
}
};
......
......@@ -149,8 +149,8 @@ public class IntentManager
}
trackerService.setDelegate(topoDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
batchExecutor = newSingleThreadExecutor(groupedThreads("onos/intent", "batch"));
workerExecutor = newFixedThreadPool(numThreads, groupedThreads("onos/intent", "worker-%d"));
batchExecutor = newSingleThreadExecutor(groupedThreads("onos/intent", "batch", log));
workerExecutor = newFixedThreadPool(numThreads, groupedThreads("onos/intent", "worker-%d", log));
idGenerator = coreService.getIdGenerator("intent-ids");
Intent.bindIdGenerator(idGenerator);
log.info("Started");
......
......@@ -116,7 +116,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
protected IntentPartitionService partitionService;
private ExecutorService executorService =
newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker"));
newSingleThreadExecutor(groupedThreads("onos/intent", "objectivetracker", log));
private ScheduledExecutorService executor = Executors
.newScheduledThreadPool(1);
......
......@@ -120,7 +120,7 @@ public class PacketManager
@Activate
public void activate() {
eventHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/net/packet", "event-handler"));
groupedThreads("onos/net/packet", "event-handler", log));
localNodeId = clusterService.getLocalNode().id();
appId = coreService.getAppId(CoreService.CORE_APP_NAME);
store.setDelegate(delegate);
......
......@@ -124,7 +124,7 @@ public class DefaultTopologyProvider extends AbstractProvider
@Activate
public synchronized void activate(ComponentContext context) {
cfgService.registerProperties(DefaultTopologyProvider.class);
executor = newFixedThreadPool(MAX_THREADS, groupedThreads("onos/topo", "build-%d"));
executor = newFixedThreadPool(MAX_THREADS, groupedThreads("onos/topo", "build-%d", log));
accumulator = new TopologyChangeAccumulator();
logConfig("Configured");
......
......@@ -107,9 +107,9 @@ public class DistributedClusterStore
private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-sender"));
groupedThreads("onos/cluster/membership", "heartbeat-sender", log));
private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-receiver"));
groupedThreads("onos/cluster/membership", "heartbeat-receiver", log));
private PhiAccrualFailureDetector failureDetector;
......@@ -377,7 +377,7 @@ public class DistributedClusterStore
try {
ScheduledExecutorService prevSender = heartBeatSender;
heartBeatSender = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-sender-%d"));
groupedThreads("onos/cluster/membership", "heartbeat-sender-%d", log));
heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
heartbeatInterval, TimeUnit.MILLISECONDS);
prevSender.shutdown();
......
......@@ -167,7 +167,7 @@ public class NewDistributedFlowRuleStore
private ScheduledFuture<?> backupTask;
private final ScheduledExecutorService backupSenderExecutor =
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender"));
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/flow", "backup-sender", log));
private EventuallyConsistentMap<DeviceId, List<TableStatisticsEntry>> deviceTableStats;
private final EventuallyConsistentMapListener<DeviceId, List<TableStatisticsEntry>> tableStatsListener =
......@@ -203,7 +203,7 @@ public class NewDistributedFlowRuleStore
local = clusterService.getLocalNode().id();
messageHandlingExecutor = Executors.newFixedThreadPool(
msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log));
registerMessageHandlers(messageHandlingExecutor);
......
......@@ -185,7 +185,8 @@ public class DistributedGroupStore
messageHandlingExecutor = Executors.
newFixedThreadPool(MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/group",
"message-handlers"));
"message-handlers",
log));
clusterCommunicator.addSubscriber(GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
clusterMsgSerializer::deserialize,
......
......@@ -118,7 +118,7 @@ public class DistributedPacketStore
public void activate() {
messageHandlingExecutor = Executors.newFixedThreadPool(
messageHandlerThreadPoolSize,
groupedThreads("onos/store/packet", "message-handlers"));
groupedThreads("onos/store/packet", "message-handlers", log));
communicationService.<OutboundPacket>addSubscriber(PACKET_OUT_SUBJECT,
SERIALIZER::decode,
......
......@@ -76,7 +76,7 @@ public class DistributedProxyArpStore implements ProxyArpStore {
private Map<HostId, ArpResponseMessage> pendingMessages = Maps.newConcurrentMap();
private ExecutorService executor =
newFixedThreadPool(4, groupedThreads("onos/arp", "sender-%d"));
newFixedThreadPool(4, groupedThreads("onos/arp", "sender-%d", log));
private NodeId localNodeId;
......
......@@ -120,7 +120,7 @@ public class DistributedStatisticStore implements StatisticStore {
messageHandlingExecutor = Executors.newFixedThreadPool(
messageHandlerThreadPoolSize,
groupedThreads("onos/store/statistic", "message-handlers"));
groupedThreads("onos/store/statistic", "message-handlers", log));
clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
SERIALIZER::decode,
......
......@@ -218,7 +218,7 @@ public class EventuallyConsistentMapImpl<K, V>
} else {
// should be a normal executor; it's used for receiving messages
this.executor =
Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d", log));
}
if (communicationExecutor != null) {
......@@ -227,7 +227,7 @@ public class EventuallyConsistentMapImpl<K, V>
// sending executor; should be capped
//TODO this probably doesn't need to be bounded anymore
this.communicationExecutor =
newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d", log));
}
......@@ -235,7 +235,7 @@ public class EventuallyConsistentMapImpl<K, V>
this.backgroundExecutor = backgroundExecutor;
} else {
this.backgroundExecutor =
newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d", log));
}
// start anti-entropy thread
......@@ -718,7 +718,7 @@ public class EventuallyConsistentMapImpl<K, V>
Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
items.forEach(item -> map.compute(item.key(), (key, existing) ->
item.isNewerThan(existing) ? item : existing));
communicationExecutor.submit(() -> {
communicationExecutor.execute(() -> {
clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
updateMessageSubject,
serializer::encode,
......
......@@ -140,7 +140,7 @@ public class CentecV350Pipeline extends AbstractHandlerBehaviour implements Pipe
private ScheduledExecutorService groupChecker =
Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
"centec-V350-%d"));
"centec-V350-%d", log));
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
......
......@@ -124,7 +124,8 @@ public class OVSCorsaPipeline extends AbstractHandlerBehaviour implements Pipeli
private ScheduledExecutorService groupChecker =
Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
"ovs-corsa-%d"));
"ovs-corsa-%d",
log));
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
......
......@@ -117,7 +117,7 @@ public class Ofdpa2GroupHandler {
private Cache<GroupKey, List<OfdpaNextGroup>> pendingNextObjectives;
private ConcurrentHashMap<GroupKey, Set<GroupChainElem>> pendingGroups;
private ScheduledExecutorService groupChecker =
Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner", "ofdpa2-%d"));
Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner", "ofdpa2-%d", log));
// index number for group creation
private AtomicCounter nextIndex;
......
......@@ -132,7 +132,8 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
private ScheduledExecutorService groupChecker = Executors
.newScheduledThreadPool(2,
groupedThreads("onos/pipeliner",
"spring-open-%d"));
"spring-open-%d",
log));
protected KryoNamespace appKryo = new KryoNamespace.Builder()
.register(KryoNamespaces.API)
.register(GroupKey.class)
......
......@@ -151,13 +151,13 @@ public class Controller {
if (workerThreads == 0) {
execFactory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(groupedThreads("onos/of", "boss-%d")),
Executors.newCachedThreadPool(groupedThreads("onos/of", "worker-%d")));
Executors.newCachedThreadPool(groupedThreads("onos/of", "boss-%d", log)),
Executors.newCachedThreadPool(groupedThreads("onos/of", "worker-%d", log)));
return new ServerBootstrap(execFactory);
} else {
execFactory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(groupedThreads("onos/of", "boss-%d")),
Executors.newCachedThreadPool(groupedThreads("onos/of", "worker-%d")), workerThreads);
Executors.newCachedThreadPool(groupedThreads("onos/of", "boss-%d", log)),
Executors.newCachedThreadPool(groupedThreads("onos/of", "worker-%d", log)), workerThreads);
return new ServerBootstrap(execFactory);
}
}
......
......@@ -138,7 +138,7 @@ public class HostLocationProvider extends AbstractProvider implements HostProvid
cfgService.registerProperties(getClass());
appId = coreService.registerApplication("org.onosproject.provider.host");
eventHandler = newSingleThreadScheduledExecutor(
groupedThreads("onos/host-loc-provider", "event-handler"));
groupedThreads("onos/host-loc-provider", "event-handler", log));
providerService = providerRegistry.register(this);
packetService.addProcessor(processor, PacketProcessor.advisor(1));
deviceService.addListener(deviceListener);
......
......@@ -338,7 +338,7 @@ public class LldpLinkProvider extends AbstractProvider implements LinkProvider {
loadDevices();
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/link", "discovery-%d"));
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/link", "discovery-%d", log));
executor.scheduleAtFixedRate(new SyncDeviceInfoTask(),
DEVICE_SYNC_DELAY, DEVICE_SYNC_DELAY, SECONDS);
executor.scheduleAtFixedRate(new LinkPrunerTask(),
......
......@@ -78,7 +78,7 @@ public class NewAdaptiveFlowStatsCollector {
private final OpenFlowSwitch sw;
private ScheduledExecutorService adaptiveFlowStatsScheduler =
Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d", log));
private ScheduledFuture<?> calAndShortFlowsThread;
private ScheduledFuture<?> midFlowsThread;
private ScheduledFuture<?> longFlowsThread;
......