Madan Jampani

Enabled leadership service implementation based on consistent map + added precon…

…dition checks to partitioned database.

Change-Id: Ia76f8479d9113e7ad67e583e4ca157e62a1cabc7
...@@ -73,7 +73,7 @@ import static org.onlab.util.Tools.groupedThreads; ...@@ -73,7 +73,7 @@ import static org.onlab.util.Tools.groupedThreads;
73 * the current leader (e.g., for informational purpose). 73 * the current leader (e.g., for informational purpose).
74 * </p> 74 * </p>
75 */ 75 */
76 -@Component(immediate = true) 76 +@Component(immediate = true, enabled = false)
77 @Service 77 @Service
78 public class HazelcastLeadershipService implements LeadershipService { 78 public class HazelcastLeadershipService implements LeadershipService {
79 private static final Logger log = 79 private static final Logger log =
......
...@@ -145,7 +145,7 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -145,7 +145,7 @@ public class DatabaseManager implements StorageService, StorageAdminService {
145 }); 145 });
146 try { 146 try {
147 if (!latch.await(DATABASE_STARTUP_TIMEOUT_SEC, TimeUnit.SECONDS)) { 147 if (!latch.await(DATABASE_STARTUP_TIMEOUT_SEC, TimeUnit.SECONDS)) {
148 - log.warn("Timeed out watiing for database to initialize."); 148 + log.warn("Timed out waiting for database to initialize.");
149 } 149 }
150 } catch (InterruptedException e) { 150 } catch (InterruptedException e) {
151 Thread.currentThread().interrupt(); 151 Thread.currentThread().interrupt();
......
...@@ -52,7 +52,7 @@ import com.google.common.collect.Sets; ...@@ -52,7 +52,7 @@ import com.google.common.collect.Sets;
52 * detection capabilities to detect and purge stale locks. 52 * detection capabilities to detect and purge stale locks.
53 * TODO: Ensure lock safety and liveness. 53 * TODO: Ensure lock safety and liveness.
54 */ 54 */
55 -@Component(immediate = true, enabled = false) 55 +@Component(immediate = true, enabled = true)
56 @Service 56 @Service
57 public class DistributedLeadershipManager implements LeadershipService { 57 public class DistributedLeadershipManager implements LeadershipService {
58 58
......
...@@ -36,6 +36,8 @@ import com.google.common.collect.Sets; ...@@ -36,6 +36,8 @@ import com.google.common.collect.Sets;
36 36
37 import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator; 37 import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
38 38
39 +import static com.google.common.base.Preconditions.checkState;
40 +
39 /** 41 /**
40 * A database that partitions the keys across one or more database partitions. 42 * A database that partitions the keys across one or more database partitions.
41 */ 43 */
...@@ -44,11 +46,21 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -44,11 +46,21 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
44 private Partitioner<String> partitioner; 46 private Partitioner<String> partitioner;
45 private final ClusterCoordinator coordinator; 47 private final ClusterCoordinator coordinator;
46 private final Map<String, Database> partitions = Maps.newConcurrentMap(); 48 private final Map<String, Database> partitions = Maps.newConcurrentMap();
49 + private final AtomicBoolean isOpen = new AtomicBoolean(false);
50 + private static final String DB_NOT_OPEN = "Database is not open";
47 51
48 protected PartitionedDatabase(ClusterCoordinator coordinator) { 52 protected PartitionedDatabase(ClusterCoordinator coordinator) {
49 this.coordinator = coordinator; 53 this.coordinator = coordinator;
50 } 54 }
51 55
56 + /**
57 + * Returns true if the database is open.
58 + * @return true if open, false otherwise
59 + */
60 + public boolean isOpen() {
61 + return isOpen.get();
62 + }
63 +
52 @Override 64 @Override
53 public void registerPartition(String name, Database partition) { 65 public void registerPartition(String name, Database partition) {
54 partitions.put(name, partition); 66 partitions.put(name, partition);
...@@ -61,6 +73,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -61,6 +73,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
61 73
62 @Override 74 @Override
63 public CompletableFuture<Integer> size(String tableName) { 75 public CompletableFuture<Integer> size(String tableName) {
76 + checkState(isOpen.get(), DB_NOT_OPEN);
64 AtomicInteger totalSize = new AtomicInteger(0); 77 AtomicInteger totalSize = new AtomicInteger(0);
65 return CompletableFuture.allOf(partitions 78 return CompletableFuture.allOf(partitions
66 .values() 79 .values()
...@@ -72,16 +85,19 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -72,16 +85,19 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
72 85
73 @Override 86 @Override
74 public CompletableFuture<Boolean> isEmpty(String tableName) { 87 public CompletableFuture<Boolean> isEmpty(String tableName) {
88 + checkState(isOpen.get(), DB_NOT_OPEN);
75 return size(tableName).thenApply(size -> size == 0); 89 return size(tableName).thenApply(size -> size == 0);
76 } 90 }
77 91
78 @Override 92 @Override
79 public CompletableFuture<Boolean> containsKey(String tableName, String key) { 93 public CompletableFuture<Boolean> containsKey(String tableName, String key) {
94 + checkState(isOpen.get(), DB_NOT_OPEN);
80 return partitioner.getPartition(tableName, key).containsKey(tableName, key); 95 return partitioner.getPartition(tableName, key).containsKey(tableName, key);
81 } 96 }
82 97
83 @Override 98 @Override
84 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) { 99 public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
100 + checkState(isOpen.get(), DB_NOT_OPEN);
85 AtomicBoolean containsValue = new AtomicBoolean(false); 101 AtomicBoolean containsValue = new AtomicBoolean(false);
86 return CompletableFuture.allOf(partitions 102 return CompletableFuture.allOf(partitions
87 .values() 103 .values()
...@@ -93,21 +109,25 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -93,21 +109,25 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
93 109
94 @Override 110 @Override
95 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) { 111 public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
112 + checkState(isOpen.get(), DB_NOT_OPEN);
96 return partitioner.getPartition(tableName, key).get(tableName, key); 113 return partitioner.getPartition(tableName, key).get(tableName, key);
97 } 114 }
98 115
99 @Override 116 @Override
100 public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) { 117 public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
118 + checkState(isOpen.get(), DB_NOT_OPEN);
101 return partitioner.getPartition(tableName, key).put(tableName, key, value); 119 return partitioner.getPartition(tableName, key).put(tableName, key, value);
102 } 120 }
103 121
104 @Override 122 @Override
105 public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) { 123 public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
124 + checkState(isOpen.get(), DB_NOT_OPEN);
106 return partitioner.getPartition(tableName, key).remove(tableName, key); 125 return partitioner.getPartition(tableName, key).remove(tableName, key);
107 } 126 }
108 127
109 @Override 128 @Override
110 public CompletableFuture<Void> clear(String tableName) { 129 public CompletableFuture<Void> clear(String tableName) {
130 + checkState(isOpen.get(), DB_NOT_OPEN);
111 return CompletableFuture.allOf(partitions 131 return CompletableFuture.allOf(partitions
112 .values() 132 .values()
113 .stream() 133 .stream()
...@@ -117,6 +137,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -117,6 +137,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
117 137
118 @Override 138 @Override
119 public CompletableFuture<Set<String>> keySet(String tableName) { 139 public CompletableFuture<Set<String>> keySet(String tableName) {
140 + checkState(isOpen.get(), DB_NOT_OPEN);
120 Set<String> keySet = Sets.newConcurrentHashSet(); 141 Set<String> keySet = Sets.newConcurrentHashSet();
121 return CompletableFuture.allOf(partitions 142 return CompletableFuture.allOf(partitions
122 .values() 143 .values()
...@@ -128,6 +149,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -128,6 +149,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
128 149
129 @Override 150 @Override
130 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) { 151 public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
152 + checkState(isOpen.get(), DB_NOT_OPEN);
131 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>(); 153 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
132 return CompletableFuture.allOf(partitions 154 return CompletableFuture.allOf(partitions
133 .values() 155 .values()
...@@ -139,6 +161,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -139,6 +161,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
139 161
140 @Override 162 @Override
141 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) { 163 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
164 + checkState(isOpen.get(), DB_NOT_OPEN);
142 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet(); 165 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
143 return CompletableFuture.allOf(partitions 166 return CompletableFuture.allOf(partitions
144 .values() 167 .values()
...@@ -150,31 +173,37 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -150,31 +173,37 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
150 173
151 @Override 174 @Override
152 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) { 175 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
176 + checkState(isOpen.get(), DB_NOT_OPEN);
153 return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value); 177 return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
154 } 178 }
155 179
156 @Override 180 @Override
157 public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) { 181 public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
182 + checkState(isOpen.get(), DB_NOT_OPEN);
158 return partitioner.getPartition(tableName, key).remove(tableName, key, value); 183 return partitioner.getPartition(tableName, key).remove(tableName, key, value);
159 } 184 }
160 185
161 @Override 186 @Override
162 public CompletableFuture<Boolean> remove(String tableName, String key, long version) { 187 public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
188 + checkState(isOpen.get(), DB_NOT_OPEN);
163 return partitioner.getPartition(tableName, key).remove(tableName, key, version); 189 return partitioner.getPartition(tableName, key).remove(tableName, key, version);
164 } 190 }
165 191
166 @Override 192 @Override
167 public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) { 193 public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
194 + checkState(isOpen.get(), DB_NOT_OPEN);
168 return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue); 195 return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
169 } 196 }
170 197
171 @Override 198 @Override
172 public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) { 199 public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
200 + checkState(isOpen.get(), DB_NOT_OPEN);
173 return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue); 201 return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
174 } 202 }
175 203
176 @Override 204 @Override
177 public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) { 205 public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
206 + checkState(isOpen.get(), DB_NOT_OPEN);
178 Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap(); 207 Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
179 for (UpdateOperation<String, byte[]> update : updates) { 208 for (UpdateOperation<String, byte[]> update : updates) {
180 Database partition = partitioner.getPartition(update.tableName(), update.key()); 209 Database partition = partitioner.getPartition(update.tableName(), update.key());
...@@ -207,12 +236,15 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -207,12 +236,15 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
207 .stream() 236 .stream()
208 .map(Database::open) 237 .map(Database::open)
209 .toArray(CompletableFuture[]::new)) 238 .toArray(CompletableFuture[]::new))
210 - .thenApply(v -> this)); 239 + .thenApply(v -> {
240 + isOpen.set(true);
241 + return this; }));
211 242
212 } 243 }
213 244
214 @Override 245 @Override
215 public CompletableFuture<Void> close() { 246 public CompletableFuture<Void> close() {
247 + checkState(isOpen.get(), DB_NOT_OPEN);
216 CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions 248 CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
217 .values() 249 .values()
218 .stream() 250 .stream()
......
...@@ -33,6 +33,8 @@ import org.onosproject.net.intent.Key; ...@@ -33,6 +33,8 @@ import org.onosproject.net.intent.Key;
33 import org.slf4j.Logger; 33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory; 34 import org.slf4j.LoggerFactory;
35 35
36 +import com.google.common.base.Objects;
37 +
36 import java.util.List; 38 import java.util.List;
37 import java.util.concurrent.Executors; 39 import java.util.concurrent.Executors;
38 import java.util.concurrent.ScheduledExecutorService; 40 import java.util.concurrent.ScheduledExecutorService;
...@@ -169,7 +171,7 @@ public class PartitionManager implements PartitionService { ...@@ -169,7 +171,7 @@ public class PartitionManager implements PartitionService {
169 public void event(LeadershipEvent event) { 171 public void event(LeadershipEvent event) {
170 Leadership leadership = event.subject(); 172 Leadership leadership = event.subject();
171 173
172 - if (leadership.leader().equals(clusterService.getLocalNode().id()) && 174 + if (Objects.equal(leadership.leader(), clusterService.getLocalNode().id()) &&
173 leadership.topic().startsWith(ELECTION_PREFIX)) { 175 leadership.topic().startsWith(ELECTION_PREFIX)) {
174 176
175 // See if we need to let some partitions go 177 // See if we need to let some partitions go
......