moving out OptionalCacheLoader
Change-Id: If929ed119df1a0282e311188a00776e971f78991
Showing
5 changed files
with
75 additions
and
42 deletions
1 | /** | 1 | /** |
2 | * Implementation of a distributed cluster node store using Hazelcast. | 2 | * Implementation of a distributed cluster node store using Hazelcast. |
3 | */ | 3 | */ |
4 | -package org.onlab.onos.store.cluster.impl; | ||
... | \ No newline at end of file | ... | \ No newline at end of file |
4 | +package org.onlab.onos.store.cluster.impl; | ... | ... |
... | @@ -2,7 +2,6 @@ package org.onlab.onos.store.device.impl; | ... | @@ -2,7 +2,6 @@ package org.onlab.onos.store.device.impl; |
2 | 2 | ||
3 | import com.google.common.base.Optional; | 3 | import com.google.common.base.Optional; |
4 | import com.google.common.cache.CacheBuilder; | 4 | import com.google.common.cache.CacheBuilder; |
5 | -import com.google.common.cache.CacheLoader; | ||
6 | import com.google.common.cache.LoadingCache; | 5 | import com.google.common.cache.LoadingCache; |
7 | import com.google.common.collect.ImmutableList; | 6 | import com.google.common.collect.ImmutableList; |
8 | import com.google.common.collect.ImmutableSet; | 7 | import com.google.common.collect.ImmutableSet; |
... | @@ -13,6 +12,7 @@ import com.hazelcast.core.HazelcastInstance; | ... | @@ -13,6 +12,7 @@ import com.hazelcast.core.HazelcastInstance; |
13 | import com.hazelcast.core.IMap; | 12 | import com.hazelcast.core.IMap; |
14 | import com.hazelcast.core.ISet; | 13 | import com.hazelcast.core.ISet; |
15 | import com.hazelcast.core.MapEvent; | 14 | import com.hazelcast.core.MapEvent; |
15 | + | ||
16 | import org.apache.felix.scr.annotations.Activate; | 16 | import org.apache.felix.scr.annotations.Activate; |
17 | import org.apache.felix.scr.annotations.Component; | 17 | import org.apache.felix.scr.annotations.Component; |
18 | import org.apache.felix.scr.annotations.Deactivate; | 18 | import org.apache.felix.scr.annotations.Deactivate; |
... | @@ -33,6 +33,7 @@ import org.onlab.onos.net.device.PortDescription; | ... | @@ -33,6 +33,7 @@ import org.onlab.onos.net.device.PortDescription; |
33 | import org.onlab.onos.net.provider.ProviderId; | 33 | import org.onlab.onos.net.provider.ProviderId; |
34 | import org.onlab.onos.store.StoreService; | 34 | import org.onlab.onos.store.StoreService; |
35 | import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache; | 35 | import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache; |
36 | +import org.onlab.onos.store.impl.OptionalCacheLoader; | ||
36 | import org.slf4j.Logger; | 37 | import org.slf4j.Logger; |
37 | 38 | ||
38 | import java.util.ArrayList; | 39 | import java.util.ArrayList; |
... | @@ -78,7 +79,6 @@ public class DistributedDeviceStore implements DeviceStore { | ... | @@ -78,7 +79,6 @@ public class DistributedDeviceStore implements DeviceStore { |
78 | private IMap<byte[], byte[]> rawDevicePorts; | 79 | private IMap<byte[], byte[]> rawDevicePorts; |
79 | private LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> devicePorts; | 80 | private LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> devicePorts; |
80 | 81 | ||
81 | - // FIXME change to protected once we remove DistributedDeviceManagerTest. | ||
82 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 82 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
83 | protected StoreService storeService; | 83 | protected StoreService storeService; |
84 | 84 | ||
... | @@ -95,30 +95,36 @@ public class DistributedDeviceStore implements DeviceStore { | ... | @@ -95,30 +95,36 @@ public class DistributedDeviceStore implements DeviceStore { |
95 | 95 | ||
96 | // TODO decide on Map name scheme to avoid collision | 96 | // TODO decide on Map name scheme to avoid collision |
97 | rawDevices = theInstance.getMap("devices"); | 97 | rawDevices = theInstance.getMap("devices"); |
98 | + final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader | ||
99 | + = new OptionalCacheLoader<>(storeService, rawDevices); | ||
98 | devices = new AbsentInvalidatingLoadingCache<>( | 100 | devices = new AbsentInvalidatingLoadingCache<>( |
99 | CacheBuilder.newBuilder() | 101 | CacheBuilder.newBuilder() |
100 | - .build(new OptionalCacheLoader<DeviceId, DefaultDevice>(rawDevices))); | 102 | + .build(deviceLoader)); |
101 | // refresh/populate cache based on notification from other instance | 103 | // refresh/populate cache based on notification from other instance |
102 | rawDevices.addEntryListener( | 104 | rawDevices.addEntryListener( |
103 | new RemoteEventHandler<>(devices), | 105 | new RemoteEventHandler<>(devices), |
104 | includeValue); | 106 | includeValue); |
105 | 107 | ||
106 | rawRoles = theInstance.getMap("roles"); | 108 | rawRoles = theInstance.getMap("roles"); |
109 | + final OptionalCacheLoader<DeviceId, MastershipRole> rolesLoader | ||
110 | + = new OptionalCacheLoader<>(storeService, rawRoles); | ||
107 | roles = new AbsentInvalidatingLoadingCache<>( | 111 | roles = new AbsentInvalidatingLoadingCache<>( |
108 | CacheBuilder.newBuilder() | 112 | CacheBuilder.newBuilder() |
109 | - .build(new OptionalCacheLoader<DeviceId, MastershipRole>(rawRoles))); | 113 | + .build(rolesLoader)); |
110 | // refresh/populate cache based on notification from other instance | 114 | // refresh/populate cache based on notification from other instance |
111 | rawRoles.addEntryListener( | 115 | rawRoles.addEntryListener( |
112 | new RemoteEventHandler<>(roles), | 116 | new RemoteEventHandler<>(roles), |
113 | includeValue); | 117 | includeValue); |
114 | 118 | ||
115 | - // TODO cache avai | 119 | + // TODO cache availableDevices |
116 | availableDevices = theInstance.getSet("availableDevices"); | 120 | availableDevices = theInstance.getSet("availableDevices"); |
117 | 121 | ||
118 | rawDevicePorts = theInstance.getMap("devicePorts"); | 122 | rawDevicePorts = theInstance.getMap("devicePorts"); |
123 | + final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader | ||
124 | + = new OptionalCacheLoader<>(storeService, rawDevicePorts); | ||
119 | devicePorts = new AbsentInvalidatingLoadingCache<>( | 125 | devicePorts = new AbsentInvalidatingLoadingCache<>( |
120 | CacheBuilder.newBuilder() | 126 | CacheBuilder.newBuilder() |
121 | - .build(new OptionalCacheLoader<DeviceId, Map<PortNumber, Port>>(rawDevicePorts))); | 127 | + .build(devicePortLoader)); |
122 | // refresh/populate cache based on notification from other instance | 128 | // refresh/populate cache based on notification from other instance |
123 | rawDevicePorts.addEntryListener( | 129 | rawDevicePorts.addEntryListener( |
124 | new RemoteEventHandler<>(devicePorts), | 130 | new RemoteEventHandler<>(devicePorts), |
... | @@ -439,7 +445,7 @@ public class DistributedDeviceStore implements DeviceStore { | ... | @@ -439,7 +445,7 @@ public class DistributedDeviceStore implements DeviceStore { |
439 | 445 | ||
440 | @Override | 446 | @Override |
441 | public void entryRemoved(EntryEvent<byte[], byte[]> event) { | 447 | public void entryRemoved(EntryEvent<byte[], byte[]> event) { |
442 | - cache.invalidate(storeService.<DeviceId>deserialize(event.getKey())); | 448 | + cache.invalidate(storeService.<K>deserialize(event.getKey())); |
443 | } | 449 | } |
444 | 450 | ||
445 | @Override | 451 | @Override |
... | @@ -447,37 +453,4 @@ public class DistributedDeviceStore implements DeviceStore { | ... | @@ -447,37 +453,4 @@ public class DistributedDeviceStore implements DeviceStore { |
447 | entryUpdated(event); | 453 | entryUpdated(event); |
448 | } | 454 | } |
449 | } | 455 | } |
450 | - | ||
451 | - /** | ||
452 | - * CacheLoader to wrap Map value with Optional, | ||
453 | - * to handle negative hit on underlying IMap. | ||
454 | - * | ||
455 | - * @param <K> IMap key type after deserialization | ||
456 | - * @param <V> IMap value type after deserialization | ||
457 | - */ | ||
458 | - public final class OptionalCacheLoader<K, V> extends | ||
459 | - CacheLoader<K, Optional<V>> { | ||
460 | - | ||
461 | - private IMap<byte[], byte[]> rawMap; | ||
462 | - | ||
463 | - /** | ||
464 | - * Constructor. | ||
465 | - * | ||
466 | - * @param rawMap underlying IMap | ||
467 | - */ | ||
468 | - public OptionalCacheLoader(IMap<byte[], byte[]> rawMap) { | ||
469 | - this.rawMap = checkNotNull(rawMap); | ||
470 | - } | ||
471 | - | ||
472 | - @Override | ||
473 | - public Optional<V> load(K key) throws Exception { | ||
474 | - byte[] keyBytes = storeService.serialize(key); | ||
475 | - byte[] valBytes = rawMap.get(keyBytes); | ||
476 | - if (valBytes == null) { | ||
477 | - return Optional.absent(); | ||
478 | - } | ||
479 | - V dev = deserialize(valBytes); | ||
480 | - return Optional.of(dev); | ||
481 | - } | ||
482 | - } | ||
483 | } | 456 | } | ... | ... |
... | @@ -7,9 +7,24 @@ import com.google.common.base.Optional; | ... | @@ -7,9 +7,24 @@ import com.google.common.base.Optional; |
7 | import com.google.common.cache.ForwardingLoadingCache.SimpleForwardingLoadingCache; | 7 | import com.google.common.cache.ForwardingLoadingCache.SimpleForwardingLoadingCache; |
8 | import com.google.common.cache.LoadingCache; | 8 | import com.google.common.cache.LoadingCache; |
9 | 9 | ||
10 | +/** | ||
11 | + * Wrapper around LoadingCache to handle negative hit scenario. | ||
12 | + * <p> | ||
13 | + * When the LoadingCache returned Absent, | ||
14 | + * this implementation will invalidate the entry immediately to avoid | ||
15 | + * caching negative hits. | ||
16 | + * | ||
17 | + * @param <K> Cache key type | ||
18 | + * @param <V> Cache value type. (Optional{@literal <V>}) | ||
19 | + */ | ||
10 | public class AbsentInvalidatingLoadingCache<K, V> extends | 20 | public class AbsentInvalidatingLoadingCache<K, V> extends |
11 | SimpleForwardingLoadingCache<K, Optional<V>> { | 21 | SimpleForwardingLoadingCache<K, Optional<V>> { |
12 | 22 | ||
23 | + /** | ||
24 | + * Constructor. | ||
25 | + * | ||
26 | + * @param delegate actual {@link LoadingCache} to delegate loading. | ||
27 | + */ | ||
13 | public AbsentInvalidatingLoadingCache(LoadingCache<K, Optional<V>> delegate) { | 28 | public AbsentInvalidatingLoadingCache(LoadingCache<K, Optional<V>> delegate) { |
14 | super(delegate); | 29 | super(delegate); |
15 | } | 30 | } | ... | ... |
1 | +package org.onlab.onos.store.impl; | ||
2 | + | ||
3 | +import static com.google.common.base.Preconditions.checkNotNull; | ||
4 | + | ||
5 | +import org.onlab.onos.store.StoreService; | ||
6 | + | ||
7 | +import com.google.common.base.Optional; | ||
8 | +import com.google.common.cache.CacheLoader; | ||
9 | +import com.hazelcast.core.IMap; | ||
10 | + | ||
11 | +/** | ||
12 | + * CacheLoader to wrap Map value with Optional, | ||
13 | + * to handle negative hit on underlying IMap. | ||
14 | + * | ||
15 | + * @param <K> IMap key type after deserialization | ||
16 | + * @param <V> IMap value type after deserialization | ||
17 | + */ | ||
18 | +public final class OptionalCacheLoader<K, V> extends | ||
19 | + CacheLoader<K, Optional<V>> { | ||
20 | + | ||
21 | + private final StoreService storeService; | ||
22 | + private IMap<byte[], byte[]> rawMap; | ||
23 | + | ||
24 | + /** | ||
25 | + * Constructor. | ||
26 | + * | ||
27 | + * @param storeService to use for serialization | ||
28 | + * @param rawMap underlying IMap | ||
29 | + */ | ||
30 | + public OptionalCacheLoader(StoreService storeService, IMap<byte[], byte[]> rawMap) { | ||
31 | + this.storeService = checkNotNull(storeService); | ||
32 | + this.rawMap = checkNotNull(rawMap); | ||
33 | + } | ||
34 | + | ||
35 | + @Override | ||
36 | + public Optional<V> load(K key) throws Exception { | ||
37 | + byte[] keyBytes = storeService.serialize(key); | ||
38 | + byte[] valBytes = rawMap.get(keyBytes); | ||
39 | + if (valBytes == null) { | ||
40 | + return Optional.absent(); | ||
41 | + } | ||
42 | + V dev = storeService.deserialize(valBytes); | ||
43 | + return Optional.of(dev); | ||
44 | + } | ||
45 | +} |
1 | /** | 1 | /** |
2 | * Various Kryo serializers for use in distributed stores. | 2 | * Various Kryo serializers for use in distributed stores. |
3 | */ | 3 | */ |
4 | -package org.onlab.onos.store.serializers; | ||
... | \ No newline at end of file | ... | \ No newline at end of file |
4 | +package org.onlab.onos.store.serializers; | ... | ... |
-
Please register or login to post a comment