Aaron Kruglikov
Committed by Gerrit Code Review

Setting up monitoring of cluster communcation service

Change-Id: I771b23db6920b26b592abc5d5156e9d77cde4f00
/*
* Copyright 2015 Open Networking Laboratory
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
package org.onosproject.utils;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Miscellaneous common facilities used for construction of various core and
* app subsystems.
*/
package org.onosproject.utils;
\ No newline at end of file
......@@ -31,6 +31,7 @@ import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.utils.MeteringAgent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -47,6 +48,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@Component(immediate = true)
@Service
......@@ -55,6 +57,18 @@ public class ClusterCommunicationManager
private final Logger log = LoggerFactory.getLogger(getClass());
private final MeteringAgent subjectMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, SUBJECT_PREFIX, true);
private final MeteringAgent endpointMeteringAgent = new MeteringAgent(PRIMITIVE_NAME, ENDPOINT_PREFIX, true);
private static final String PRIMITIVE_NAME = "clusterCommunication";
private static final String SUBJECT_PREFIX = "subject";
private static final String ENDPOINT_PREFIX = "endpoint";
private static final String SERIALIZING = "serialization";
private static final String DESERIALIZING = "deserialization";
private static final String NODE_PREFIX = "node:";
private static final String ROUND_TRIP_SUFFIX = ".rtt";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
......@@ -110,7 +124,8 @@ public class ClusterCommunicationManager
byte[] payload = new ClusterMessage(
localNodeId,
subject,
encoder.apply(message)).getBytes();
timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message)
).getBytes();
return doUnicast(subject, payload, toNodeId);
} catch (Exception e) {
return Tools.exceptionalFuture(e);
......@@ -125,7 +140,8 @@ public class ClusterCommunicationManager
byte[] payload = new ClusterMessage(
localNodeId,
subject,
encoder.apply(message)).getBytes();
timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(message))
.getBytes();
nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
}
......@@ -139,8 +155,10 @@ public class ClusterCommunicationManager
ClusterMessage envelope = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
encoder.apply(message));
return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
timeFunction(encoder, subjectMeteringAgent, SERIALIZING).
apply(message));
return sendAndReceive(subject, envelope.getBytes(), toNodeId).
thenApply(bytes -> timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).apply(bytes));
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
......@@ -157,7 +175,15 @@ public class ClusterCommunicationManager
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
final MeteringAgent.Context epContext = endpointMeteringAgent.
startTimer(NODE_PREFIX + toNodeId.toString() + ROUND_TRIP_SUFFIX);
final MeteringAgent.Context subjectContext = subjectMeteringAgent.
startTimer(subject.toString() + ROUND_TRIP_SUFFIX);
return messagingService.sendAndReceive(nodeEp, subject.value(), payload).
whenComplete((bytes, throwable) -> {
subjectContext.stop(throwable);
epContext.stop(throwable);
});
}
@Override
......@@ -213,6 +239,40 @@ public class ClusterCommunicationManager
executor);
}
/**
* Performs the timed function, returning the value it would while timing the operation.
*
* @param timedFunction the function to be timed
* @param meter the metering agent to be used to time the function
* @param opName the opname to be used when starting the meter
* @param <A> The param type of the function
* @param <B> The return type of the function
* @return the value returned by the timed function
*/
private <A, B> Function<A, B> timeFunction(Function<A, B> timedFunction,
MeteringAgent meter, String opName) {
checkNotNull(timedFunction);
checkNotNull(meter);
checkNotNull(opName);
return new Function<A, B>() {
@Override
public B apply(A a) {
final MeteringAgent.Context context = meter.startTimer(opName);
B result = null;
try {
result = timedFunction.apply(a);
} catch (Exception e) {
context.stop(e);
throw new RuntimeException(e);
} finally {
context.stop(null);
return result;
}
}
};
}
private class InternalClusterMessageHandler implements BiFunction<Endpoint, byte[], byte[]> {
private ClusterMessageHandler handler;
......@@ -243,7 +303,9 @@ public class ClusterCommunicationManager
@Override
public CompletableFuture<byte[]> apply(Endpoint sender, byte[] bytes) {
return handler.apply(decoder.apply(ClusterMessage.fromBytes(bytes).payload())).thenApply(encoder);
return handler.apply(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
apply(ClusterMessage.fromBytes(bytes).payload())).
thenApply(m -> timeFunction(encoder, subjectMeteringAgent, SERIALIZING).apply(m));
}
}
......@@ -258,7 +320,8 @@ public class ClusterCommunicationManager
@Override
public void accept(Endpoint sender, byte[] bytes) {
consumer.accept(decoder.apply(ClusterMessage.fromBytes(bytes).payload()));
consumer.accept(timeFunction(decoder, subjectMeteringAgent, DESERIALIZING).
apply(ClusterMessage.fromBytes(bytes).payload()));
}
}
}
......
......@@ -16,6 +16,7 @@
package org.onosproject.store.primitives.impl;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.utils.MeteringAgent;
import java.util.concurrent.CompletableFuture;
......
......@@ -32,6 +32,7 @@ import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import org.onosproject.utils.MeteringAgent;
public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
......
......@@ -47,6 +47,7 @@ import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import org.onosproject.utils.MeteringAgent;
import org.slf4j.Logger;
import com.google.common.cache.CacheBuilder;
......
......@@ -33,6 +33,7 @@ import org.onosproject.store.service.SetEventListener;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onosproject.utils.MeteringAgent;
/**
* Implementation of {@link AsyncDistributedSet}.
......
......@@ -22,6 +22,7 @@ import org.onlab.util.SharedExecutors;
import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import org.onosproject.utils.MeteringAgent;
import java.util.List;
import java.util.Set;
......
......@@ -32,6 +32,7 @@ import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import org.onosproject.utils.MeteringAgent;
/**
* {@link AsyncConsistentMap} that meters all its operations.
......
# Local VirtualBox-based ONOS instances 1,2 & ONOS mininet box
export ONOS_NIC=192.168.56.*
export OC1="192.168.56.101"
export OC2="192.168.56.102"
export OC3="192.168.56.103"
export OCN="192.168.56.100"
export ONOS_USE_SSH=true
export ONOS_APPS="drivers,openflow,fwd,proxyarp,mobility"
export ONOS_USER=sdn
export ONOS_WEB_PASS=sdnrocks
export ONOS_WEB_USER=onos
\ No newline at end of file