Fixed couple of GossipIntentStore bugs.
* Register Collections.emptyList() in Kryo (some intents contain this) * Other stores set INSTALL_REQ state implicitly when a CREATE_INTENT batchwrite is received, so we should do this too. Change-Id: I19167ab0d3c7b98c32d5af9198843ff33b7c8a23
Showing
1 changed file
with
19 additions
and
13 deletions
... | @@ -48,6 +48,7 @@ import org.slf4j.Logger; | ... | @@ -48,6 +48,7 @@ import org.slf4j.Logger; |
48 | 48 | ||
49 | import java.io.IOException; | 49 | import java.io.IOException; |
50 | import java.util.ArrayList; | 50 | import java.util.ArrayList; |
51 | +import java.util.Collections; | ||
51 | import java.util.List; | 52 | import java.util.List; |
52 | import java.util.Set; | 53 | import java.util.Set; |
53 | import java.util.concurrent.ConcurrentHashMap; | 54 | import java.util.concurrent.ConcurrentHashMap; |
... | @@ -108,6 +109,7 @@ public class GossipIntentStore | ... | @@ -108,6 +109,7 @@ public class GossipIntentStore |
108 | .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN) | 109 | .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN) |
109 | .register(InternalIntentEvent.class) | 110 | .register(InternalIntentEvent.class) |
110 | .register(InternalSetInstallablesEvent.class) | 111 | .register(InternalSetInstallablesEvent.class) |
112 | + .register(Collections.emptyList().getClass()) | ||
111 | //.register(InternalIntentAntiEntropyEvent.class) | 113 | //.register(InternalIntentAntiEntropyEvent.class) |
112 | //.register(IntentAntiEntropyAdvertisement.class) | 114 | //.register(IntentAntiEntropyAdvertisement.class) |
113 | .build(); | 115 | .build(); |
... | @@ -247,6 +249,8 @@ public class GossipIntentStore | ... | @@ -247,6 +249,8 @@ public class GossipIntentStore |
247 | List<IntentEvent> events = Lists.newArrayList(); | 249 | List<IntentEvent> events = Lists.newArrayList(); |
248 | List<BatchWrite.Operation> failed = new ArrayList<>(); | 250 | List<BatchWrite.Operation> failed = new ArrayList<>(); |
249 | 251 | ||
252 | + Timestamp timestamp = null; | ||
253 | + | ||
250 | for (BatchWrite.Operation op : batch.operations()) { | 254 | for (BatchWrite.Operation op : batch.operations()) { |
251 | switch (op.type()) { | 255 | switch (op.type()) { |
252 | case CREATE_INTENT: | 256 | case CREATE_INTENT: |
... | @@ -254,9 +258,12 @@ public class GossipIntentStore | ... | @@ -254,9 +258,12 @@ public class GossipIntentStore |
254 | "CREATE_INTENT takes 1 argument. %s", op); | 258 | "CREATE_INTENT takes 1 argument. %s", op); |
255 | Intent intent = op.arg(0); | 259 | Intent intent = op.arg(0); |
256 | 260 | ||
257 | - events.add(createIntentInternal(intent)); | 261 | + timestamp = intentClockService.getTimestamp(intent.id()); |
258 | - notifyPeers(new InternalIntentEvent( | 262 | + if (createIntentInternal(intent)) { |
259 | - intent.id(), intent, INSTALL_REQ, null)); | 263 | + events.add(setStateInternal(intent.id(), INSTALL_REQ, timestamp)); |
264 | + notifyPeers(new InternalIntentEvent(intent.id(), intent, | ||
265 | + INSTALL_REQ, timestamp)); | ||
266 | + } | ||
260 | 267 | ||
261 | break; | 268 | break; |
262 | case REMOVE_INTENT: | 269 | case REMOVE_INTENT: |
... | @@ -272,8 +279,7 @@ public class GossipIntentStore | ... | @@ -272,8 +279,7 @@ public class GossipIntentStore |
272 | intent = op.arg(0); | 279 | intent = op.arg(0); |
273 | IntentState newState = op.arg(1); | 280 | IntentState newState = op.arg(1); |
274 | 281 | ||
275 | - Timestamp timestamp = intentClockService.getTimestamp( | 282 | + timestamp = intentClockService.getTimestamp(intent.id()); |
276 | - intent.id()); | ||
277 | IntentEvent externalEvent = setStateInternal(intent.id(), newState, timestamp); | 283 | IntentEvent externalEvent = setStateInternal(intent.id(), newState, timestamp); |
278 | events.add(externalEvent); | 284 | events.add(externalEvent); |
279 | notifyPeers(new InternalIntentEvent(intent.id(), null, newState, timestamp)); | 285 | notifyPeers(new InternalIntentEvent(intent.id(), null, newState, timestamp)); |
... | @@ -285,11 +291,11 @@ public class GossipIntentStore | ... | @@ -285,11 +291,11 @@ public class GossipIntentStore |
285 | intentId = op.arg(0); | 291 | intentId = op.arg(0); |
286 | List<Intent> installableIntents = op.arg(1); | 292 | List<Intent> installableIntents = op.arg(1); |
287 | 293 | ||
288 | - Timestamp timestamp1 = intentClockService.getTimestamp(intentId); | 294 | + timestamp = intentClockService.getTimestamp(intentId); |
289 | setInstallableIntentsInternal( | 295 | setInstallableIntentsInternal( |
290 | - intentId, installableIntents, timestamp1); | 296 | + intentId, installableIntents, timestamp); |
291 | 297 | ||
292 | - notifyPeers(new InternalSetInstallablesEvent(intentId, installableIntents, timestamp1)); | 298 | + notifyPeers(new InternalSetInstallablesEvent(intentId, installableIntents, timestamp)); |
293 | 299 | ||
294 | break; | 300 | break; |
295 | case REMOVE_INSTALLED: | 301 | case REMOVE_INSTALLED: |
... | @@ -309,15 +315,15 @@ public class GossipIntentStore | ... | @@ -309,15 +315,15 @@ public class GossipIntentStore |
309 | return failed; | 315 | return failed; |
310 | } | 316 | } |
311 | 317 | ||
312 | - private IntentEvent createIntentInternal(Intent intent) { | 318 | + private boolean createIntentInternal(Intent intent) { |
313 | Intent oldValue = intents.putIfAbsent(intent.id(), intent); | 319 | Intent oldValue = intents.putIfAbsent(intent.id(), intent); |
314 | if (oldValue == null) { | 320 | if (oldValue == null) { |
315 | - return IntentEvent.getEvent(INSTALL_REQ, intent); | 321 | + return true; |
316 | } | 322 | } |
317 | 323 | ||
318 | log.warn("Intent ID {} already in store, throwing new update away", | 324 | log.warn("Intent ID {} already in store, throwing new update away", |
319 | intent.id()); | 325 | intent.id()); |
320 | - return null; | 326 | + return false; |
321 | } | 327 | } |
322 | 328 | ||
323 | private void notifyPeers(InternalIntentEvent event) { | 329 | private void notifyPeers(InternalIntentEvent event) { |
... | @@ -380,8 +386,8 @@ public class GossipIntentStore | ... | @@ -380,8 +386,8 @@ public class GossipIntentStore |
380 | try { | 386 | try { |
381 | switch (state) { | 387 | switch (state) { |
382 | case INSTALL_REQ: | 388 | case INSTALL_REQ: |
383 | - notifyDelegateIfNotNull(createIntentInternal(intent)); | 389 | + createIntentInternal(intent); |
384 | - break; | 390 | + // Fallthrough to setStateInternal for INSTALL_REQ |
385 | default: | 391 | default: |
386 | notifyDelegateIfNotNull(setStateInternal(intentId, state, timestamp)); | 392 | notifyDelegateIfNotNull(setStateInternal(intentId, state, timestamp)); |
387 | break; | 393 | break; | ... | ... |
-
Please register or login to post a comment