Madan Jampani
Committed by Gerrit Code Review

Cluster scaling enhancements

	- Updated ConfigFileBasedClusterMetadataProvider to handle both file and http protocols.
	- Server open logic updated to handle joining an existing cluster.

Change-Id: Idbaa39733c7bf814510c94c4b21e3714b3f97f8f
......@@ -43,6 +43,9 @@ public class PartitionsListCommand extends AbstractShellCommand {
* @param partitionInfo partition descriptions
*/
private void displayPartitions(List<PartitionInfo> partitionInfo) {
if (partitionInfo.isEmpty()) {
return;
}
print("----------------------------------------------------------");
print(FMT, "Name", "Term", "Members", "");
print("----------------------------------------------------------");
......
......@@ -19,10 +19,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collection;
import java.util.Enumeration;
......@@ -47,8 +47,6 @@ import org.onosproject.net.provider.AbstractProviderService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.base.Throwables;
/**
* Implementation of ClusterMetadataService.
*/
......@@ -126,11 +124,15 @@ public class ClusterMetadataManager
* @return primary cluster metadata provider
*/
private ClusterMetadataProvider getPrimaryProvider() {
// Picks the provider based on the "onos.cluster.metadata.uri" system property.
// NOTE(review): this span appears to interleave the pre-change (java.net.URI based) and
// post-change (java.net.URL based) lines of a diff, so it does not compile as shown --
// confirm against the actual file before editing.
String metadataUri = System.getProperty("onos.cluster.metadata.uri");
try {
// URI-based variant: falls back to "config:///cluster.json" when the property is unset
// and looks the provider up by the URI scheme.
URI uri = new URI(System.getProperty("onos.cluster.metadata.uri", "config:///cluster.json"));
return getProvider(uri.getScheme());
} catch (URISyntaxException e) {
Throwables.propagate(e);
// URL-based variant: protocol is null when the property is unset.
String protocol = metadataUri == null ? null : new URL(metadataUri).getProtocol();
// Any protocol other than "file"/"http" is looked up under its own name.
if (protocol != null && (!protocol.equals("file") && !protocol.equals("http"))) {
return getProvider(protocol);
}
// file provider supports both "file" and "http" uris
// (this is also the path taken when the property is unset)
return getProvider("file");
} catch (MalformedURLException e) {
// NOTE(review): a malformed URL results in a null provider with nothing logged;
// callers must cope with null -- confirm that is intended.
return null;
}
}
......
......@@ -19,16 +19,14 @@ import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.net.URL;
import java.net.URLConnection;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.felix.scr.annotations.Activate;
......@@ -87,11 +85,12 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataProviderRegistry providerRegistry;
// NOTE(review): the duplicated declarations below look like the old and new lines of a diff
// shown together; only one of each pair can exist in the real class -- confirm.
// Provider identity; the scheme changes from "config" to "file" between the two lines.
private static final ProviderId PROVIDER_ID = new ProviderId("config", "none");
private static final ProviderId PROVIDER_ID = new ProviderId("file", "none");
// Last metadata handed out or observed, together with its version.
private final AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>();
// Single-threaded executor on which the metadata source is watched; the second declaration
// is the scheduled variant used for fixed-delay polling.
private final ExecutorService configFileChangeDetector =
Executors.newSingleThreadExecutor(groupedThreads("onos/cluster/metadata/config-watcher", ""));
private final ScheduledExecutorService configFileChangeDetector =
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/cluster/metadata/config-watcher", ""));
// Location of the metadata, assigned during activation from "onos.cluster.metadata.uri".
private String metadataUrl;
private ObjectMapper mapper;
private ClusterMetadataProviderService providerService;
......@@ -108,14 +107,8 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr
module.addDeserializer(PartitionId.class, new PartitionIdDeserializer());
mapper.registerModule(module);
providerService = providerRegistry.register(this);
configFileChangeDetector.execute(() -> {
try {
watchConfigFile();
} catch (IOException e) {
log.warn("Failure in setting up a watch for config "
+ "file updates. updates to {} will be ignored", CONFIG_FILE, e);
}
});
metadataUrl = System.getProperty("onos.cluster.metadata.uri", "file://" + CONFIG_DIR + "/" + CONFIG_FILE);
configFileChangeDetector.scheduleWithFixedDelay(() -> watchUrl(metadataUrl), 100, 500, TimeUnit.MILLISECONDS);
log.info("Started");
}
......@@ -136,7 +129,7 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr
checkState(isAvailable());
synchronized (this) {
if (cachedMetadata.get() == null) {
cachedMetadata.set(fetchMetadata());
cachedMetadata.set(fetchMetadata(metadataUrl));
}
return cachedMetadata.get();
}
......@@ -170,23 +163,45 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr
@Override
public boolean isAvailable() {
// Reports whether the metadata source can currently be reached.
// NOTE(review): the first return below appears to be the removed line of a diff shown
// next to its replacement; as written everything after it would be unreachable -- confirm.
return CONFIG_FILE.exists();
try {
URL url = new URL(metadataUrl);
if (url.getProtocol().equals("file")) {
// The path is obtained by stripping the leading "file://" from the raw string.
File file = new File(metadataUrl.replaceFirst("file://", ""));
return file.exists();
} else if (url.getProtocol().equals("http")) {
// Availability is probed by opening a stream; a failure lands in the catch below.
// NOTE(review): the returned stream is never closed, so each call leaves a
// connection open until it is collected.
url.openStream();
return true;
} else {
// Unsupported protocol
return false;
}
} catch (Exception e) {
// Any failure (malformed URL, I/O error, null metadataUrl) is reported as unavailable.
return false;
}
}
private Versioned<ClusterMetadata> fetchMetadata() {
private Versioned<ClusterMetadata> fetchMetadata(String metadataUrl) {
// Reads the cluster metadata from the given location and wraps it with a version taken
// from the source's last-modified timestamp.
// NOTE(review): this span appears to interleave removed and added diff lines (two
// signatures, CONFIG_FILE based reads next to URL based reads) -- confirm against the file.
try {
URL url = new URL(metadataUrl);
ClusterMetadata metadata = null;
long version = 0;
try {
metadata = mapper.readValue(CONFIG_FILE, ClusterMetadata.class);
version = CONFIG_FILE.lastModified();
} catch (IOException e) {
Throwables.propagate(e);
if (url.getProtocol().equals("file")) {
// Local file: version is the file's last-modified time.
File file = new File(metadataUrl.replaceFirst("file://", ""));
version = file.lastModified();
// NOTE(review): the FileInputStream is not closed explicitly here; presumably the
// mapper closes it after reading -- verify.
metadata = mapper.readValue(new FileInputStream(file), ClusterMetadata.class);
} else if (url.getProtocol().equals("http")) {
// Remote resource: version is whatever last-modified value the connection reports.
URLConnection conn = url.openConnection();
version = conn.getLastModified();
metadata = mapper.readValue(conn.getInputStream(), ClusterMetadata.class);
}
// NOTE(review): for any other protocol metadata is still null at this point, so the
// getName() call below would throw a NullPointerException.
// The parsed value is re-wrapped so that it carries this provider's PROVIDER_ID.
return new Versioned<>(new ClusterMetadata(PROVIDER_ID,
metadata.getName(),
Sets.newHashSet(metadata.getNodes()),
Sets.newHashSet(metadata.getPartitions())),
version);
} catch (IOException e) {
// I/O failures (including a malformed URL) are rethrown via Throwables.propagate.
throw Throwables.propagate(e);
}
}
private static class PartitionDeserializer extends JsonDeserializer<Partition> {
......@@ -256,32 +271,16 @@ public class ConfigFileBasedClusterMetadataProvider implements ClusterMetadataPr
}
/**
* Monitors the config file for any updates and notifies providerService accordingly.
* Monitors the metadata url for any updates and notifies providerService accordingly.
* @throws IOException
*/
private void watchConfigFile() throws IOException {
// NOTE(review): this span appears to interleave the removed WatchService loop
// (watchConfigFile) with the added polling method (watchUrl) from a diff, so braces and
// statements from both versions are mixed -- confirm against the actual file.
WatchService watcher = FileSystems.getDefault().newWatchService();
Path configFilePath = FileSystems.getDefault().getPath(CONFIG_DIR);
configFilePath.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY);
while (true) {
try {
final WatchKey watchKey = watcher.take();
for (WatchEvent<?> event : watchKey.pollEvents()) {
final Path changed = (Path) event.context();
log.info("{} was updated", changed);
// TODO: Fix concurrency issues
Versioned<ClusterMetadata> latestMetadata = fetchMetadata();
private void watchUrl(String metadataUrl) {
// Polling variant: fetches the metadata once per invocation and publishes it when its
// version is newer than the cached one.
// TODO: We are merely polling the url.
// This can be easily addressed for files. For http urls we need to move to a push style protocol.
// NOTE(review): fetchMetadata rethrows I/O failures; this method is scheduled with
// scheduleWithFixedDelay, where an exception escaping the task suppresses all later
// executions -- confirm whether a failed poll should stop polling.
Versioned<ClusterMetadata> latestMetadata = fetchMetadata(metadataUrl);
// Only an already-populated cache is refreshed: while cachedMetadata is still null nothing
// is stored and no change notification is sent.
if (cachedMetadata.get() != null && cachedMetadata.get().version() < latestMetadata.version()) {
cachedMetadata.set(latestMetadata);
providerService.clusterMetadataChanged(latestMetadata);
}
if (!watchKey.reset()) {
log.debug("WatchKey has been unregistered");
break;
}
} catch (InterruptedException e) {
// Restore the interrupt flag and leave the watch loop.
Thread.currentThread().interrupt();
break;
}
}
}
}
\ No newline at end of file
......
......@@ -23,8 +23,10 @@ import io.atomix.variables.DistributedLong;
import java.io.File;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
......@@ -83,7 +85,9 @@ public class StoragePartition implements Managed<StoragePartition> {
/**
 * Opens this partition. The server side is started only when the local node is listed
 * among the partition members; the returned future tracks the client only and does not
 * wait for the server to finish opening.
 *
 * @return future completed once the client has opened and the partition is flagged as open
 */
@Override
public CompletableFuture<Void> open() {
    final boolean localNodeIsMember = partition.getMembers().contains(localNodeId);
    if (localNodeIsMember) {
        // The future returned by openServer() is not chained into the result.
        openServer();
    }
    CompletableFuture<Void> clientOpened = openClient().thenAccept(opened -> isOpened.set(true));
    return clientOpened.thenApply(ignored -> null);
}
......@@ -120,6 +124,10 @@ public class StoragePartition implements Managed<StoragePartition> {
return Collections2.transform(partition.getMembers(), this::toAddress);
}
/**
* Attempts to rejoin the partition.
* @return future that is completed after the operation is complete
*/
private CompletableFuture<Void> openServer() {
if (!partition.getMembers().contains(localNodeId) || server != null) {
return CompletableFuture.completedFuture(null);
......@@ -135,6 +143,26 @@ public class StoragePartition implements Managed<StoragePartition> {
return server.open().thenRun(() -> this.server = server);
}
/**
 * Joins the partition as a new member by starting a local server that is pointed at
 * every other member of the partition.
 *
 * @return future that is completed after the join finishes and the server reference is stored
 */
private CompletableFuture<Void> joinCluster() {
    // Everyone in the partition except the local node.
    Set<NodeId> peers = partition.getMembers()
            .stream()
            .filter(member -> !member.equals(localNodeId))
            .collect(Collectors.toSet());

    StoragePartitionServer joiningServer = new StoragePartitionServer(
            toAddress(localNodeId),
            this,
            serializer,
            () -> new CopycatTransport(CopycatTransport.Mode.SERVER, partition.getId(), messagingService),
            RESOURCE_TYPES,
            logFolder);

    // The server field is assigned only after the join has completed.
    return joiningServer.join(Collections2.transform(peers, this::toAddress))
            .thenRun(() -> this.server = joiningServer);
}
private CompletableFuture<StoragePartitionClient> openClient() {
client = new StoragePartitionClient(this,
serializer,
......@@ -149,7 +177,7 @@ public class StoragePartition implements Managed<StoragePartition> {
* Closes the partition server if it was previously opened.
* @return future that is completed when the operation completes
*/
public CompletableFuture<Void> closeServer() {
public CompletableFuture<Void> leaveCluster() {
// NOTE(review): the two signatures above look like the old and new name of the same method
// shown together by a diff -- confirm.
// With no server present the returned future is already complete; otherwise the work is
// delegated to the server's closeAndExit().
return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
}
......@@ -181,15 +209,21 @@ public class StoragePartition implements Managed<StoragePartition> {
* @return partition info
*/
public Optional<PartitionInfo> info() {
// NOTE(review): the two return statements look like the old and new line of a diff shown
// together; the second one additionally requires that the server is not closed -- confirm.
// Yields an empty Optional when there is no server to report on.
return server != null ? Optional.of(server.info()) : Optional.empty();
return server != null && !server.isClosed() ? Optional.of(server.info()) : Optional.empty();
}
public void onUpdate(Partition partition) {
this.partition = partition;
public void onUpdate(Partition newValue) {
// Reacts to a changed partition definition by joining or leaving based on whether the
// local node's membership changed.
// NOTE(review): this span appears to interleave removed and added diff lines (two
// signatures, openServer/joinCluster, closeServer/leaveCluster) -- confirm against the file.
// Local node was a member before and still is: nothing to do.
// NOTE(review): returning here leaves this.partition pointing at the old value even if
// other members changed -- confirm that is intended.
if (partition.getMembers().contains(localNodeId) && newValue.getMembers().contains(localNodeId)) {
return;
}
// Local node was not a member and still is not: nothing to do (same caveat as above).
if (!partition.getMembers().contains(localNodeId) && !newValue.getMembers().contains(localNodeId)) {
return;
}
this.partition = newValue;
// From here on the checks run against the new partition value.
if (partition.getMembers().contains(localNodeId)) {
openServer();
joinCluster();
// The condition below is the exact negation of the one above, so it always holds here.
} else if (!partition.getMembers().contains(localNodeId)) {
closeServer();
leaveCluster();
}
}
......
......@@ -77,7 +77,7 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
server = buildServer();
server = buildServer(partition.getMemberAddresses());
}
serverOpenFuture = server.open();
} else {
......@@ -109,12 +109,12 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
return server.close();
}
private CopycatServer buildServer() {
private CopycatServer buildServer(Collection<Address> clusterMembers) {
ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver();
ResourceRegistry registry = new ResourceRegistry();
resourceTypes.forEach(registry::register);
resourceResolver.resolve(registry);
CopycatServer server = CopycatServer.builder(localAddress, partition.getMemberAddresses())
CopycatServer server = CopycatServer.builder(localAddress, clusterMembers)
.withName("partition-" + partition.getId())
.withSerializer(serializer.clone())
.withTransport(transport.get())
......@@ -130,6 +130,18 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
return server;
}
/**
 * Builds a server configured with the given members and opens it in order to join
 * the partition.
 *
 * @param otherMembers addresses passed to the server builder as the cluster members
 * @return future completed with {@code null} once the server has opened; it completes
 *         exceptionally if opening the server fails
 */
public CompletableFuture<Void> join(Collection<Address> otherMembers) {
    // Assign the field under the instance monitor, the same way open() does.
    synchronized (this) {
        server = buildServer(otherMembers);
    }
    return server.open().whenComplete((r, e) -> {
        if (e == null) {
            log.info("Successfully joined partition {}", partition.getId());
        } else {
            // A failed join is an operational problem: report it above info level so it is
            // not lost among routine messages. The throwable is passed last so the logger
            // records its stack trace.
            log.warn("Failed to join partition {}", partition.getId(), e);
        }
    }).thenApply(v -> null);
}
@Override
public boolean isOpen() {
return server.isOpen();
......