Yuta HIGUCHI

ExecutorService with somewhat predictable thread assignment.

- ExecutorService which allows the caller or the Task to
  express hint about which Thread it needs to be executed.

Change-Id: If1cc58f6b2369bb5afce4f402c195eacedf67f05
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onlab.util;
17 +
18 +import static com.google.common.base.Preconditions.checkArgument;
19 +import static com.google.common.base.Preconditions.checkNotNull;
20 +
21 +import java.time.Duration;
22 +import java.time.Instant;
23 +import java.time.temporal.ChronoUnit;
24 +import java.util.ArrayList;
25 +import java.util.List;
26 +import java.util.Objects;
27 +import java.util.concurrent.AbstractExecutorService;
28 +import java.util.concurrent.Callable;
29 +import java.util.concurrent.ExecutorService;
30 +import java.util.concurrent.Executors;
31 +import java.util.concurrent.FutureTask;
32 +import java.util.concurrent.ThreadFactory;
33 +import java.util.concurrent.TimeUnit;
34 +import java.util.function.Function;
35 +import java.util.stream.Collectors;
36 +
37 +/**
38 + * (Somewhat) predictable ExecutorService.
39 + * <p>
40 + * ExecutorService which behaves similar to the one created by
41 + * {@link Executors#newFixedThreadPool(int, ThreadFactory)},
42 + * but assigns command to specific thread based on
43 + * it's {@link PickyTask#hint()}, {@link Object#hashCode()}, or hint value explicitly
44 + * specified when the command was passed to this {@link PredictableExecutor}.
45 + */
46 +public class PredictableExecutor
47 + extends AbstractExecutorService
48 + implements ExecutorService {
49 +
50 + private final List<ExecutorService> backends;
51 +
52 + /**
53 + * Creates {@link PredictableExecutor} instance.
54 + *
55 + * @param buckets number of buckets or 0 to match available processors
56 + * @param threadFactory {@link ThreadFactory} to use to create threads
57 + * @return {@link PredictableExecutor}
58 + */
59 + public static PredictableExecutor newPredictableExecutor(int buckets, ThreadFactory threadFactory) {
60 + return new PredictableExecutor(buckets, threadFactory);
61 + }
62 +
63 + /**
64 + * Creates {@link PredictableExecutor} instance.
65 + *
66 + * @param buckets number of buckets or 0 to match available processors
67 + * @param threadFactory {@link ThreadFactory} to use to create threads
68 + */
69 + public PredictableExecutor(int buckets, ThreadFactory threadFactory) {
70 + checkArgument(buckets >= 0, "number of buckets must be non zero");
71 + checkNotNull(threadFactory);
72 + if (buckets == 0) {
73 + buckets = Runtime.getRuntime().availableProcessors();
74 + }
75 + this.backends = new ArrayList<>(buckets);
76 +
77 + for (int i = 0; i < buckets; ++i) {
78 + this.backends.add(backendExecutorService(threadFactory));
79 + }
80 + }
81 +
82 + /**
83 + * Creates {@link PredictableExecutor} instance with
84 + * bucket size set to number of available processors.
85 + *
86 + * @param threadFactory {@link ThreadFactory} to use to create threads
87 + */
88 + public PredictableExecutor(ThreadFactory threadFactory) {
89 + this(0, threadFactory);
90 + }
91 +
92 + /**
93 + * Creates a single thread {@link ExecutorService} to use in the backend.
94 + *
95 + * @param threadFactory {@link ThreadFactory} to use to create threads
96 + * @return single thread {@link ExecutorService}
97 + */
98 + protected ExecutorService backendExecutorService(ThreadFactory threadFactory) {
99 + return Executors.newSingleThreadExecutor(threadFactory);
100 + }
101 +
102 +
103 + /**
104 + * Executes given command at some time in the future.
105 + *
106 + * @param command the {@link Runnable} task
107 + * @param hint value to pick thread to run on.
108 + */
109 + public void execute(Runnable command, int hint) {
110 + int index = Math.abs(hint) % backends.size();
111 + backends.get(index).execute(command);
112 + }
113 +
114 + /**
115 + * Executes given command at some time in the future.
116 + *
117 + * @param command the {@link Runnable} task
118 + * @param hintFunction Function to compute hint value
119 + */
120 + public void execute(Runnable command, Function<Runnable, Integer> hintFunction) {
121 + execute(command, hintFunction.apply(command));
122 + }
123 +
124 +
125 + private static int hint(Runnable command) {
126 + if (command instanceof PickyTask) {
127 + return ((PickyTask) command).hint();
128 + } else {
129 + return Objects.hashCode(command);
130 + }
131 + }
132 +
133 + @Override
134 + public void execute(Runnable command) {
135 + execute(command, PredictableExecutor::hint);
136 + }
137 +
138 + @Override
139 + public void shutdown() {
140 + backends.stream().forEach(ExecutorService::shutdown);
141 + }
142 +
143 + @Override
144 + public List<Runnable> shutdownNow() {
145 + return backends.stream()
146 + .map(ExecutorService::shutdownNow)
147 + .flatMap(List::stream)
148 + .collect(Collectors.toList());
149 + }
150 +
151 + @Override
152 + public boolean isShutdown() {
153 + return backends.stream().allMatch(ExecutorService::isShutdown);
154 + }
155 +
156 + @Override
157 + public boolean isTerminated() {
158 + return backends.stream().allMatch(ExecutorService::isTerminated);
159 + }
160 +
161 + /**
162 + * {@inheritDoc}
163 + * <p>
164 + * Note: It'll try, but is not assured that the method will return by specified timeout.
165 + */
166 + @Override
167 + public boolean awaitTermination(long timeout, TimeUnit unit)
168 + throws InterruptedException {
169 +
170 + final Duration timeoutD = Duration.of(unit.toMillis(timeout), ChronoUnit.MILLIS);
171 + final Instant start = Instant.now();
172 +
173 + return backends.parallelStream()
174 + .filter(es -> !es.isTerminated())
175 + .map(es -> {
176 + long timeoutMs = timeoutD.minus(Duration.between(Instant.now(), start)).toMillis();
177 + try {
178 + return es.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
179 + } catch (InterruptedException e) {
180 + Thread.currentThread().interrupt();
181 + return false;
182 + }
183 + })
184 + .allMatch(result -> result);
185 + }
186 +
187 + @Override
188 + protected <T> PickyFutureTask<T> newTaskFor(Callable<T> callable) {
189 + return new PickyFutureTask<>(callable);
190 + }
191 +
192 + @Override
193 + protected <T> PickyFutureTask<T> newTaskFor(Runnable runnable, T value) {
194 + return new PickyFutureTask<>(runnable, value);
195 + }
196 +
197 + /**
198 + * {@link Runnable} also implementing {@link PickyTask}.
199 + */
200 + public static interface PickyRunnable extends PickyTask, Runnable { }
201 +
202 + /**
203 + * {@link Callable} also implementing {@link PickyTask}.
204 + *
205 + * @param <T> result type
206 + */
207 + public static interface PickyCallable<T> extends PickyTask, Callable<T> { }
208 +
209 + /**
210 + * Wraps the given {@link Runnable} into {@link PickyRunnable} returning supplied hint.
211 + *
212 + * @param runnable {@link Runnable}
213 + * @param hint hint value
214 + * @return {@link PickyRunnable}
215 + */
216 + public static PickyRunnable picky(Runnable runnable, int hint) {
217 + return picky(runnable, (r) -> hint);
218 + }
219 +
220 + /**
221 + * Wraps the given {@link Runnable} into {@link PickyRunnable} returning supplied hint.
222 + *
223 + * @param runnable {@link Runnable}
224 + * @param hint hint function
225 + * @return {@link PickyRunnable}
226 + */
227 + public static PickyRunnable picky(Runnable runnable, Function<Runnable, Integer> hint) {
228 + checkNotNull(runnable);
229 + checkNotNull(hint);
230 + return new PickyRunnable() {
231 +
232 + @Override
233 + public void run() {
234 + runnable.run();
235 + }
236 +
237 + @Override
238 + public int hint() {
239 + return hint.apply(runnable);
240 + }
241 + };
242 + }
243 +
244 + /**
245 + * Wraps the given {@link Callable} into {@link PickyCallable} returning supplied hint.
246 + *
247 + * @param callable {@link Callable}
248 + * @param hint hint value
249 + * @return {@link PickyCallable}
250 + */
251 + public static <T> PickyCallable<T> picky(Callable<T> callable, int hint) {
252 + return picky(callable, (c) -> hint);
253 + }
254 +
255 + /**
256 + * Wraps the given {@link Callable} into {@link PickyCallable} returning supplied hint.
257 + *
258 + * @param callable {@link Callable}
259 + * @param hint hint function
260 + * @return {@link PickyCallable}
261 + */
262 + public static <T> PickyCallable<T> picky(Callable<T> callable, Function<Callable<T>, Integer> hint) {
263 + checkNotNull(callable);
264 + checkNotNull(hint);
265 + return new PickyCallable<T>() {
266 +
267 + @Override
268 + public T call() throws Exception {
269 + return callable.call();
270 + }
271 +
272 + @Override
273 + public int hint() {
274 + return hint.apply(callable);
275 + }
276 +
277 + };
278 + }
279 +
280 + /**
281 + * Abstraction to give a task a way to express it's preference to run on
282 + * certain thread.
283 + */
284 + public static interface PickyTask {
285 +
286 + /**
287 + * Returns hint for choosing which Thread to run this task on.
288 + *
289 + * @return hint value
290 + */
291 + int hint();
292 + }
293 +
294 + /**
295 + * A {@link FutureTask} implementing {@link PickyTask}.
296 + * <p>
297 + * Note: if the wrapped {@link Callable} or {@link Runnable} was an instance of
298 + * {@link PickyTask}, it will use {@link PickyTask#hint()} value, if not use {@link Object#hashCode()}.
299 + *
300 + * @param <T> result type.
301 + */
302 + public static class PickyFutureTask<T>
303 + extends FutureTask<T>
304 + implements PickyTask {
305 +
306 + private final Object runnableOrCallable;
307 +
308 + /**
309 + * Same as {@link FutureTask#FutureTask(Runnable, Object)}.
310 + */
311 + public PickyFutureTask(Runnable runnable, T value) {
312 + super(runnable, value);
313 + runnableOrCallable = checkNotNull(runnable);
314 + }
315 +
316 + /**
317 + * Same as {@link FutureTask#FutureTask(Callable)}.
318 + */
319 + public PickyFutureTask(Callable<T> callable) {
320 + super(callable);
321 + runnableOrCallable = checkNotNull(callable);
322 + }
323 +
324 + @Override
325 + public int hint() {
326 + if (runnableOrCallable instanceof PickyTask) {
327 + return ((PickyTask) runnableOrCallable).hint();
328 + } else {
329 + return runnableOrCallable.hashCode();
330 + }
331 + }
332 + }
333 +}
1 +package org.onlab.util;
2 +
3 +import java.util.concurrent.Callable;
4 +import java.util.concurrent.CountDownLatch;
5 +import java.util.concurrent.ExecutorService;
6 +import java.util.concurrent.TimeUnit;
7 +import java.util.concurrent.atomic.AtomicReference;
8 +
9 +import org.junit.After;
10 +import org.junit.Before;
11 +import org.junit.Test;
12 +import org.onlab.util.PredictableExecutor.PickyRunnable;
13 +import com.google.common.testing.EqualsTester;
14 +
15 +public class PredictableExecutorTest {
16 +
17 + private PredictableExecutor pexecutor;
18 + private ExecutorService executor;
19 +
20 + @Before
21 + public void setUp() {
22 + pexecutor = new PredictableExecutor(3, Tools.namedThreads("Thread-%d"));
23 + executor = pexecutor;
24 + }
25 +
26 + @After
27 + public void tearDown() {
28 + pexecutor.shutdownNow();
29 + }
30 +
31 + @Test
32 + public void test() throws InterruptedException {
33 + CountDownLatch latch = new CountDownLatch(7);
34 + AtomicReference<String> hintValue0 = new AtomicReference<>("");
35 + AtomicReference<String> hintValue1 = new AtomicReference<>("");
36 + AtomicReference<String> hintFunction0 = new AtomicReference<>("");
37 + AtomicReference<String> pickyRunnable0 = new AtomicReference<>("");
38 + AtomicReference<String> pickyRunnable1 = new AtomicReference<>("");
39 + AtomicReference<String> pickyCallable0 = new AtomicReference<>("");
40 + AtomicReference<String> hashCode0 = new AtomicReference<>("");
41 +
42 + pexecutor.execute(() -> {
43 + hintValue0.set(Thread.currentThread().getName());
44 + latch.countDown();
45 + }, 0);
46 +
47 + pexecutor.execute(() -> {
48 + hintValue1.set(Thread.currentThread().getName());
49 + latch.countDown();
50 + }, 1);
51 +
52 + pexecutor.execute(() -> {
53 + hintFunction0.set(Thread.currentThread().getName());
54 + latch.countDown();
55 + }, (runnable) -> 0);
56 +
57 + pexecutor.execute(new PickyRunnable() {
58 +
59 + @Override
60 + public void run() {
61 + pickyRunnable0.set(Thread.currentThread().getName());
62 + latch.countDown();
63 + }
64 +
65 + @Override
66 + public int hint() {
67 + return 0;
68 + }
69 + });
70 +
71 + executor.execute(new PickyRunnable() {
72 +
73 + @Override
74 + public void run() {
75 + pickyRunnable1.set(Thread.currentThread().getName());
76 + latch.countDown();
77 + }
78 +
79 + @Override
80 + public int hint() {
81 + return 1;
82 + }
83 + });
84 +
85 + Callable<Void> callable = new Callable<Void>() {
86 + @Override
87 + public Void call() {
88 + pickyCallable0.set(Thread.currentThread().getName());
89 + latch.countDown();
90 + return null;
91 + }
92 + };
93 +
94 + executor.submit(PredictableExecutor.picky(callable, 0));
95 +
96 +
97 + executor.execute(new Runnable() {
98 +
99 + @Override
100 + public void run() {
101 + hashCode0.set(Thread.currentThread().getName());
102 + latch.countDown();
103 +
104 + }
105 +
106 + @Override
107 + public int hashCode() {
108 + return 0;
109 + }
110 + });
111 +
112 + latch.await(1, TimeUnit.SECONDS);
113 +
114 + new EqualsTester()
115 + .addEqualityGroup(hintValue0.get(),
116 + hintFunction0.get(),
117 + pickyRunnable0.get(),
118 + pickyCallable0.get(),
119 + hashCode0.get())
120 + .addEqualityGroup(hintValue1.get(),
121 + pickyRunnable1.get())
122 + .testEquals();
123 + }
124 +}