Yuta HIGUCHI
Committed by Yuta Higuchi

ClusterMessagingProtocol: stop processing in netty handler thread

- Fix for io.netty.util.concurrent.BlockingOperationException

Change-Id: Ie0f4dee2c3a49aa4b03674f6f7678f32fcf07a44
......@@ -4,12 +4,12 @@ import static com.google.common.base.Verify.verifyNotNull;
import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static java.util.concurrent.Executors.newCachedThreadPool;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -49,9 +49,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
private ControllerNode remoteNode;
private final AtomicBoolean connectionOK = new AtomicBoolean(true);
// TODO: make this non-static and stop on close
private static final ExecutorService THREAD_POOL
= Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
private ExecutorService pool;
public ClusterMessagingProtocolClient(
ClusterService clusterService,
......@@ -87,11 +85,19 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
@Override
public synchronized CompletableFuture<Void> connect() {
if (pool == null || pool.isShutdown()) {
// TODO include remote name?
pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-client-%d"));
}
return CompletableFuture.completedFuture(null);
}
@Override
public synchronized CompletableFuture<Void> close() {
if (pool != null) {
pool.shutdownNow();
pool = null;
}
return CompletableFuture.completedFuture(null);
}
......@@ -112,7 +118,11 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
private <I, O> CompletableFuture<O> requestReply(I request) {
CompletableFuture<O> future = new CompletableFuture<>();
THREAD_POOL.submit(new RPCTask<I, O>(request, future));
if (pool == null) {
log.info("Attempted to use closed client, connecting now. {}", request);
connect();
}
pool.submit(new RPCTask<I, O>(request, future));
return future;
}
......
package org.onlab.onos.store.service.impl;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.*;
import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.RequestHandler;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
......@@ -27,12 +28,15 @@ import org.slf4j.Logger;
public class ClusterMessagingProtocolServer implements ProtocolServer {
private final Logger log = getLogger(getClass());
private final ClusterCommunicationService clusterCommunicator;
private volatile RequestHandler handler;
private ClusterCommunicationService clusterCommunicator;
private ExecutorService pool;
public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
this.clusterCommunicator = clusterCommunicator;
}
@Override
......@@ -42,67 +46,128 @@ public class ClusterMessagingProtocolServer implements ProtocolServer {
@Override
public CompletableFuture<Void> listen() {
clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_PING,
new CopycatMessageHandler<PingRequest>());
clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC,
new CopycatMessageHandler<SyncRequest>());
clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_POLL,
new CopycatMessageHandler<PollRequest>());
clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT,
new CopycatMessageHandler<SubmitRequest>());
if (pool == null || pool.isShutdown()) {
pool = newCachedThreadPool(namedThreads("copycat-netty-messaging-server-%d"));
}
clusterCommunicator.addSubscriber(COPYCAT_PING, new PingHandler());
clusterCommunicator.addSubscriber(COPYCAT_SYNC, new SyncHandler());
clusterCommunicator.addSubscriber(COPYCAT_POLL, new PollHandler());
clusterCommunicator.addSubscriber(COPYCAT_SUBMIT, new SubmitHandler());
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> close() {
clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_PING);
clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC);
clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_POLL);
clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT);
clusterCommunicator.removeSubscriber(COPYCAT_PING);
clusterCommunicator.removeSubscriber(COPYCAT_SYNC);
clusterCommunicator.removeSubscriber(COPYCAT_POLL);
clusterCommunicator.removeSubscriber(COPYCAT_SUBMIT);
if (pool != null) {
pool.shutdownNow();
pool = null;
}
return CompletableFuture.completedFuture(null);
}
private class CopycatMessageHandler<T> implements ClusterMessageHandler {
private final class PingHandler extends CopycatMessageHandler<PingRequest> {
@Override
public void raftHandle(PingRequest request, ClusterMessage message) {
pool.submit(new Runnable() {
@Override
public void run() {
currentHandler().ping(request)
.whenComplete(new PostExecutionTask<>(message));
}
});
}
}
private final class SyncHandler extends CopycatMessageHandler<SyncRequest> {
@Override
public void raftHandle(SyncRequest request, ClusterMessage message) {
pool.submit(new Runnable() {
@Override
public void run() {
currentHandler().sync(request)
.whenComplete(new PostExecutionTask<>(message));
}
});
}
}
private final class PollHandler extends CopycatMessageHandler<PollRequest> {
@Override
public void raftHandle(PollRequest request, ClusterMessage message) {
pool.submit(new Runnable() {
@Override
public void run() {
currentHandler().poll(request)
.whenComplete(new PostExecutionTask<>(message));
}
});
}
}
private final class SubmitHandler extends CopycatMessageHandler<SubmitRequest> {
@Override
public void raftHandle(SubmitRequest request, ClusterMessage message) {
pool.submit(new Runnable() {
@Override
public void run() {
currentHandler().submit(request)
.whenComplete(new PostExecutionTask<>(message));
}
});
}
}
private abstract class CopycatMessageHandler<T> implements ClusterMessageHandler {
public abstract void raftHandle(T request, ClusterMessage message);
@Override
public void handle(ClusterMessage message) {
T request = ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
if (handler == null) {
T request = DB_SERIALIZER.decode(message.payload());
raftHandle(request, message);
}
RequestHandler currentHandler() {
RequestHandler currentHandler = handler;
if (currentHandler == null) {
// there is a slight window of time during state transition,
// where handler becomes null
long sleepMs = 1;
for (int i = 0; i < 10; ++i) {
if (handler != null) {
currentHandler = handler;
if (currentHandler != null) {
break;
}
try {
Thread.sleep(1);
sleepMs <<= 1;
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
log.trace("Exception", e);
log.error("Interrupted", e);
return handler;
}
}
if (handler == null) {
if (currentHandler == null) {
log.error("There was no handler registered!");
return;
}
}
if (request.getClass().equals(PingRequest.class)) {
handler.ping((PingRequest) request)
.whenComplete(new PostExecutionTask<PingResponse>(message));
} else if (request.getClass().equals(PollRequest.class)) {
handler.poll((PollRequest) request)
.whenComplete(new PostExecutionTask<PollResponse>(message));
} else if (request.getClass().equals(SyncRequest.class)) {
handler.sync((SyncRequest) request)
.whenComplete(new PostExecutionTask<SyncResponse>(message));
} else if (request.getClass().equals(SubmitRequest.class)) {
handler.submit((SubmitRequest) request)
.whenComplete(new PostExecutionTask<SubmitResponse>(message));
} else {
throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
return handler;
}
}
return currentHandler;
}
private class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
final class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
private final ClusterMessage message;
......@@ -111,15 +176,15 @@ public class ClusterMessagingProtocolServer implements ProtocolServer {
}
@Override
public void accept(R response, Throwable t) {
if (t != null) {
log.error("Processing for " + message.subject() + " failed.", t);
public void accept(R response, Throwable error) {
if (error != null) {
log.error("Processing {} failed.", message.subject(), error);
} else {
try {
log.trace("responding to {}", message.subject());
message.respond(ClusterMessagingProtocol.DB_SERIALIZER.encode(response));
message.respond(DB_SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to " + response.getClass().getName(), e);
log.error("Failed responding with {}", response.getClass().getName(), e);
}
}
}
......