Madan Jampani
Committed by Gerrit Code Review

[Falcon] Adds a status field to InternalMessage and support for replying with ap…

…propriate status when handler errors occur

Change-Id: I995bdd6c67b88b6d7729887d32083315213fb79f
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.messaging;
import java.io.IOException;
/**
* Top level exception for MessagingService failures.
*/
@SuppressWarnings("serial")
public class MessagingException extends IOException {
public MessagingException() {
}
public MessagingException(String message) {
super(message);
}
public MessagingException(String message, Throwable t) {
super(message, t);
}
public MessagingException(Throwable t) {
super(t);
}
/**
* Exception indicating no remote registered remote handler.
*/
public static class NoRemoteHandler extends MessagingException {
}
/**
* Exception indicating handler failure.
*/
public static class RemoteHandlerFailure extends MessagingException {
}
}
\ No newline at end of file
......@@ -26,6 +26,7 @@ public enum DecoderState {
READ_SENDER_PORT,
READ_MESSAGE_TYPE_LENGTH,
READ_MESSAGE_TYPE,
READ_MESSAGE_STATUS,
READ_CONTENT_LENGTH,
READ_CONTENT
}
......
......@@ -25,16 +25,46 @@ import org.onosproject.store.cluster.messaging.Endpoint;
*/
public final class InternalMessage {
/**
* Message status.
*/
public enum Status {
/**
* All ok.
*/
OK,
/**
* Response status signifying no registered handler.
*/
ERROR_NO_HANDLER,
/**
* Response status signifying an exception handling the message.
*/
ERROR_HANDLER_EXCEPTION
// NOTE: For backwards compatibility it important that new enum constants
// be appended.
// FIXME: We should remove this restriction in the future.
}
private final long id;
private final Endpoint sender;
private final String type;
private final byte[] payload;
private final Status status;
public InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
this(id, sender, type, payload, Status.OK);
}
public InternalMessage(long id, Endpoint sender, String type, byte[] payload, Status status) {
this.id = id;
this.sender = sender;
this.type = type;
this.payload = payload;
this.status = status;
}
public long id() {
......@@ -53,12 +83,17 @@ public final class InternalMessage {
return payload;
}
public Status status() {
return status;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("id", id)
.add("type", type)
.add("sender", sender)
.add("status", status)
.add("payload", ByteArraySizeHashPrinter.of(payload))
.toString();
}
......
......@@ -16,12 +16,15 @@
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -44,6 +47,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private int senderPort;
private int messageTypeLength;
private String messageType;
private Status status;
private int contentLength;
public MessageDecoder(int correctPreamble) {
......@@ -86,18 +90,27 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
byte[] messageTypeBytes = new byte[messageTypeLength];
buffer.readBytes(messageTypeBytes);
messageType = new String(messageTypeBytes, Charsets.UTF_8);
checkpoint(DecoderState.READ_MESSAGE_STATUS);
case READ_MESSAGE_STATUS:
status = Status.values()[buffer.readInt()];
checkpoint(DecoderState.READ_CONTENT_LENGTH);
case READ_CONTENT_LENGTH:
contentLength = buffer.readInt();
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
byte[] payload;
if (contentLength > 0) {
//TODO Perform a sanity check on the size before allocating
byte[] payload = new byte[contentLength];
payload = new byte[contentLength];
buffer.readBytes(payload);
} else {
payload = new byte[0];
}
InternalMessage message = new InternalMessage(messageId,
new Endpoint(senderIp, senderPort),
messageType,
payload);
payload,
status);
out.add(message);
checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
break;
......
......@@ -75,6 +75,9 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// write message type bytes
out.writeBytes(messageTypeBytes);
// write message status value
out.writeInt(message.status().ordinal());
byte[] payload = message.payload();
// write payload length
......
......@@ -16,12 +16,12 @@
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
......@@ -41,6 +41,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.felix.scr.annotations.Activate;
......@@ -53,7 +54,9 @@ import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.cluster.messaging.impl.InternalMessage.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -61,10 +64,12 @@ import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
......@@ -267,18 +272,14 @@ public class NettyMessagingManager implements MessagingService {
@Override
public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
handlers.put(type, message -> executor.execute(() -> {
byte[] responsePayload = handler.apply(message.sender(), message.payload());
if (responsePayload != null) {
InternalMessage response = new InternalMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
responsePayload);
sendAsync(message.sender(), response).whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to respond", error);
}
});
byte[] responsePayload = null;
Status status = Status.OK;
try {
responsePayload = handler.apply(message.sender(), message.payload());
} catch (Exception e) {
status = Status.ERROR_HANDLER_EXCEPTION;
}
sendReply(message, status, Optional.ofNullable(responsePayload));
}));
}
......@@ -286,17 +287,8 @@ public class NettyMessagingManager implements MessagingService {
public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
handlers.put(type, message -> {
handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
if (error == null) {
InternalMessage response = new InternalMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
result);
sendAsync(message.sender(), response).whenComplete((r, e) -> {
if (e != null) {
log.debug("Failed to respond", e);
}
});
}
Status status = error == null ? Status.OK : Status.ERROR_HANDLER_EXCEPTION;
sendReply(message, status, Optional.ofNullable(result));
});
});
}
......@@ -500,9 +492,15 @@ public class NettyMessagingManager implements MessagingService {
Callback callback =
callbacks.getIfPresent(message.id());
if (callback != null) {
if (message.status() == Status.OK) {
callback.complete(message.payload());
} else if (message.status() == Status.ERROR_NO_HANDLER) {
callback.completeExceptionally(new MessagingException.NoRemoteHandler());
} else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
}
} else {
log.warn("Received a reply for message id:[{}]. "
log.debug("Received a reply for message id:[{}]. "
+ " from {}. But was unable to locate the"
+ " request handle", message.id(), message.sender());
}
......@@ -515,8 +513,22 @@ public class NettyMessagingManager implements MessagingService {
if (handler != null) {
handler.accept(message);
} else {
log.debug("No handler registered for {}", type);
log.debug("No handler for message type {}", message.type(), message.sender());
sendReply(message, Status.ERROR_NO_HANDLER, Optional.empty());
}
}
private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
InternalMessage response = new InternalMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
responsePayload.orElse(new byte[0]),
status);
sendAsync(message.sender(), response).whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to respond", error);
}
});
}
private final class Callback {
......