Jian Li
Committed by Gerrit Code Review

[ONOS-3536] Implement back-end metrics saving logic using RRD

Change-Id: I1b3c495380884571dc88d2f9fb3152fdf41ef655
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cpman;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Database for storing a metric.
*/
public interface MetricsDatabase {
/**
* Returns the metric name of this database.
*
* @return metric name
*/
String metricName();
/**
* Update metric value by specifying metric type.
*
* @param metricType metric type (e.g., load, usage, etc.)
* @param value metric value
*/
void updateMetric(String metricType, double value);
/**
* Update metric value by specifying metric type in a certain time.
*
* @param metricType metric type (e.g., load, usage, etc.)
* @param value metric value
* @param time update time in seconds
*/
void updateMetric(String metricType, double value, long time);
/**
* Update metric values of a collection of metric types.
*
* @param metrics a collection of metrics which consists of a pair of
* metric type and metric value
* @param time update time in seconds
*/
void updateMetrics(Map<String, Double> metrics, long time);
/**
* Update metric values of a collection of metric types.
*
* @param metrics a collection of metrics which consists of a pair of
* metric type and metric value
*/
void updateMetrics(Map<String, Double> metrics);
/**
* Returns most recent metric value of a given metric type.
*
* @param metricType metric type
* @return metric value
*/
double recentMetric(String metricType);
/**
* Return most recent metric values of a given metric type for a given period.
*
* @param metricType metric type
* @param duration duration
* @param unit time unit
* @return a collection of metric value
*/
double[] recentMetrics(String metricType, int duration, TimeUnit unit);
/**
* Returns minimum metric value of a given metric type.
*
* @param metricType metric type
* @return metric value
*/
double minMetric(String metricType);
/**
* Returns maximum metric value of a given metric type.
*
* @param metricType metric type
* @return metric value
*/
double maxMetric(String metricType);
/**
* Returns a collection of metric values of a given metric type for a day.
*
* @param metricType metric type
* @return a collection of metric value
*/
double[] metrics(String metricType);
/**
* Returns a collection of metric values of a given metric type for
* a given period.
*
* @param metricType metric type
* @param startTime start time
* @param endTime end time
* @return a collection of metric value
*/
double[] metrics(String metricType, long startTime, long endTime);
/**
* A builder of MetricsDatabase.
*/
interface Builder {
/**
* Sets the metric name.
*
* @param metricName metric name
* @return builder object
*/
Builder withMetricName(String metricName);
/**
* Add a new metric to be monitored.
*
* @param metricType control metric type
*/
Builder addMetricType(String metricType);
/**
* Builds a MetricDatabase instance.
*
* @return MetricDatabase instance
*/
MetricsDatabase build();
}
}
......@@ -105,6 +105,11 @@
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.rrd4j</groupId>
<artifactId>rrd4j</artifactId>
<version>2.2</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
</dependency>
......
......@@ -43,7 +43,6 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
@Activate
public void activate() {
}
@Deactivate
......@@ -53,6 +52,7 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
@Override
public void updateMetric(ControlMetric cpm, Integer updateInterval,
Optional<DeviceId> deviceId) {
}
@Override
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cpman.impl;
import org.apache.commons.lang3.ArrayUtils;
import org.onosproject.cpman.MetricsDatabase;
import org.rrd4j.ConsolFun;
import org.rrd4j.DsType;
import org.rrd4j.core.ArcDef;
import org.rrd4j.core.DsDef;
import org.rrd4j.core.FetchRequest;
import org.rrd4j.core.RrdBackendFactory;
import org.rrd4j.core.RrdDb;
import org.rrd4j.core.RrdDef;
import org.rrd4j.core.Sample;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* An implementation of control plane metrics back-end database.
*/
public final class DefaultMetricsDatabase implements MetricsDatabase {
private String metricName;
private RrdDb rrdDb;
private Sample sample;
private static final long SECONDS_OF_DAY = 60L * 60L * 24L;
private static final long SECONDS_OF_MINUTE = 60L;
private static final ConsolFun CONSOL_FUNCTION = ConsolFun.LAST;
private static final String NON_EXIST_METRIC = "Non-existing metric type.";
private static final String INSUFFICIENT_DURATION = "Given duration less than one minute.";
private static final String EXCEEDED_DURATION = "Given duration exceeds a day time.";
private DefaultMetricsDatabase(String metricName, RrdDb rrdDb) {
this.metricName = metricName;
this.rrdDb = rrdDb;
}
@Override
public String metricName() {
return this.metricName;
}
@Override
public void updateMetric(String metricType, double value) {
updateMetric(metricType, value, System.currentTimeMillis() / 1000L);
}
@Override
public void updateMetric(String metricType, double value, long time) {
try {
checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
sample = rrdDb.createSample(time);
sample.setValue(metricType, value);
sample.update();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void updateMetrics(Map<String, Double> metrics) {
updateMetrics(metrics, System.currentTimeMillis() / 1000L);
}
@Override
public void updateMetrics(Map<String, Double> metrics, long time) {
try {
sample = rrdDb.createSample(time);
metrics.forEach((k, v) -> {
try {
checkArgument(rrdDb.containsDs(k), NON_EXIST_METRIC);
sample.setValue(k, v);
} catch (IOException e) {
e.printStackTrace();
}
});
sample.update();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public double recentMetric(String metricType) {
try {
checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
return rrdDb.getDatasource(metricType).getLastValue();
} catch (IOException e) {
e.printStackTrace();
}
return 0D;
}
@Override
public double[] recentMetrics(String metricType, int duration, TimeUnit unit) {
try {
checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
long endTime = rrdDb.getLastUpdateTime();
long startTime = endTime - TimeUnit.SECONDS.convert(duration, unit);
if (checkTimeRange(startTime, endTime)) {
FetchRequest fr = rrdDb.createFetchRequest(CONSOL_FUNCTION, startTime, endTime);
return arrangeDataPoints(fr.fetchData().getValues(metricType));
}
} catch (IOException e) {
e.printStackTrace();
}
return new double[0];
}
@Override
public double minMetric(String metricType) {
try {
checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
long endTime = rrdDb.getLastUpdateTime() - 1;
long startTime = endTime - SECONDS_OF_DAY + 1;
return minMetric(metricType, startTime, endTime);
} catch (IOException e) {
e.printStackTrace();
}
return 0D;
}
@Override
public double maxMetric(String metricType) {
try {
checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
long endTime = rrdDb.getLastUpdateTime();
long startTime = endTime - SECONDS_OF_DAY;
return maxMetric(metricType, startTime, endTime);
} catch (IOException e) {
e.printStackTrace();
}
return 0D;
}
@Override
public double[] metrics(String metricType) {
try {
checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
long endTime = rrdDb.getLastUpdateTime();
long startTime = endTime - SECONDS_OF_DAY;
return metrics(metricType, startTime, endTime);
} catch (IOException e) {
e.printStackTrace();
}
return new double[0];
}
@Override
public double[] metrics(String metricType, long startTime, long endTime) {
try {
checkArgument(rrdDb.containsDs(metricType), NON_EXIST_METRIC);
if (checkTimeRange(startTime, endTime)) {
FetchRequest fr = rrdDb.createFetchRequest(CONSOL_FUNCTION, startTime, endTime);
return arrangeDataPoints(fr.fetchData().getValues(metricType));
}
} catch (IOException e) {
e.printStackTrace();
}
return new double[0];
}
// try to check whether projected time range is within a day
private boolean checkTimeRange(long startTime, long endTime) {
// check whether the given startTime and endTime larger than 1 minute
checkArgument(endTime - startTime >= SECONDS_OF_MINUTE, INSUFFICIENT_DURATION);
// check whether the given start time and endTime smaller than 1 day
checkArgument(endTime - startTime <= SECONDS_OF_DAY, EXCEEDED_DURATION);
return true;
}
// try to remove first and last data points
private double[] arrangeDataPoints(double[] data) {
return Arrays.copyOfRange(data, 1, data.length - 1);
}
// obtains maximum metric value among projected range
private double maxMetric(String metricType, long startTime, long endTime) {
double[] all = metrics(metricType, startTime, endTime);
List list = Arrays.asList(ArrayUtils.toObject(all));
return (double) Collections.max(list);
}
// obtains minimum metric value among projected range
private double minMetric(String metricType, long startTime, long endTime) {
double[] all = metrics(metricType, startTime, endTime);
List list = Arrays.asList(ArrayUtils.toObject(all));
return (double) Collections.min(list);
}
public static final class Builder implements MetricsDatabase.Builder {
private static final int RESOLUTION = 60; // seconds
private static final String STORING_METHOD = "MEMORY";
private static final DsType SOURCE_TYPE = DsType.GAUGE;
private static final String DB_PATH = "CPMAN";
private static final ConsolFun CONSOL_FUNCTION = ConsolFun.LAST;
private static final double MIN_VALUE = 0;
private static final double MAX_VALUE = Double.NaN;
private static final double XFF_VALUE = 0.2;
private static final int STEP_VALUE = 1;
private static final int ROW_VALUE = 60 * 24;
private static final String METRIC_NAME_MSG = "Must specify a metric name.";
private static final String METRIC_TYPE_MSG = "Must supply at least a metric type.";
private RrdDb rrdDb;
private RrdDef rrdDef;
private List<DsDef> dsDefs;
private String metricName;
public Builder() {
// define the resolution of monitored metrics
rrdDef = new RrdDef(DB_PATH, RESOLUTION);
// initialize data source definition list
dsDefs = new ArrayList<>();
}
@Override
public Builder withMetricName(String metricName) {
this.metricName = metricName;
return this;
}
@Override
public Builder addMetricType(String metricType) {
dsDefs.add(defineSchema(metricType));
return this;
}
@Override
public MetricsDatabase build() {
checkNotNull(metricName, METRIC_NAME_MSG);
checkArgument(dsDefs.size() != 0, METRIC_TYPE_MSG);
try {
DsDef[] dsDefArray = new DsDef[dsDefs.size()];
IntStream.range(0, dsDefs.size()).forEach(i -> dsDefArray[i] = dsDefs.get(i));
rrdDef.addDatasource(dsDefArray);
rrdDef.setStep(RESOLUTION);
// raw archive, no aggregation is required
ArcDef rawArchive = new ArcDef(CONSOL_FUNCTION, XFF_VALUE,
STEP_VALUE, ROW_VALUE);
rrdDef.addArchive(rawArchive);
// always store the metric data in memory...
rrdDb = new RrdDb(rrdDef, RrdBackendFactory.getFactory(STORING_METHOD));
} catch (IOException e) {
e.printStackTrace();
}
return new DefaultMetricsDatabase(metricName, rrdDb);
}
private DsDef defineSchema(String metricType) {
return new DsDef(metricType, SOURCE_TYPE, RESOLUTION,
MIN_VALUE, MAX_VALUE);
}
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cpman.impl;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.cpman.MetricsDatabase;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
/**
* Unit test for control plane metrics back-end database.
*/
public class MetricsDatabaseTest {
private MetricsDatabase mdb;
private static final String CPU_METRIC = "cpu";
private static final String CPU_LOAD = "load";
private static final String MEMORY_METRIC = "memory";
private static final String MEMORY_FREE_PERC = "freePerc";
private static final String MEMORY_USED_PERC = "usedPerc";
/**
* Initializes the MetricsDatabase instance.
*/
@Before
public void setUp() {
mdb = new DefaultMetricsDatabase.Builder()
.withMetricName(CPU_METRIC)
.addMetricType(CPU_LOAD)
.build();
}
/**
* Tests the metric update function.
*/
@Test
public void testMetricUpdate() {
long currentTime = System.currentTimeMillis() / 1000L;
mdb.updateMetric(CPU_LOAD, 30, currentTime);
assertThat(30D, is(mdb.recentMetric(CPU_LOAD)));
mdb.updateMetric(CPU_LOAD, 40, currentTime + 60);
assertThat(40D, is(mdb.recentMetric(CPU_LOAD)));
mdb.updateMetric(CPU_LOAD, 50, currentTime + 120);
assertThat(50D, is(mdb.recentMetric(CPU_LOAD)));
}
/**
* Tests the metric range fetch function.
*/
@Test
public void testMetricRangeFetch() {
// full range fetch
assertThat(mdb.metrics(CPU_LOAD).length, is(60 * 24));
// query one minute time range
assertThat(mdb.recentMetrics(CPU_LOAD, 1, TimeUnit.MINUTES).length, is(1));
// query one hour time range
assertThat(mdb.recentMetrics(CPU_LOAD, 1, TimeUnit.HOURS).length, is(60));
// query one day time range
assertThat(mdb.recentMetrics(CPU_LOAD, 1, TimeUnit.DAYS).length, is(60 * 24));
// query a specific time range
long endTime = System.currentTimeMillis() / 1000L;
long startTime = endTime - 60 * 5;
assertThat(mdb.metrics(CPU_LOAD, startTime, endTime).length, is(5));
}
/**
* Test the projected time range.
*/
@Test(expected = IllegalArgumentException.class)
public void testExceededTimeRange() {
// query 25 hours time range
assertThat(mdb.recentMetrics(CPU_LOAD, 25, TimeUnit.HOURS).length, is(60 * 24));
}
/**
* Test the projected time range.
*/
@Test(expected = IllegalArgumentException.class)
public void testInsufficientTimeRange() {
// query 50 seconds time range
assertThat(mdb.recentMetrics(CPU_LOAD, 50, TimeUnit.SECONDS).length, is(1));
}
/**
* Test multiple metrics update and query.
*/
@Test
public void testMultipleMetrics() {
MetricsDatabase multiMdb = new DefaultMetricsDatabase.Builder()
.withMetricName(MEMORY_METRIC)
.addMetricType(MEMORY_FREE_PERC)
.addMetricType(MEMORY_USED_PERC)
.build();
Map<String, Double> metrics = new HashMap<>();
metrics.putIfAbsent(MEMORY_FREE_PERC, 30D);
metrics.putIfAbsent(MEMORY_USED_PERC, 70D);
multiMdb.updateMetrics(metrics);
assertThat(30D, is(multiMdb.recentMetric(MEMORY_FREE_PERC)));
assertThat(70D, is(multiMdb.recentMetric(MEMORY_USED_PERC)));
}
}