Thomas Vachuska
Committed by Ray Milkey

Added ability to form a cluster via REST API.

Change-Id: Ib71f6b4caed1b1c4b9db78596ee35bf5cab05184
......@@ -18,6 +18,7 @@ package org.onosproject.cli;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cluster.ClusterAdminService;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onlab.packet.IpAddress;
......@@ -38,7 +39,7 @@ public class NodeAddCommand extends AbstractShellCommand {
@Argument(index = 2, name = "tcpPort", description = "Node TCP listen port",
required = false, multiValued = false)
int tcpPort = 9876;
int tcpPort = DefaultControllerNode.DEFAULT_PORT;
@Override
protected void execute() {
......
......@@ -17,12 +17,24 @@ package org.onosproject.cluster;
import org.onlab.packet.IpAddress;
import java.util.Set;
/**
* Service for administering the cluster node membership.
*/
public interface ClusterAdminService {
/**
* Forms cluster configuration based on the specified set of node
* information.  This method resets and restarts the controller
* instance.
*
* @param nodes set of nodes that form the cluster
* @param ipPrefix IP address prefix, e.g. 10.0.1.*
*/
void formCluster(Set<ControllerNode> nodes, String ipPrefix);
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
......
......@@ -15,12 +15,12 @@
*/
package org.onosproject.cluster;
import java.util.Set;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.store.Store;
import java.util.Set;
/**
* Manages inventory of controller cluster nodes; not intended for direct use.
*/
......@@ -65,6 +65,16 @@ public interface ClusterStore extends Store<ClusterEvent, ClusterStoreDelegate>
DateTime getLastUpdated(NodeId nodeId);
/**
* Forms cluster configuration based on the specified set of node
* information. Assumes subsequent restart for the new configuration to
* take hold.
*
* @param nodes set of nodes that form the cluster
* @param ipPrefix IP address prefix, e.g. 10.0.1.*
*/
void formCluster(Set<ControllerNode> nodes, String ipPrefix);
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
......
......@@ -26,7 +26,7 @@ import static com.google.common.base.MoreObjects.toStringHelper;
*/
public class DefaultControllerNode implements ControllerNode {
private static final int DEFAULT_PORT = 9876;
public static final int DEFAULT_PORT = 9876;
private final NodeId id;
private final IpAddress ip;
......
......@@ -15,15 +15,13 @@
*/
package org.onosproject.codec.impl;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.Ethernet;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.codec.CodecService;
import org.onosproject.codec.JsonCodec;
import org.onosproject.core.Application;
......@@ -50,7 +48,9 @@ import org.onosproject.net.topology.TopologyCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Implementation of the JSON codec brokering service.
......@@ -67,6 +67,7 @@ public class CodecManager implements CodecService {
public void activate() {
codecs.clear();
registerCodec(Application.class, new ApplicationCodec());
registerCodec(ControllerNode.class, new ControllerNodeCodec());
registerCodec(Annotations.class, new AnnotationsCodec());
registerCodec(Device.class, new DeviceCodec());
registerCodec(Port.class, new PortCodec());
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.codec.impl;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.codec.CodecContext;
import org.onosproject.codec.JsonCodec;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
/**
* Device JSON codec.
*/
public final class ControllerNodeCodec extends JsonCodec<ControllerNode> {
@Override
public ObjectNode encode(ControllerNode node, CodecContext context) {
checkNotNull(node, "Controller node cannot be null");
ClusterService service = context.get(ClusterService.class);
return context.mapper().createObjectNode()
.put("id", node.id().toString())
.put("ip", node.ip().toString())
.put("tcpPort", node.tcpPort())
.put("status", service.getState(node.id()).toString());
}
@Override
public ControllerNode decode(ObjectNode json, CodecContext context) {
checkNotNull(json, "JSON cannot be null");
String ip = json.path("ip").asText();
return new DefaultControllerNode(new NodeId(json.path("id").asText(ip)),
IpAddress.valueOf(ip),
json.path("tcpPort").asInt(DEFAULT_PORT));
}
}
......@@ -82,6 +82,11 @@
<groupId>org.apache.karaf.features</groupId>
<artifactId>org.apache.karaf.features.core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.karaf.system</groupId>
<artifactId>org.apache.karaf.system.core</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -15,18 +15,13 @@
*/
package org.onosproject.cluster.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.apache.karaf.system.SystemService;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterAdminService;
......@@ -41,6 +36,12 @@ import org.onosproject.event.AbstractListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.slf4j.Logger;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of the cluster service.
*/
......@@ -62,6 +63,9 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SystemService systemService;
@Activate
public void activate() {
store.setDelegate(delegate);
......@@ -105,6 +109,20 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
}
@Override
public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
checkNotNull(nodes, "Nodes cannot be null");
checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
checkNotNull(ipPrefix, "IP prefix cannot be null");
store.formCluster(nodes, ipPrefix);
try {
log.warn("Shutting down container for cluster reconfiguration!");
systemService.reboot("now", SystemService.Swipe.NONE);
} catch (Exception e) {
log.error("Unable to reboot container", e);
}
}
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
checkNotNull(ip, "IP address cannot be null");
......
......@@ -18,6 +18,8 @@ package org.onosproject.store.cluster.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
......@@ -43,8 +45,7 @@ public class ClusterDefinitionStore {
*/
public ClusterDefinition read() throws IOException {
ObjectMapper mapper = new ObjectMapper();
ClusterDefinition definition = mapper.readValue(file, ClusterDefinition.class);
return definition;
return mapper.readValue(file, ClusterDefinition.class);
}
/**
......@@ -55,7 +56,8 @@ public class ClusterDefinitionStore {
public void write(ClusterDefinition definition) throws IOException {
checkNotNull(definition);
// write back to file
final ObjectMapper mapper = new ObjectMapper();
Files.createParentDirs(file);
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(file, definition);
}
}
\ No newline at end of file
......
......@@ -17,6 +17,7 @@ package org.onosproject.store.cluster.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.hazelcast.util.AddressUtil;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -37,6 +38,8 @@ import org.onosproject.cluster.ControllerNode.State;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.consistent.impl.DatabaseDefinition;
import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
......@@ -55,11 +58,13 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.hazelcast.util.AddressUtil.matchInterface;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
......@@ -74,17 +79,14 @@ public class DistributedClusterStore
private static final Logger log = getLogger(DistributedClusterStore.class);
public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
// TODO: make these configurable.
private static final int HEARTBEAT_FD_PORT = 2419;
private static final int HEARTBEAT_INTERVAL_MS = 100;
private static final int PHI_FAILURE_THRESHOLD = 10;
private static final String CONFIG_DIR = "../config";
private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
public static final int DEFAULT_PORT = 9876;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
......@@ -97,6 +99,8 @@ public class DistributedClusterStore
};
private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
private static final byte SITE_LOCAL_BYTE = (byte) 0xC0;
private static final String ONOS_NIC = "ONOS_NIC";
private ClusterDefinition clusterDefinition;
......@@ -116,7 +120,7 @@ public class DistributedClusterStore
@Activate
public void activate() {
File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
ClusterDefinitionStore clusterDefinitionStore =
new ClusterDefinitionStore(clusterDefinitionFile.getPath());
......@@ -129,13 +133,12 @@ public class DistributedClusterStore
seedNodes = ImmutableSet
.copyOf(clusterDefinition.getNodes())
.stream()
.map(nodeInfo -> new DefaultControllerNode(new NodeId(nodeInfo.getId()),
IpAddress.valueOf(nodeInfo.getIp()),
nodeInfo.getTcpPort()))
.map(n -> new DefaultControllerNode(new NodeId(n.getId()),
IpAddress.valueOf(n.getIp()),
n.getTcpPort()))
.collect(Collectors.toSet());
} catch (IOException e) {
throw new IllegalStateException(
"Failed to read cluster definition.", e);
throw new IllegalStateException("Failed to read cluster definition.", e);
}
seedNodes.forEach(node -> {
......@@ -179,26 +182,30 @@ public class DistributedClusterStore
}
/**
* Returns the site local address if one can be found, loopback otherwise.
* Returns the address that matches the IP prefix given in ONOS_NIC
* environment variable if one was specified, or the first site local
* address if one can be found or the loopback address otherwise.
*
* @return site-local address in string form
*/
public static String getSiteLocalAddress() {
try {
String ipPrefix = System.getenv(ONOS_NIC);
for (NetworkInterface nif : list(getNetworkInterfaces())) {
for (InetAddress address : list(nif.getInetAddresses())) {
if (address.getAddress()[0] == (byte) 0xC0) {
return address.toString().substring(1);
IpAddress ip = IpAddress.valueOf(address);
if (ipPrefix == null && address.isSiteLocalAddress() ||
ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
return ip.toString();
}
}
}
return InetAddress.getLoopbackAddress().toString().substring(1);
} catch (SocketException e) {
log.error("Unable to get network interfaces", e);
}
return null;
return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
}
@Deactivate
......@@ -255,9 +262,6 @@ public class DistributedClusterStore
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
checkNotNull(ip, "IP address must not be null");
checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
allNodes.put(node.id(), node);
updateState(nodeId, State.INACTIVE);
......@@ -275,6 +279,24 @@ public class DistributedClusterStore
}
}
@Override
public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
try {
Set<NodeInfo> infos = Sets.newHashSet();
nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
n.ip().toString(),
n.tcpPort())));
ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
DatabaseDefinition ddef = DatabaseDefinition.from(infos);
new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
} catch (IOException e) {
log.error("Unable to form cluster", e);
}
}
private void updateState(NodeId nodeId, State newState) {
nodeStates.put(nodeId, newState);
nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
......@@ -387,4 +409,5 @@ public class DistributedClusterStore
public DateTime getLastUpdated(NodeId nodeId) {
return nodeStateLastUpdatedTimes.get(nodeId);
}
}
......
......@@ -24,12 +24,12 @@ import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
......@@ -39,7 +39,6 @@ import org.onosproject.cluster.NodeId;
import org.onosproject.store.hz.AbsentInvalidatingLoadingCache;
import org.onosproject.store.hz.AbstractHazelcastStore;
import org.onosproject.store.hz.OptionalCacheLoader;
import org.onlab.packet.IpAddress;
import java.util.Map;
import java.util.Set;
......@@ -131,6 +130,11 @@ public class HazelcastClusterStore
}
@Override
public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
throw new UnsupportedOperationException("formCluster not implemented");
}
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
}
......
......@@ -15,16 +15,20 @@
*/
package org.onosproject.store.consistent.impl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.onosproject.store.cluster.impl.NodeInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.onosproject.store.cluster.impl.NodeInfo;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
/**
* Partitioned database configuration.
*/
......@@ -34,11 +38,13 @@ public class DatabaseDefinition {
/**
* Creates a new DatabaseDefinition.
*
* @param partitions partition map
* @param nodes set of nodes
* @param nodes set of nodes
* @return database definition
*/
public static DatabaseDefinition from(Map<String, Set<NodeInfo>> partitions, Set<NodeInfo> nodes) {
public static DatabaseDefinition from(Map<String, Set<NodeInfo>> partitions,
Set<NodeInfo> nodes) {
checkNotNull(partitions);
checkNotNull(nodes);
DatabaseDefinition definition = new DatabaseDefinition();
......@@ -48,7 +54,18 @@ public class DatabaseDefinition {
}
/**
* Creates a new DatabaseDefinition using default partitions.
*
* @param nodes set of nodes
* @return database definition
*/
public static DatabaseDefinition from(Set<NodeInfo> nodes) {
return from(generateDefaultPartitions(nodes), nodes);
}
/**
* Returns the map of database partitions.
*
* @return db partition map
*/
public Map<String, Set<NodeInfo>> getPartitions() {
......@@ -57,9 +74,35 @@ public class DatabaseDefinition {
/**
* Returns the set of nodes.
*
* @return nodes
*/
public Set<NodeInfo> getNodes() {
return nodes;
}
/**
* Generates set of default partitions using permutations of the nodes.
*
* @param nodes information about cluster nodes
* @return default partition map
*/
private static Map<String, Set<NodeInfo>> generateDefaultPartitions(Set<NodeInfo> nodes) {
List<NodeInfo> sorted = new ArrayList<>(nodes);
Collections.sort(sorted, (o1, o2) -> o1.getId().compareTo(o2.getId()));
Map<String, Set<NodeInfo>> partitions = Maps.newHashMap();
int length = nodes.size();
int count = 3;
for (int i = 0; i < length; i++) {
Set<NodeInfo> set = new HashSet<>(count);
for (int j = 0; j < count; j++) {
set.add(sorted.get((i + j) % length));
}
partitions.put("p" + (i + 1), set);
}
return partitions;
}
}
\ No newline at end of file
......
......@@ -20,13 +20,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Files;
/**
* Allows for reading and writing partitioned database definition as a JSON file.
*/
public class DatabaseDefinitionStore {
private final File definitionfile;
private final File file;
/**
* Creates a reader/writer of the database definition file.
......@@ -34,7 +35,7 @@ public class DatabaseDefinitionStore {
* @param filePath location of the definition file
*/
public DatabaseDefinitionStore(String filePath) {
definitionfile = new File(checkNotNull(filePath));
file = new File(checkNotNull(filePath));
}
/**
......@@ -43,7 +44,7 @@ public class DatabaseDefinitionStore {
* @param filePath location of the definition file
*/
public DatabaseDefinitionStore(File filePath) {
definitionfile = checkNotNull(filePath);
file = checkNotNull(filePath);
}
/**
......@@ -54,8 +55,7 @@ public class DatabaseDefinitionStore {
*/
public DatabaseDefinition read() throws IOException {
ObjectMapper mapper = new ObjectMapper();
DatabaseDefinition definition = mapper.readValue(definitionfile, DatabaseDefinition.class);
return definition;
return mapper.readValue(file, DatabaseDefinition.class);
}
/**
......@@ -67,7 +67,8 @@ public class DatabaseDefinitionStore {
public void write(DatabaseDefinition definition) throws IOException {
checkNotNull(definition);
// write back to file
final ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(definitionfile, definition);
Files.createParentDirs(file);
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(file, definition);
}
}
......
......@@ -16,11 +16,9 @@
package org.onosproject.store.consistent.impl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import net.kuujo.copycat.CopycatConfig;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.Member;
......@@ -34,7 +32,6 @@ import net.kuujo.copycat.netty.NettyTcpProtocol;
import net.kuujo.copycat.protocol.Consistency;
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -75,17 +72,19 @@ import static org.slf4j.LoggerFactory.getLogger;
public class DatabaseManager implements StorageService, StorageAdminService {
private final Logger log = getLogger(getClass());
private ClusterCoordinator coordinator;
private PartitionedDatabase partitionedDatabase;
private Database inMemoryDatabase;
public static final int COPYCAT_TCP_PORT = 7238; // 7238 = RAFT
private static final String CONFIG_DIR = "../config";
private static final String PARTITION_DEFINITION_FILE = "tablets.json";
private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60;
public static final String PARTITION_DEFINITION_FILE = "../config/tablets.json";
public static final String BASE_PARTITION_NAME = "p0";
private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60;
private static final int RAFT_ELECTION_TIMEOUT = 3000;
private static final int RAFT_HEARTBEAT_TIMEOUT = 1500;
private ClusterCoordinator coordinator;
private PartitionedDatabase partitionedDatabase;
private Database inMemoryDatabase;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
......@@ -98,15 +97,14 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Activate
public void activate() {
// load database configuration
File file = new File(CONFIG_DIR, PARTITION_DEFINITION_FILE);
log.info("Loading database definition: {}", file.getAbsolutePath());
File databaseDefFile = new File(PARTITION_DEFINITION_FILE);
log.info("Loading database definition: {}", databaseDefFile.getAbsolutePath());
Map<String, Set<NodeInfo>> partitionMap;
try {
DatabaseDefinitionStore databaseDefStore = new DatabaseDefinitionStore(file);
if (!file.exists()) {
DatabaseDefinitionStore databaseDefStore = new DatabaseDefinitionStore(databaseDefFile);
if (!databaseDefFile.exists()) {
createDefaultDatabaseDefinition(databaseDefStore);
}
partitionMap = databaseDefStore.read().getPartitions();
......@@ -189,10 +187,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
private void createDefaultDatabaseDefinition(DatabaseDefinitionStore store) {
// Assumes IPv4 is returned.
String ip = DistributedClusterStore.getSiteLocalAddress();
NodeInfo node = NodeInfo.from(ip, ip, DistributedClusterStore.DEFAULT_PORT);
NodeInfo node = NodeInfo.from(ip, ip, COPYCAT_TCP_PORT);
try {
store.write(DatabaseDefinition.from(ImmutableMap.of("p1", ImmutableSet.of(node)),
ImmutableSet.of(node)));
store.write(DatabaseDefinition.from(ImmutableSet.of(node)));
} catch (IOException e) {
log.warn("Unable to write default cluster definition", e);
}
......
......@@ -15,10 +15,7 @@
*/
package org.onosproject.store.trivial.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -36,7 +33,9 @@ import org.onosproject.net.intent.PartitionService;
import org.onosproject.store.AbstractStore;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of infrastructure devices using trivial in-memory
......@@ -94,6 +93,11 @@ public class SimpleClusterStore
}
@Override
public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
}
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return null;
}
......
......@@ -258,7 +258,12 @@
<version>${karaf.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.karaf.system</groupId>
<artifactId>org.apache.karaf.system.core</artifactId>
<version>${karaf.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.karaf.shell</groupId>
<artifactId>org.apache.karaf.shell.console</artifactId>
......
#!/bin/bash
# -----------------------------------------------------------------------------
# Forms ONOS cluster using REST API.
# -----------------------------------------------------------------------------
[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
. $ONOS_ROOT/tools/build/envDefaults
ip=${1:-$OCI}
if [ $ip = "cell" ]; then
ip=$OC1
nodes=$(env | grep "OC[0-9]*=" | grep -v "OC1=" | cut -d= -f2)
else
shift
nodes=$*
fi
ipPrefix=${ip%.*}
aux=/tmp/${ipPrefix}.cluster.json
trap "rm -f $aux" EXIT
echo "{ \"nodes\": [ { \"ip\": \"$ip\" }" > $aux
for node in $nodes; do
echo ", { \"ip\": \"$node\" }" >> $aux
done
echo "], \"ipPrefix\": \"$ipPrefix.*\" }" >> $aux
for node in $ip $nodes; do
echo "Forming cluster on $node..."
curl -X POST http://$node:8181/onos/v1/cluster/configuration -d @$aux
done
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.rest;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onosproject.cluster.ClusterAdminService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.codec.JsonCodec;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
/**
* REST resource for interacting with the ONOS cluster subsystem.
*/
@Path("cluster")
public class ClusterWebResource extends AbstractWebResource {
public static final String NODE_NOT_FOUND = "Node is not found";
@GET
public Response getClusterNodes() {
Iterable<ControllerNode> nodes = get(ClusterService.class).getNodes();
return ok(encodeArray(ControllerNode.class, "nodes", nodes)).build();
}
@GET
@Path("{id}")
public Response getClusterNode(@PathParam("id") String id) {
ControllerNode node = nullIsNotFound(get(ClusterService.class).getNode(new NodeId(id)),
NODE_NOT_FOUND);
return ok(codec(ControllerNode.class).encode(node, this)).build();
}
@POST
@Path("configuration")
public Response formCluster(InputStream config) throws IOException {
JsonCodec<ControllerNode> codec = codec(ControllerNode.class);
ObjectNode root = (ObjectNode) mapper().readTree(config);
String ipPrefix = root.path("ipPrefix").asText();
List<ControllerNode> nodes = codec.decode((ArrayNode) root.path("nodes"), this);
get(ClusterAdminService.class).formCluster(new HashSet<>(nodes), ipPrefix);
return Response.ok().build();
}
}
......@@ -62,6 +62,7 @@
org.onosproject.rest.JsonBodyWriter,
org.onosproject.rest.ApplicationsWebResource,
org.onosproject.rest.ClusterWebResource,
org.onosproject.rest.DevicesWebResource,
org.onosproject.rest.LinksWebResource,
org.onosproject.rest.HostsWebResource,
......