Committed by
Ray Milkey
Fixed javadoc warning; added shared executor/timer wrappers to prevent inadverte…
…nt shutdown; added shutdown to CoreManager.deactivate. Change-Id: I27f31b5d41050d6d87cd6419ab863201c4585843
Showing
4 changed files
with
190 additions
and
29 deletions
... | @@ -64,9 +64,9 @@ public class CoreManager implements CoreService { | ... | @@ -64,9 +64,9 @@ public class CoreManager implements CoreService { |
64 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 64 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
65 | protected ComponentConfigService cfgService; | 65 | protected ComponentConfigService cfgService; |
66 | 66 | ||
67 | - @Property(name = "sharedThreadPoolSize", intValue = SharedExecutors.DEFAULT_THREAD_SIZE, | 67 | + @Property(name = "sharedThreadPoolSize", intValue = SharedExecutors.DEFAULT_POOL_SIZE, |
68 | label = "Configure shared pool maximum size ") | 68 | label = "Configure shared pool maximum size ") |
69 | - private int sharedThreadPoolSize = SharedExecutors.DEFAULT_THREAD_SIZE; | 69 | + private int sharedThreadPoolSize = SharedExecutors.DEFAULT_POOL_SIZE; |
70 | 70 | ||
71 | @Activate | 71 | @Activate |
72 | public void activate() { | 72 | public void activate() { |
... | @@ -80,6 +80,7 @@ public class CoreManager implements CoreService { | ... | @@ -80,6 +80,7 @@ public class CoreManager implements CoreService { |
80 | @Deactivate | 80 | @Deactivate |
81 | public void deactivate() { | 81 | public void deactivate() { |
82 | cfgService.unregisterProperties(getClass(), false); | 82 | cfgService.unregisterProperties(getClass(), false); |
83 | + SharedExecutors.shutdown(); | ||
83 | } | 84 | } |
84 | 85 | ||
85 | @Override | 86 | @Override | ... | ... |
1 | +/* | ||
2 | + * Copyright 2015 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onlab.util; | ||
17 | + | ||
18 | +import java.util.Collection; | ||
19 | +import java.util.List; | ||
20 | +import java.util.concurrent.Callable; | ||
21 | +import java.util.concurrent.ExecutionException; | ||
22 | +import java.util.concurrent.ExecutorService; | ||
23 | +import java.util.concurrent.Future; | ||
24 | +import java.util.concurrent.TimeUnit; | ||
25 | +import java.util.concurrent.TimeoutException; | ||
26 | + | ||
27 | +/** | ||
28 | + * Executor service wrapper for shared executors with safeguards on shutdown | ||
29 | + * to prevent inadvertent shutdown. | ||
30 | + */ | ||
31 | +class SharedExecutorService implements ExecutorService { | ||
32 | + | ||
33 | + private static final String NOT_ALLOWED = "Shutdown of shared executor is not allowed"; | ||
34 | + | ||
35 | + private ExecutorService executor; | ||
36 | + | ||
37 | + /** | ||
38 | + * Creates a wrapper for the given executor service. | ||
39 | + * | ||
40 | + * @param executor executor service to wrap | ||
41 | + */ | ||
42 | + SharedExecutorService(ExecutorService executor) { | ||
43 | + this.executor = executor; | ||
44 | + } | ||
45 | + | ||
46 | + /** | ||
47 | + * Returns the backing executor service. | ||
48 | + * | ||
49 | + * @return backing executor service | ||
50 | + */ | ||
51 | + ExecutorService backingExecutor() { | ||
52 | + return executor; | ||
53 | + } | ||
54 | + | ||
55 | + /** | ||
56 | + * Swaps the backing executor with a new one and shuts down the old one. | ||
57 | + * | ||
58 | + * @param executor new executor service | ||
59 | + */ | ||
60 | + void setBackingExecutor(ExecutorService executor) { | ||
61 | + ExecutorService oldExecutor = this.executor; | ||
62 | + this.executor = executor; | ||
63 | + oldExecutor.shutdown(); | ||
64 | + } | ||
65 | + | ||
66 | + @Override | ||
67 | + public void shutdown() { | ||
68 | + throw new UnsupportedOperationException(NOT_ALLOWED); | ||
69 | + } | ||
70 | + | ||
71 | + @Override | ||
72 | + public List<Runnable> shutdownNow() { | ||
73 | + throw new UnsupportedOperationException(NOT_ALLOWED); | ||
74 | + } | ||
75 | + | ||
76 | + @Override | ||
77 | + public boolean isShutdown() { | ||
78 | + return executor.isShutdown(); | ||
79 | + } | ||
80 | + | ||
81 | + @Override | ||
82 | + public boolean isTerminated() { | ||
83 | + return executor.isTerminated(); | ||
84 | + } | ||
85 | + | ||
86 | + @Override | ||
87 | + public boolean awaitTermination(long timeout, TimeUnit unit) | ||
88 | + throws InterruptedException { | ||
89 | + return executor.awaitTermination(timeout, unit); | ||
90 | + } | ||
91 | + | ||
92 | + @Override | ||
93 | + public <T> Future<T> submit(Callable<T> task) { | ||
94 | + return executor.submit(task); | ||
95 | + } | ||
96 | + | ||
97 | + @Override | ||
98 | + public <T> Future<T> submit(Runnable task, T result) { | ||
99 | + return executor.submit(task, result); | ||
100 | + } | ||
101 | + | ||
102 | + @Override | ||
103 | + public Future<?> submit(Runnable task) { | ||
104 | + return executor.submit(task); | ||
105 | + } | ||
106 | + | ||
107 | + @Override | ||
108 | + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) | ||
109 | + throws InterruptedException { | ||
110 | + return executor.invokeAll(tasks); | ||
111 | + } | ||
112 | + | ||
113 | + @Override | ||
114 | + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, | ||
115 | + long timeout, TimeUnit unit) | ||
116 | + throws InterruptedException { | ||
117 | + return executor.invokeAll(tasks, timeout, unit); | ||
118 | + } | ||
119 | + | ||
120 | + @Override | ||
121 | + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) | ||
122 | + throws InterruptedException, ExecutionException { | ||
123 | + return executor.invokeAny(tasks); | ||
124 | + } | ||
125 | + | ||
126 | + @Override | ||
127 | + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, | ||
128 | + long timeout, TimeUnit unit) | ||
129 | + throws InterruptedException, ExecutionException, TimeoutException { | ||
130 | + return invokeAny(tasks, timeout, unit); | ||
131 | + } | ||
132 | + | ||
133 | + @Override | ||
134 | + public void execute(Runnable command) { | ||
135 | + executor.execute(command); | ||
136 | + } | ||
137 | + | ||
138 | +} |
... | @@ -18,9 +18,8 @@ package org.onlab.util; | ... | @@ -18,9 +18,8 @@ package org.onlab.util; |
18 | 18 | ||
19 | import java.util.Timer; | 19 | import java.util.Timer; |
20 | import java.util.concurrent.ExecutorService; | 20 | import java.util.concurrent.ExecutorService; |
21 | -import org.slf4j.Logger; | ||
22 | -import org.slf4j.LoggerFactory; | ||
23 | 21 | ||
22 | +import static com.google.common.base.Preconditions.checkArgument; | ||
24 | import static java.util.concurrent.Executors.newFixedThreadPool; | 23 | import static java.util.concurrent.Executors.newFixedThreadPool; |
25 | import static java.util.concurrent.Executors.newSingleThreadExecutor; | 24 | import static java.util.concurrent.Executors.newSingleThreadExecutor; |
26 | import static org.onlab.util.Tools.groupedThreads; | 25 | import static org.onlab.util.Tools.groupedThreads; |
... | @@ -36,21 +35,20 @@ import static org.onlab.util.Tools.groupedThreads; | ... | @@ -36,21 +35,20 @@ import static org.onlab.util.Tools.groupedThreads; |
36 | */ | 35 | */ |
37 | public final class SharedExecutors { | 36 | public final class SharedExecutors { |
38 | 37 | ||
39 | - private static final Logger log = LoggerFactory.getLogger(SharedExecutors.class); | 38 | + public static final int DEFAULT_POOL_SIZE = 30; |
40 | 39 | ||
41 | - // TODO: make this configurable via setPoolSize static method | 40 | + private static SharedExecutorService singleThreadExecutor = |
42 | - public static final int DEFAULT_THREAD_SIZE = 30; | 41 | + new SharedExecutorService( |
43 | - private static int poolSize = DEFAULT_THREAD_SIZE; | 42 | + newSingleThreadExecutor(groupedThreads("onos/shared", |
43 | + "onos-single-executor"))); | ||
44 | 44 | ||
45 | - private static ExecutorService singleThreadExecutor = | 45 | + private static SharedExecutorService poolThreadExecutor = |
46 | - newSingleThreadExecutor(groupedThreads("onos/shared", | 46 | + new SharedExecutorService( |
47 | - "onos-single-executor")); | 47 | + newFixedThreadPool(DEFAULT_POOL_SIZE, |
48 | + groupedThreads("onos/shared", | ||
49 | + "onos-pool-executor-%d"))); | ||
48 | 50 | ||
49 | - private static ExecutorService poolThreadExecutor = | 51 | + private static SharedTimer sharedTimer = new SharedTimer(); |
50 | - newFixedThreadPool(poolSize, groupedThreads("onos/shared", | ||
51 | - "onos-pool-executor-%d")); | ||
52 | - | ||
53 | - private static Timer sharedTimer = new Timer("onos-shared-timer"); | ||
54 | 52 | ||
55 | // Ban public construction | 53 | // Ban public construction |
56 | private SharedExecutors() { | 54 | private SharedExecutors() { |
... | @@ -85,17 +83,41 @@ public final class SharedExecutors { | ... | @@ -85,17 +83,41 @@ public final class SharedExecutors { |
85 | 83 | ||
86 | /** | 84 | /** |
87 | * Sets the shared thread pool size. | 85 | * Sets the shared thread pool size. |
88 | - * @param poolSize | 86 | + * |
87 | + * @param poolSize new pool size | ||
89 | */ | 88 | */ |
90 | public static void setPoolSize(int poolSize) { | 89 | public static void setPoolSize(int poolSize) { |
91 | - if (poolSize > 0) { | 90 | + checkArgument(poolSize > 0, "Shared pool size size must be greater than 0"); |
92 | - SharedExecutors.poolSize = poolSize; | 91 | + poolThreadExecutor.setBackingExecutor( |
93 | - //TODO: wait for execution previous task in the queue . | 92 | + newFixedThreadPool(poolSize, groupedThreads("onos/shared", |
94 | - poolThreadExecutor.shutdown(); | 93 | + "onos-pool-executor-%d"))); |
95 | - poolThreadExecutor = newFixedThreadPool(poolSize, groupedThreads("onos/shared", | 94 | + } |
96 | - "onos-pool-executor-%d")); | 95 | + |
97 | - } else { | 96 | + /** |
98 | - log.warn("Shared Pool Size size must be greater than 0"); | 97 | + * Shuts down all shared timers and executors and therefore should be |
98 | + * called only by the framework. | ||
99 | + */ | ||
100 | + public static void shutdown() { | ||
101 | + sharedTimer.shutdown(); | ||
102 | + singleThreadExecutor.backingExecutor().shutdown(); | ||
103 | + poolThreadExecutor.backingExecutor().shutdown(); | ||
104 | + } | ||
105 | + | ||
106 | + // Timer extension which does not allow outside cancel method. | ||
107 | + private static class SharedTimer extends Timer { | ||
108 | + | ||
109 | + public SharedTimer() { | ||
110 | + super("onos-shared-timer"); | ||
111 | + } | ||
112 | + | ||
113 | + @Override | ||
114 | + public void cancel() { | ||
115 | + throw new UnsupportedOperationException("Cancel of shared timer is not allowed"); | ||
116 | + } | ||
117 | + | ||
118 | + private void shutdown() { | ||
119 | + super.cancel(); | ||
99 | } | 120 | } |
100 | } | 121 | } |
122 | + | ||
101 | } | 123 | } | ... | ... |
... | @@ -58,13 +58,13 @@ public final class SlidingWindowCounter { | ... | @@ -58,13 +58,13 @@ public final class SlidingWindowCounter { |
58 | 58 | ||
59 | // Initialize each item in the list to an AtomicLong of 0 | 59 | // Initialize each item in the list to an AtomicLong of 0 |
60 | this.counters = Collections.nCopies(windowSlots, 0) | 60 | this.counters = Collections.nCopies(windowSlots, 0) |
61 | - .stream() | 61 | + .stream() |
62 | - .map(AtomicLong::new) | 62 | + .map(AtomicLong::new) |
63 | - .collect(Collectors.toCollection(ArrayList::new)); | 63 | + .collect(Collectors.toCollection(ArrayList::new)); |
64 | 64 | ||
65 | background = Executors.newSingleThreadScheduledExecutor(); | 65 | background = Executors.newSingleThreadScheduledExecutor(); |
66 | background.scheduleWithFixedDelay(this::advanceHead, 0, | 66 | background.scheduleWithFixedDelay(this::advanceHead, 0, |
67 | - SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS); | 67 | + SLIDE_WINDOW_PERIOD_SECONDS, TimeUnit.SECONDS); |
68 | } | 68 | } |
69 | 69 | ||
70 | /** | 70 | /** | ... | ... |
-
Please register or login to post a comment