Pavlin Radoslavov

Updated the HazelcastLeadershipService implemenation:

 * Every listener receives all leadership events, even for new topics and
   topics the local instance is not running for a leadership election
 * Now getLeaderBoard() returns all leadership info

Change-Id: Ia11a10ed287d2f8d905dd987beb8052c35be6cf1
...@@ -71,7 +71,8 @@ import com.hazelcast.core.MessageListener; ...@@ -71,7 +71,8 @@ import com.hazelcast.core.MessageListener;
71 */ 71 */
72 @Component(immediate = true) 72 @Component(immediate = true)
73 @Service 73 @Service
74 -public class HazelcastLeadershipService implements LeadershipService { 74 +public class HazelcastLeadershipService implements LeadershipService,
75 + MessageListener<byte[]> {
75 private static final Logger log = 76 private static final Logger log =
76 LoggerFactory.getLogger(HazelcastLeadershipService.class); 77 LoggerFactory.getLogger(HazelcastLeadershipService.class);
77 78
...@@ -87,6 +88,7 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -87,6 +88,7 @@ public class HazelcastLeadershipService implements LeadershipService {
87 88
88 private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s 89 private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
89 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s 90 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
91 + private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
90 92
91 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92 protected ClusterService clusterService; 94 protected ClusterService clusterService;
...@@ -102,18 +104,29 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -102,18 +104,29 @@ public class HazelcastLeadershipService implements LeadershipService {
102 private final Map<String, Topic> topics = Maps.newConcurrentMap(); 104 private final Map<String, Topic> topics = Maps.newConcurrentMap();
103 private NodeId localNodeId; 105 private NodeId localNodeId;
104 106
107 + private ITopic<byte[]> leaderTopic;
108 + private String leaderTopicRegistrationId;
109 +
105 @Activate 110 @Activate
106 protected void activate() { 111 protected void activate() {
107 localNodeId = clusterService.getLocalNode().id(); 112 localNodeId = clusterService.getLocalNode().id();
108 listenerRegistry = new AbstractListenerRegistry<>(); 113 listenerRegistry = new AbstractListenerRegistry<>();
109 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry); 114 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
110 115
116 + TopicConfig topicConfig = new TopicConfig();
117 + topicConfig.setGlobalOrderingEnabled(true);
118 + topicConfig.setName(TOPIC_HZ_ID);
119 + storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
120 + leaderTopic = storeService.getHazelcastInstance().getTopic(TOPIC_HZ_ID);
121 + leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
122 +
111 log.info("Hazelcast Leadership Service started"); 123 log.info("Hazelcast Leadership Service started");
112 } 124 }
113 125
114 @Deactivate 126 @Deactivate
115 protected void deactivate() { 127 protected void deactivate() {
116 eventDispatcher.removeSink(LeadershipEvent.class); 128 eventDispatcher.removeSink(LeadershipEvent.class);
129 + leaderTopic.removeMessageListener(leaderTopicRegistrationId);
117 130
118 for (Topic topic : topics.values()) { 131 for (Topic topic : topics.values()) {
119 topic.stop(); 132 topic.stop();
...@@ -139,6 +152,9 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -139,6 +152,9 @@ public class HazelcastLeadershipService implements LeadershipService {
139 Topic oldTopic = topics.putIfAbsent(path, topic); 152 Topic oldTopic = topics.putIfAbsent(path, topic);
140 if (oldTopic == null) { 153 if (oldTopic == null) {
141 topic.start(); 154 topic.start();
155 + topic.runForLeadership();
156 + } else {
157 + oldTopic.runForLeadership();
142 } 158 }
143 } 159 }
144 160
...@@ -156,11 +172,6 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -156,11 +172,6 @@ public class HazelcastLeadershipService implements LeadershipService {
156 public Map<String, Leadership> getLeaderBoard() { 172 public Map<String, Leadership> getLeaderBoard() {
157 Map<String, Leadership> result = new HashMap<>(); 173 Map<String, Leadership> result = new HashMap<>();
158 174
159 - //
160 - // Get the leaders for the topics.
161 - // NOTE: A topic is listed only if this instance is running for
162 - // a leadership for that topic.
163 - //
164 for (Topic topic : topics.values()) { 175 for (Topic topic : topics.values()) {
165 Leadership leadership = new Leadership(topic.topicName(), 176 Leadership leadership = new Leadership(topic.topicName(),
166 topic.leader(), 177 topic.leader(),
...@@ -180,12 +191,41 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -180,12 +191,41 @@ public class HazelcastLeadershipService implements LeadershipService {
180 listenerRegistry.removeListener(listener); 191 listenerRegistry.removeListener(listener);
181 } 192 }
182 193
194 + @Override
195 + public void onMessage(Message<byte[]> message) {
196 + LeadershipEvent leadershipEvent =
197 + SERIALIZER.decode(message.getMessageObject());
198 +
199 + log.debug("Leadership Event: time = {} type = {} event = {}",
200 + leadershipEvent.time(), leadershipEvent.type(),
201 + leadershipEvent);
202 +
203 + //
204 + // If there is no entry for the topic, then create a new one to
205 + // keep track of the leadership, but don't run for leadership itself.
206 + //
207 + String topicName = leadershipEvent.subject().topic();
208 + Topic topic = topics.get(topicName);
209 + if (topic == null) {
210 + topic = new Topic(topicName);
211 + Topic oldTopic = topics.putIfAbsent(topicName, topic);
212 + if (oldTopic == null) {
213 + topic.start();
214 + } else {
215 + topic = oldTopic;
216 + }
217 + }
218 + topic.receivedLeadershipEvent(leadershipEvent);
219 + eventDispatcher.post(leadershipEvent);
220 + }
221 +
183 /** 222 /**
184 * Class for keeping per-topic information. 223 * Class for keeping per-topic information.
185 */ 224 */
186 - private final class Topic implements MessageListener<byte[]> { 225 + private final class Topic {
187 private final String topicName; 226 private final String topicName;
188 private volatile boolean isShutdown = true; 227 private volatile boolean isShutdown = true;
228 + private volatile boolean isRunningForLeadership = false;
189 private volatile long lastLeadershipUpdateMs = 0; 229 private volatile long lastLeadershipUpdateMs = 0;
190 private ExecutorService leaderElectionExecutor; 230 private ExecutorService leaderElectionExecutor;
191 231
...@@ -193,8 +233,6 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -193,8 +233,6 @@ public class HazelcastLeadershipService implements LeadershipService {
193 private Lock leaderLock; 233 private Lock leaderLock;
194 private Future<?> getLockFuture; 234 private Future<?> getLockFuture;
195 private Future<?> periodicProcessingFuture; 235 private Future<?> periodicProcessingFuture;
196 - private ITopic<byte[]> leaderTopic;
197 - private String leaderTopicRegistrationId;
198 236
199 /** 237 /**
200 * Constructor. 238 * Constructor.
...@@ -224,38 +262,39 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -224,38 +262,39 @@ public class HazelcastLeadershipService implements LeadershipService {
224 } 262 }
225 263
226 /** 264 /**
227 - * Starts leadership election for the topic. 265 + * Starts operation.
228 */ 266 */
229 private void start() { 267 private void start() {
230 isShutdown = false; 268 isShutdown = false;
231 - String lockHzId = "LeadershipService/" + topicName + "/lock";
232 - String topicHzId = "LeadershipService/" + topicName + "/topic";
233 - leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
234 -
235 String threadPoolName = "leader-election-" + topicName + "-%d"; 269 String threadPoolName = "leader-election-" + topicName + "-%d";
236 leaderElectionExecutor = Executors.newScheduledThreadPool(2, 270 leaderElectionExecutor = Executors.newScheduledThreadPool(2,
237 namedThreads(threadPoolName)); 271 namedThreads(threadPoolName));
238 272
239 - TopicConfig topicConfig = new TopicConfig(); 273 + periodicProcessingFuture =
240 - topicConfig.setGlobalOrderingEnabled(true); 274 + leaderElectionExecutor.submit(new Runnable() {
241 - topicConfig.setName(topicHzId);
242 - storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
243 -
244 - leaderTopic =
245 - storeService.getHazelcastInstance().getTopic(topicHzId);
246 - leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
247 -
248 - getLockFuture = leaderElectionExecutor.submit(new Runnable() {
249 @Override 275 @Override
250 public void run() { 276 public void run() {
251 - doLeaderElectionThread(); 277 + doPeriodicProcessing();
252 } 278 }
253 }); 279 });
254 - periodicProcessingFuture = 280 + }
255 - leaderElectionExecutor.submit(new Runnable() { 281 +
282 + /**
283 + * Runs for leadership.
284 + */
285 + private void runForLeadership() {
286 + if (isRunningForLeadership) {
287 + return; // Nothing to do: already running
288 + }
289 + if (isShutdown) {
290 + start();
291 + }
292 + String lockHzId = "LeadershipService/" + topicName + "/lock";
293 + leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
294 + getLockFuture = leaderElectionExecutor.submit(new Runnable() {
256 @Override 295 @Override
257 public void run() { 296 public void run() {
258 - doPeriodicProcessing(); 297 + doLeaderElectionThread();
259 } 298 }
260 }); 299 });
261 } 300 }
...@@ -265,21 +304,19 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -265,21 +304,19 @@ public class HazelcastLeadershipService implements LeadershipService {
265 */ 304 */
266 private void stop() { 305 private void stop() {
267 isShutdown = true; 306 isShutdown = true;
268 - leaderTopic.removeMessageListener(leaderTopicRegistrationId); 307 + isRunningForLeadership = false;
269 // getLockFuture.cancel(true); 308 // getLockFuture.cancel(true);
270 // periodicProcessingFuture.cancel(true); 309 // periodicProcessingFuture.cancel(true);
271 leaderElectionExecutor.shutdownNow(); 310 leaderElectionExecutor.shutdownNow();
272 } 311 }
273 312
274 - @Override 313 + /**
275 - public void onMessage(Message<byte[]> message) { 314 + * Received a Leadership Event.
276 - LeadershipEvent leadershipEvent = 315 + *
277 - SERIALIZER.decode(message.getMessageObject()); 316 + * @param leadershipEvent the received Leadership Event
317 + */
318 + private void receivedLeadershipEvent(LeadershipEvent leadershipEvent) {
278 NodeId eventLeaderId = leadershipEvent.subject().leader(); 319 NodeId eventLeaderId = leadershipEvent.subject().leader();
279 -
280 - log.debug("Leadership Event: time = {} type = {} event = {}",
281 - leadershipEvent.time(), leadershipEvent.type(),
282 - leadershipEvent);
283 if (!leadershipEvent.subject().topic().equals(topicName)) { 320 if (!leadershipEvent.subject().topic().equals(topicName)) {
284 return; // Not our topic: ignore 321 return; // Not our topic: ignore
285 } 322 }
...@@ -296,23 +333,21 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -296,23 +333,21 @@ public class HazelcastLeadershipService implements LeadershipService {
296 // Another leader: if we are also a leader, then give up 333 // Another leader: if we are also a leader, then give up
297 // leadership and run for re-election. 334 // leadership and run for re-election.
298 // 335 //
299 - if ((leader != null) && 336 + if ((leader != null) && leader.equals(localNodeId)) {
300 - leader.equals(localNodeId)) { 337 + if (getLockFuture != null) {
301 - getLockFuture.cancel(true); 338 + getLockFuture.cancel(true);
339 + }
302 } else { 340 } else {
303 // Just update the current leader 341 // Just update the current leader
304 leader = leadershipEvent.subject().leader(); 342 leader = leadershipEvent.subject().leader();
305 lastLeadershipUpdateMs = System.currentTimeMillis(); 343 lastLeadershipUpdateMs = System.currentTimeMillis();
306 } 344 }
307 - eventDispatcher.post(leadershipEvent);
308 break; 345 break;
309 case LEADER_BOOTED: 346 case LEADER_BOOTED:
310 // Remove the state for the current leader 347 // Remove the state for the current leader
311 - if ((leader != null) && 348 + if ((leader != null) && eventLeaderId.equals(leader)) {
312 - eventLeaderId.equals(leader)) {
313 leader = null; 349 leader = null;
314 } 350 }
315 - eventDispatcher.post(leadershipEvent);
316 break; 351 break;
317 default: 352 default:
318 break; 353 break;
...@@ -340,7 +375,7 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -340,7 +375,7 @@ public class HazelcastLeadershipService implements LeadershipService {
340 leadershipEvent = new LeadershipEvent( 375 leadershipEvent = new LeadershipEvent(
341 LeadershipEvent.Type.LEADER_REELECTED, 376 LeadershipEvent.Type.LEADER_REELECTED,
342 new Leadership(topicName, localNodeId, 0)); 377 new Leadership(topicName, localNodeId, 0));
343 - // Dispatch to all remote instances 378 + // Dispatch to all instances
344 leaderTopic.publish(SERIALIZER.encode(leadershipEvent)); 379 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
345 } else { 380 } else {
346 // 381 //
...@@ -404,7 +439,6 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -404,7 +439,6 @@ public class HazelcastLeadershipService implements LeadershipService {
404 leadershipEvent = new LeadershipEvent( 439 leadershipEvent = new LeadershipEvent(
405 LeadershipEvent.Type.LEADER_ELECTED, 440 LeadershipEvent.Type.LEADER_ELECTED,
406 new Leadership(topicName, localNodeId, 0)); 441 new Leadership(topicName, localNodeId, 0));
407 - eventDispatcher.post(leadershipEvent);
408 leaderTopic.publish(SERIALIZER.encode(leadershipEvent)); 442 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
409 } 443 }
410 444
...@@ -430,7 +464,6 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -430,7 +464,6 @@ public class HazelcastLeadershipService implements LeadershipService {
430 leadershipEvent = new LeadershipEvent( 464 leadershipEvent = new LeadershipEvent(
431 LeadershipEvent.Type.LEADER_BOOTED, 465 LeadershipEvent.Type.LEADER_BOOTED,
432 new Leadership(topicName, localNodeId, 0)); 466 new Leadership(topicName, localNodeId, 0));
433 - eventDispatcher.post(leadershipEvent);
434 leaderTopic.publish(SERIALIZER.encode(leadershipEvent)); 467 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
435 leaderLock.unlock(); 468 leaderLock.unlock();
436 } 469 }
......