Madan Jampani

Fix IntentPartition rebalance to work correctly after a network partition heals

Change-Id: Ie73598bc191fffb46d18fc3544f9d2b15d10feb7
...@@ -179,6 +179,14 @@ public class IntentPartitionManager implements IntentPartitionService { ...@@ -179,6 +179,14 @@ public class IntentPartitionManager implements IntentPartitionService {
179 179
180 int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes); 180 int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes);
181 181
182 + // First make sure this node is a candidate for all partitions.
183 + IntStream.range(0, NUM_PARTITIONS)
184 + .mapToObj(this::getPartitionPath)
185 + .map(leadershipService::getLeadership)
186 + .filter(leadership -> !leadership.candidates().contains(localNodeId))
187 + .map(Leadership::topic)
188 + .forEach(leadershipService::runForLeadership);
189 +
182 List<String> myPartitions = IntStream.range(0, NUM_PARTITIONS) 190 List<String> myPartitions = IntStream.range(0, NUM_PARTITIONS)
183 .mapToObj(this::getPartitionPath) 191 .mapToObj(this::getPartitionPath)
184 .map(leadershipService::getLeadership) 192 .map(leadershipService::getLeadership)
...@@ -189,6 +197,7 @@ public class IntentPartitionManager implements IntentPartitionService { ...@@ -189,6 +197,7 @@ public class IntentPartitionManager implements IntentPartitionService {
189 197
190 int relinquish = myPartitions.size() - myShare; 198 int relinquish = myPartitions.size() - myShare;
191 199
200 +
192 for (int i = 0; i < relinquish; i++) { 201 for (int i = 0; i < relinquish; i++) {
193 String topic = myPartitions.get(i); 202 String topic = myPartitions.get(i);
194 leadershipService.withdraw(topic); 203 leadershipService.withdraw(topic);
......