Yuta HIGUCHI
Committed by Yuta Higuchi

HazelcastIntentStore: workaround to provide Event subject on WITHDRAWN

Change-Id: Ie9562d2223fb7e7a89f91f5faaad4bbeec6f4bd9
...@@ -113,7 +113,14 @@ public class HazelcastIntentStore ...@@ -113,7 +113,14 @@ public class HazelcastIntentStore
113 private Timer getIntentTimer; 113 private Timer getIntentTimer;
114 private Timer getIntentStateTimer; 114 private Timer getIntentStateTimer;
115 115
116 - private String listenerId; 116 + // manual near cache of Intent
117 + // (Note: IntentId -> Intent is expected to be immutable)
118 + // entry will be evicted, when state for that IntentId is removed.
119 + private Map<IntentId, Intent> localIntents;
120 +
121 + private String stateListenerId;
122 +
123 + private String intentsListenerId;
117 124
118 private Timer createResponseTimer(String methodName) { 125 private Timer createResponseTimer(String methodName) {
119 return createTimer("IntentStore", methodName, "responseTime"); 126 return createTimer("IntentStore", methodName, "responseTime");
...@@ -122,6 +129,8 @@ public class HazelcastIntentStore ...@@ -122,6 +129,8 @@ public class HazelcastIntentStore
122 @Override 129 @Override
123 @Activate 130 @Activate
124 public void activate() { 131 public void activate() {
132 + localIntents = new ConcurrentHashMap<>();
133 +
125 createIntentTimer = createResponseTimer("createIntent"); 134 createIntentTimer = createResponseTimer("createIntent");
126 removeIntentTimer = createResponseTimer("removeIntent"); 135 removeIntentTimer = createResponseTimer("removeIntent");
127 setInstallableIntentsTimer = createResponseTimer("setInstallableIntents"); 136 setInstallableIntentsTimer = createResponseTimer("setInstallableIntents");
...@@ -158,6 +167,7 @@ public class HazelcastIntentStore ...@@ -158,6 +167,7 @@ public class HazelcastIntentStore
158 // TODO: enable near cache, allow read from backup for this IMap 167 // TODO: enable near cache, allow read from backup for this IMap
159 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME); 168 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME);
160 intents = new SMap<>(rawIntents , super.serializer); 169 intents = new SMap<>(rawIntents , super.serializer);
170 + intentsListenerId = intents.addEntryListener(new RemoteIntentsListener(), true);
161 171
162 MapConfig statesCfg = config.getMapConfig(INTENT_STATES_MAP_NAME); 172 MapConfig statesCfg = config.getMapConfig(INTENT_STATES_MAP_NAME);
163 statesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - statesCfg.getBackupCount()); 173 statesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - statesCfg.getBackupCount());
...@@ -165,7 +175,7 @@ public class HazelcastIntentStore ...@@ -165,7 +175,7 @@ public class HazelcastIntentStore
165 IMap<byte[], byte[]> rawStates = super.theInstance.getMap(INTENT_STATES_MAP_NAME); 175 IMap<byte[], byte[]> rawStates = super.theInstance.getMap(INTENT_STATES_MAP_NAME);
166 states = new SMap<>(rawStates , super.serializer); 176 states = new SMap<>(rawStates , super.serializer);
167 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener(); 177 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
168 - listenerId = states.addEntryListener(listener , true); 178 + stateListenerId = states.addEntryListener(listener, true);
169 179
170 transientStates.clear(); 180 transientStates.clear();
171 181
...@@ -180,7 +190,8 @@ public class HazelcastIntentStore ...@@ -180,7 +190,8 @@ public class HazelcastIntentStore
180 190
181 @Deactivate 191 @Deactivate
182 public void deactivate() { 192 public void deactivate() {
183 - states.removeEntryListener(listenerId); 193 + intents.removeEntryListener(intentsListenerId);
194 + states.removeEntryListener(stateListenerId);
184 log.info("Stopped"); 195 log.info("Stopped");
185 } 196 }
186 197
...@@ -245,7 +256,15 @@ public class HazelcastIntentStore ...@@ -245,7 +256,15 @@ public class HazelcastIntentStore
245 public Intent getIntent(IntentId intentId) { 256 public Intent getIntent(IntentId intentId) {
246 Context timer = startTimer(getIntentTimer); 257 Context timer = startTimer(getIntentTimer);
247 try { 258 try {
248 - return intents.get(intentId); 259 + Intent intent = localIntents.get(intentId);
260 + if (intent != null) {
261 + return intent;
262 + }
263 + intent = intents.get(intentId);
264 + if (intent != null) {
265 + localIntents.put(intentId, intent);
266 + }
267 + return intent;
249 } finally { 268 } finally {
250 stopTimer(timer); 269 stopTimer(timer);
251 } 270 }
...@@ -605,15 +624,28 @@ public class HazelcastIntentStore ...@@ -605,15 +624,28 @@ public class HazelcastIntentStore
605 } 624 }
606 } 625 }
607 626
627 + public final class RemoteIntentsListener extends EntryAdapter<IntentId, Intent> {
628 +
629 + @Override
630 + public void entryAdded(EntryEvent<IntentId, Intent> event) {
631 + localIntents.put(event.getKey(), event.getValue());
632 + }
633 +
634 + @Override
635 + public void entryUpdated(EntryEvent<IntentId, Intent> event) {
636 + entryAdded(event);
637 + }
638 + }
639 +
608 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> { 640 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
609 641
610 @Override 642 @Override
611 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) { 643 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
644 + final IntentId intentId = event.getKey();
612 final Member myself = theInstance.getCluster().getLocalMember(); 645 final Member myself = theInstance.getCluster().getLocalMember();
613 if (!myself.equals(event.getMember())) { 646 if (!myself.equals(event.getMember())) {
614 // When Intent state was modified by remote node, 647 // When Intent state was modified by remote node,
615 // clear local transient state. 648 // clear local transient state.
616 - final IntentId intentId = event.getKey();
617 IntentState oldState = transientStates.remove(intentId); 649 IntentState oldState = transientStates.remove(intentId);
618 if (oldState != null) { 650 if (oldState != null) {
619 log.debug("{} state updated remotely, removing transient state {}", 651 log.debug("{} state updated remotely, removing transient state {}",
...@@ -622,9 +654,21 @@ public class HazelcastIntentStore ...@@ -622,9 +654,21 @@ public class HazelcastIntentStore
622 654
623 if (event.getValue() != null) { 655 if (event.getValue() != null) {
624 // notify if this is not entry removed event 656 // notify if this is not entry removed event
625 - notifyDelegate(IntentEvent.getEvent(event.getValue(), getIntent(intentId))); 657 +
658 + final Intent intent = getIntent(intentId);
659 + if (intent == null) {
660 + log.warn("no Intent found for {} on Event {}", intentId, event);
661 + return;
626 } 662 }
663 + notifyDelegate(IntentEvent.getEvent(event.getValue(), intent));
664 + // remove IntentCache
665 + localIntents.remove(intentId, intent);
627 } 666 }
628 } 667 }
668 +
669 + // populate manual near cache, to prepare for
670 + // transition event to WITHDRAWN
671 + getIntent(intentId);
672 + }
629 } 673 }
630 } 674 }
......