Madan Jampani

Ensure all active nodes are in contention before relinquishing a partition

Change-Id: I846f810547b286736d26d319f315048398334a83
...@@ -200,10 +200,14 @@ public class IntentPartitionManager implements IntentPartitionService { ...@@ -200,10 +200,14 @@ public class IntentPartitionManager implements IntentPartitionService {
200 200
201 for (int i = 0; i < relinquish; i++) { 201 for (int i = 0; i < relinquish; i++) {
202 String topic = myPartitions.get(i); 202 String topic = myPartitions.get(i);
203 + // Wait till all active nodes are in contention for partition ownership.
204 + // This avoids too many relinquish/reclaim cycles.
205 + if (leadershipService.getCandidates(topic).size() == activeNodes) {
203 leadershipService.withdraw(topic); 206 leadershipService.withdraw(topic);
204 executor.schedule(() -> recontest(topic), BACKOFF_TIME, TimeUnit.SECONDS); 207 executor.schedule(() -> recontest(topic), BACKOFF_TIME, TimeUnit.SECONDS);
205 } 208 }
206 } 209 }
210 + }
207 211
208 private void scheduleRebalance(int afterDelaySec) { 212 private void scheduleRebalance(int afterDelaySec) {
209 if (rebalanceScheduled.compareAndSet(false, true)) { 213 if (rebalanceScheduled.compareAndSet(false, true)) {
......
...@@ -116,6 +116,11 @@ public class IntentPartitionManagerTest { ...@@ -116,6 +116,11 @@ public class IntentPartitionManagerTest {
116 allNodes)) 116 allNodes))
117 .anyTimes(); 117 .anyTimes();
118 } 118 }
119 + for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
120 + expect(leadershipService.getCandidates(ELECTION_PREFIX + i))
121 + .andReturn(Arrays.asList(MY_NODE_ID, OTHER_NODE_ID))
122 + .anyTimes();
123 + }
119 } 124 }
120 125
121 /** 126 /**
......