Madan Jampani
Committed by Gerrit Code Review

PartitionManager support for reacting to cluster metadata changes

Change-Id: I65e358f5cb47e9420fae9589661ba0ce45f58df6
...@@ -16,9 +16,7 @@ ...@@ -16,9 +16,7 @@
16 package org.onosproject.store.primitives; 16 package org.onosproject.store.primitives;
17 17
18 import java.util.List; 18 import java.util.List;
19 -import java.util.concurrent.CompletableFuture;
20 19
21 -import org.onosproject.cluster.PartitionId;
22 import org.onosproject.store.service.PartitionInfo; 20 import org.onosproject.store.service.PartitionInfo;
23 21
24 /** 22 /**
...@@ -31,20 +29,4 @@ public interface PartitionAdminService { ...@@ -31,20 +29,4 @@ public interface PartitionAdminService {
31 * @return list of {@code PartitionInfo} 29 * @return list of {@code PartitionInfo}
32 */ 30 */
33 List<PartitionInfo> partitionInfo(); 31 List<PartitionInfo> partitionInfo();
34 -
35 - /**
36 - * Leaves a partition.
37 - *
38 - * @param partitionId partition identifier
39 - * @return future that is completed when the operation completes.
40 - */
41 - CompletableFuture<Void> leave(PartitionId partitionId);
42 -
43 - /**
44 - * Joins a partition.
45 - *
46 - * @param partitionId partition identifier
47 - * @return future that is completed when the operation completes.
48 - */
49 - CompletableFuture<Void> join(PartitionId partitionId);
50 } 32 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -23,6 +23,7 @@ import java.util.List; ...@@ -23,6 +23,7 @@ import java.util.List;
23 import java.util.Map; 23 import java.util.Map;
24 import java.util.Set; 24 import java.util.Set;
25 import java.util.concurrent.CompletableFuture; 25 import java.util.concurrent.CompletableFuture;
26 +import java.util.concurrent.atomic.AtomicReference;
26 import java.util.stream.Collectors; 27 import java.util.stream.Collectors;
27 28
28 import org.apache.felix.scr.annotations.Activate; 29 import org.apache.felix.scr.annotations.Activate;
...@@ -32,9 +33,14 @@ import org.apache.felix.scr.annotations.Reference; ...@@ -32,9 +33,14 @@ import org.apache.felix.scr.annotations.Reference;
32 import org.apache.felix.scr.annotations.ReferenceCardinality; 33 import org.apache.felix.scr.annotations.ReferenceCardinality;
33 import org.apache.felix.scr.annotations.Service; 34 import org.apache.felix.scr.annotations.Service;
34 import org.onlab.util.Tools; 35 import org.onlab.util.Tools;
36 +import org.onosproject.cluster.ClusterMetadata;
37 +import org.onosproject.cluster.ClusterMetadataDiff;
38 +import org.onosproject.cluster.ClusterMetadataEvent;
39 +import org.onosproject.cluster.ClusterMetadataEventListener;
35 import org.onosproject.cluster.ClusterMetadataService; 40 import org.onosproject.cluster.ClusterMetadataService;
36 import org.onosproject.cluster.ClusterService; 41 import org.onosproject.cluster.ClusterService;
37 import org.onosproject.cluster.NodeId; 42 import org.onosproject.cluster.NodeId;
43 +import org.onosproject.cluster.PartitionDiff;
38 import org.onosproject.cluster.PartitionId; 44 import org.onosproject.cluster.PartitionId;
39 import org.onosproject.event.AbstractListenerManager; 45 import org.onosproject.event.AbstractListenerManager;
40 import org.onosproject.store.cluster.messaging.MessagingService; 46 import org.onosproject.store.cluster.messaging.MessagingService;
...@@ -68,15 +74,19 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa ...@@ -68,15 +74,19 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 74 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected ClusterService clusterService; 75 protected ClusterService clusterService;
70 76
71 - Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap(); 77 + private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
78 + private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
79 + private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
72 80
73 @Activate 81 @Activate
74 public void activate() { 82 public void activate() {
75 eventDispatcher.addSink(PartitionEvent.class, listenerRegistry); 83 eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
76 - 84 + currentClusterMetadata.set(metadataService.getClusterMetadata());
77 - metadataService.getClusterMetadata() 85 + metadataService.addListener(metadataListener);
86 + currentClusterMetadata.get()
78 .getPartitions() 87 .getPartitions()
79 .stream() 88 .stream()
89 + .filter(partition -> !partition.getId().equals(PartitionId.from(0))) // exclude p0
80 .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition, 90 .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(partition,
81 messagingService, 91 messagingService,
82 clusterService, 92 clusterService,
...@@ -93,6 +103,7 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa ...@@ -93,6 +103,7 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
93 103
94 @Deactivate 104 @Deactivate
95 public void deactivate() { 105 public void deactivate() {
106 + metadataService.removeListener(metadataListener);
96 eventDispatcher.removeSink(PartitionEvent.class); 107 eventDispatcher.removeSink(PartitionEvent.class);
97 108
98 CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values() 109 CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
...@@ -104,20 +115,6 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa ...@@ -104,20 +115,6 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
104 } 115 }
105 116
106 @Override 117 @Override
107 - public CompletableFuture<Void> leave(PartitionId partitionId) {
108 - return partitions.get(partitionId)
109 - .server()
110 - .map(server -> server.close())
111 - .orElse(CompletableFuture.completedFuture(null));
112 - }
113 -
114 - @Override
115 - public CompletableFuture<Void> join(PartitionId partitionId) {
116 - return partitions.get(partitionId)
117 - .open();
118 - }
119 -
120 - @Override
121 public int getNumberOfPartitions() { 118 public int getNumberOfPartitions() {
122 return partitions.size(); 119 return partitions.size();
123 } 120 }
...@@ -152,4 +149,23 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa ...@@ -152,4 +149,23 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
152 .flatMap(x -> Tools.stream(x.info())) 149 .flatMap(x -> Tools.stream(x.info()))
153 .collect(Collectors.toList()); 150 .collect(Collectors.toList());
154 } 151 }
152 +
153 + private void processMetadataUpdate(ClusterMetadata clusterMetadata) {
154 + ClusterMetadataDiff diffExaminer =
155 + new ClusterMetadataDiff(currentClusterMetadata.get(), clusterMetadata);
156 + diffExaminer.partitionDiffs()
157 + .values()
158 + .stream()
159 + // TODO: Remove after partition 0 is removed from cluster metadata.
160 + .filter(diff -> !diff.partitionId().equals(PartitionId.from(0)))
161 + .filter(PartitionDiff::hasChanged)
162 + .forEach(diff -> partitions.get(diff.partitionId()).onUpdate(diff.newValue()));
163 + }
164 +
165 + private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
166 + @Override
167 + public void event(ClusterMetadataEvent event) {
168 + processMetadataUpdate(event.subject());
169 + }
170 + }
155 } 171 }
......
...@@ -28,9 +28,9 @@ import java.util.concurrent.atomic.AtomicBoolean; ...@@ -28,9 +28,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
28 28
29 import org.onosproject.cluster.ClusterService; 29 import org.onosproject.cluster.ClusterService;
30 import org.onosproject.cluster.ControllerNode; 30 import org.onosproject.cluster.ControllerNode;
31 -import org.onosproject.cluster.DefaultPartition;
32 import org.onosproject.cluster.NodeId; 31 import org.onosproject.cluster.NodeId;
33 import org.onosproject.cluster.Partition; 32 import org.onosproject.cluster.Partition;
33 +import org.onosproject.cluster.PartitionId;
34 import org.onosproject.store.cluster.messaging.MessagingService; 34 import org.onosproject.store.cluster.messaging.MessagingService;
35 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap; 35 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
36 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector; 36 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
...@@ -42,7 +42,7 @@ import com.google.common.collect.ImmutableSet; ...@@ -42,7 +42,7 @@ import com.google.common.collect.ImmutableSet;
42 /** 42 /**
43 * Storage partition. 43 * Storage partition.
44 */ 44 */
45 -public class StoragePartition extends DefaultPartition implements Managed<StoragePartition> { 45 +public class StoragePartition implements Managed<StoragePartition> {
46 46
47 private final AtomicBoolean isOpened = new AtomicBoolean(false); 47 private final AtomicBoolean isOpened = new AtomicBoolean(false);
48 private final AtomicBoolean isClosed = new AtomicBoolean(false); 48 private final AtomicBoolean isClosed = new AtomicBoolean(false);
...@@ -50,14 +50,14 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag ...@@ -50,14 +50,14 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag
50 private final MessagingService messagingService; 50 private final MessagingService messagingService;
51 private final ClusterService clusterService; 51 private final ClusterService clusterService;
52 private final File logFolder; 52 private final File logFolder;
53 - private CompletableFuture<StoragePartitionServer> serverOpenFuture; 53 + private Partition partition;
54 private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of( 54 private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
55 new ResourceType(DistributedLong.class), 55 new ResourceType(DistributedLong.class),
56 new ResourceType(AtomixLeaderElector.class), 56 new ResourceType(AtomixLeaderElector.class),
57 new ResourceType(AtomixConsistentMap.class)); 57 new ResourceType(AtomixConsistentMap.class));
58 58
59 private NodeId localNodeId; 59 private NodeId localNodeId;
60 - private Optional<StoragePartitionServer> server = Optional.empty(); 60 + private StoragePartitionServer server;
61 private StoragePartitionClient client; 61 private StoragePartitionClient client;
62 62
63 public StoragePartition(Partition partition, 63 public StoragePartition(Partition partition,
...@@ -65,7 +65,7 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag ...@@ -65,7 +65,7 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag
65 ClusterService clusterService, 65 ClusterService clusterService,
66 Serializer serializer, 66 Serializer serializer,
67 File logFolder) { 67 File logFolder) {
68 - super(partition); 68 + this.partition = partition;
69 this.messagingService = messagingService; 69 this.messagingService = messagingService;
70 this.clusterService = clusterService; 70 this.clusterService = clusterService;
71 this.localNodeId = clusterService.getLocalNode().id(); 71 this.localNodeId = clusterService.getLocalNode().id();
...@@ -81,61 +81,86 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag ...@@ -81,61 +81,86 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag
81 return client; 81 return client;
82 } 82 }
83 83
84 - /**
85 - * Returns the optional server instance.
86 - * @return server
87 - */
88 - public Optional<StoragePartitionServer> server() {
89 - return server;
90 - }
91 -
92 @Override 84 @Override
93 public CompletableFuture<Void> open() { 85 public CompletableFuture<Void> open() {
94 - serverOpenFuture = openServer(); 86 + openServer();
95 - serverOpenFuture.thenAccept(s -> server = Optional.ofNullable(s));
96 return openClient().thenAccept(v -> isOpened.set(true)) 87 return openClient().thenAccept(v -> isOpened.set(true))
97 .thenApply(v -> null); 88 .thenApply(v -> null);
98 } 89 }
99 90
100 @Override 91 @Override
101 public CompletableFuture<Void> close() { 92 public CompletableFuture<Void> close() {
102 - return closeClient().thenCompose(v -> closeServer()) 93 + // We do not explicitly close the server and instead let the cluster
103 - .thenAccept(v -> isClosed.set(true)) 94 + // deal with this as an unclean exit.
95 + return closeClient().thenAccept(v -> isClosed.set(true))
104 .thenApply(v -> null); 96 .thenApply(v -> null);
105 } 97 }
106 98
99 + /**
100 + * Returns the identifier of the {@link Partition partition} associated with this instance.
101 + * @return partition identifier
102 + */
103 + public PartitionId getId() {
104 + return partition.getId();
105 + }
106 +
107 + /**
108 + * Returns the identifiers of partition members.
109 + * @return partition member instance ids
110 + */
111 + public Collection<NodeId> getMembers() {
112 + return partition.getMembers();
113 + }
114 +
115 + /**
116 + * Returns the {@link Address addresses} of partition members.
117 + * @return partition member addresses
118 + */
107 public Collection<Address> getMemberAddresses() { 119 public Collection<Address> getMemberAddresses() {
108 - return Collections2.transform(getMembers(), this::toAddress); 120 + return Collections2.transform(partition.getMembers(), this::toAddress);
109 } 121 }
110 122
111 - private CompletableFuture<StoragePartitionServer> openServer() { 123 + private CompletableFuture<Void> openServer() {
112 - if (!getMembers().contains(localNodeId)) { 124 + if (!partition.getMembers().contains(localNodeId) || server != null) {
113 return CompletableFuture.completedFuture(null); 125 return CompletableFuture.completedFuture(null);
114 } 126 }
115 StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId), 127 StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
116 this, 128 this,
117 serializer, 129 serializer,
118 () -> new CopycatTransport(CopycatTransport.Mode.SERVER, 130 () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
119 - getId(), 131 + partition.getId(),
120 messagingService), 132 messagingService),
121 RESOURCE_TYPES, 133 RESOURCE_TYPES,
122 logFolder); 134 logFolder);
123 - return server.open().thenApply(v -> server); 135 + return server.open().thenRun(() -> this.server = server);
124 } 136 }
125 137
126 private CompletableFuture<StoragePartitionClient> openClient() { 138 private CompletableFuture<StoragePartitionClient> openClient() {
127 client = new StoragePartitionClient(this, 139 client = new StoragePartitionClient(this,
128 serializer, 140 serializer,
129 new CopycatTransport(CopycatTransport.Mode.CLIENT, 141 new CopycatTransport(CopycatTransport.Mode.CLIENT,
130 - getId(), 142 + partition.getId(),
131 messagingService), 143 messagingService),
132 RESOURCE_TYPES); 144 RESOURCE_TYPES);
133 return client.open().thenApply(v -> client); 145 return client.open().thenApply(v -> client);
134 } 146 }
135 147
136 - private CompletableFuture<Void> closeServer() { 148 + /**
137 - return server.map(StoragePartitionServer::close) 149 + * Closes the partition server if it was previously opened.
138 - .orElse(CompletableFuture.completedFuture(null)); 150 + * @return future that is completed when the operation completes
151 + */
152 + public CompletableFuture<Void> closeServer() {
153 + return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
154 + }
155 +
156 + @Override
157 + public boolean isOpen() {
158 + return isOpened.get() && !isClosed.get();
159 + }
160 +
161 + @Override
162 + public boolean isClosed() {
163 + return isClosed.get();
139 } 164 }
140 165
141 private CompletableFuture<Void> closeClient() { 166 private CompletableFuture<Void> closeClient() {
...@@ -150,22 +175,21 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag ...@@ -150,22 +175,21 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag
150 return new Address(node.ip().toString(), node.tcpPort()); 175 return new Address(node.ip().toString(), node.tcpPort());
151 } 176 }
152 177
153 - @Override
154 - public boolean isOpen() {
155 - return !isClosed.get() && isOpened.get();
156 - }
157 -
158 - @Override
159 - public boolean isClosed() {
160 - return isOpened.get() && isClosed.get();
161 - }
162 -
163 /** 178 /**
164 * Returns the partition information if this partition is locally managed i.e. 179 * Returns the partition information if this partition is locally managed i.e.
165 * this node is a active member of the partition. 180 * this node is a active member of the partition.
166 * @return partition info 181 * @return partition info
167 */ 182 */
168 public Optional<PartitionInfo> info() { 183 public Optional<PartitionInfo> info() {
169 - return server.map(StoragePartitionServer::info); 184 + return server != null ? Optional.of(server.info()) : Optional.empty();
185 + }
186 +
187 + public void onUpdate(Partition partition) {
188 + this.partition = partition;
189 + if (partition.getMembers().contains(localNodeId)) {
190 + openServer();
191 + } else if (!partition.getMembers().contains(localNodeId)) {
192 + closeServer();
193 + }
170 } 194 }
171 } 195 }
......
...@@ -31,16 +31,13 @@ import io.atomix.resource.ServiceLoaderResourceResolver; ...@@ -31,16 +31,13 @@ import io.atomix.resource.ServiceLoaderResourceResolver;
31 31
32 import java.io.File; 32 import java.io.File;
33 import java.util.Collection; 33 import java.util.Collection;
34 -import java.util.Set;
35 import java.util.concurrent.CompletableFuture; 34 import java.util.concurrent.CompletableFuture;
36 import java.util.function.Supplier; 35 import java.util.function.Supplier;
37 36
38 -import org.onosproject.cluster.NodeId;
39 import org.onosproject.store.service.PartitionInfo; 37 import org.onosproject.store.service.PartitionInfo;
40 import org.slf4j.Logger; 38 import org.slf4j.Logger;
41 39
42 import com.google.common.collect.ImmutableSet; 40 import com.google.common.collect.ImmutableSet;
43 -import com.google.common.collect.Sets;
44 41
45 /** 42 /**
46 * {@link StoragePartition} server. 43 * {@link StoragePartition} server.
...@@ -80,7 +77,7 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> { ...@@ -80,7 +77,7 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
80 return CompletableFuture.completedFuture(null); 77 return CompletableFuture.completedFuture(null);
81 } 78 }
82 synchronized (this) { 79 synchronized (this) {
83 - server = server(); 80 + server = buildServer();
84 } 81 }
85 serverOpenFuture = server.open(); 82 serverOpenFuture = server.open();
86 } else { 83 } else {
...@@ -97,13 +94,22 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> { ...@@ -97,13 +94,22 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
97 94
98 @Override 95 @Override
99 public CompletableFuture<Void> close() { 96 public CompletableFuture<Void> close() {
100 - // We do not close the server because doing so is equivalent to this node 97 + /**
101 - // leaving the cluster and we don't want that here. 98 + * CopycatServer#kill just shuts down the server and does not result
102 - // The Raft protocol should take care of servers leaving unannounced. 99 + * in any cluster membership changes.
103 - return CompletableFuture.completedFuture(null); 100 + */
101 + return server.kill();
104 } 102 }
105 103
106 - private CopycatServer server() { 104 + /**
105 + * Closes the server and exits the partition.
106 + * @return future that is completed when the operation is complete
107 + */
108 + public CompletableFuture<Void> closeAndExit() {
109 + return server.close();
110 + }
111 +
112 + private CopycatServer buildServer() {
107 ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver(); 113 ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver();
108 ResourceRegistry registry = new ResourceRegistry(); 114 ResourceRegistry registry = new ResourceRegistry();
109 resourceTypes.forEach(registry::register); 115 resourceTypes.forEach(registry::register);
...@@ -124,10 +130,6 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> { ...@@ -124,10 +130,6 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
124 return server; 130 return server;
125 } 131 }
126 132
127 - public Set<NodeId> configuredMembers() {
128 - return Sets.newHashSet(partition.getMembers());
129 - }
130 -
131 @Override 133 @Override
132 public boolean isOpen() { 134 public boolean isOpen() {
133 return server.isOpen(); 135 return server.isOpen();
......