Yuta HIGUCHI

initial DistributedDeviceStore

Change-Id: I8730f5c8f7706dafb245ee73d989e7a562d92187
...@@ -33,6 +33,15 @@ ...@@ -33,6 +33,15 @@
33 <groupId>org.apache.felix</groupId> 33 <groupId>org.apache.felix</groupId>
34 <artifactId>org.apache.felix.scr.annotations</artifactId> 34 <artifactId>org.apache.felix.scr.annotations</artifactId>
35 </dependency> 35 </dependency>
36 +
37 + <!-- TODO Consider removing store dependency.
38 + Currently required for DistributedDeviceManagerTest. -->
39 + <dependency>
40 + <groupId>org.onlab.onos</groupId>
41 + <artifactId>onos-core-store</artifactId>
42 + <version>${project.version}</version>
43 + <scope>test</scope>
44 + </dependency>
36 </dependencies> 45 </dependencies>
37 46
38 <build> 47 <build>
......
1 +package org.onlab.onos.net.device.impl;
2 +
3 +import org.junit.After;
4 +import org.junit.Before;
5 +import org.junit.Ignore;
6 +import org.junit.Test;
7 +import org.onlab.onos.event.Event;
8 +import org.onlab.onos.net.Device;
9 +import org.onlab.onos.net.DeviceId;
10 +import org.onlab.onos.net.MastershipRole;
11 +import org.onlab.onos.net.Port;
12 +import org.onlab.onos.net.PortNumber;
13 +import org.onlab.onos.net.device.DefaultDeviceDescription;
14 +import org.onlab.onos.net.device.DefaultPortDescription;
15 +import org.onlab.onos.net.device.DeviceAdminService;
16 +import org.onlab.onos.net.device.DeviceDescription;
17 +import org.onlab.onos.net.device.DeviceEvent;
18 +import org.onlab.onos.net.device.DeviceListener;
19 +import org.onlab.onos.net.device.DeviceProvider;
20 +import org.onlab.onos.net.device.DeviceProviderRegistry;
21 +import org.onlab.onos.net.device.DeviceProviderService;
22 +import org.onlab.onos.net.device.DeviceService;
23 +import org.onlab.onos.net.device.PortDescription;
24 +import org.onlab.onos.net.provider.AbstractProvider;
25 +import org.onlab.onos.net.provider.ProviderId;
26 +import org.onlab.onos.event.impl.TestEventDispatcher;
27 +import org.onlab.onos.store.device.impl.DistributedDeviceStore;
28 +
29 +import com.google.common.collect.Iterables;
30 +import com.hazelcast.config.Config;
31 +import com.hazelcast.core.Hazelcast;
32 +
33 +import java.util.ArrayList;
34 +import java.util.Iterator;
35 +import java.util.List;
36 +import java.util.UUID;
37 +
38 +import static org.junit.Assert.*;
39 +import static org.onlab.onos.net.Device.Type.SWITCH;
40 +import static org.onlab.onos.net.DeviceId.deviceId;
41 +import static org.onlab.onos.net.device.DeviceEvent.Type.*;
42 +
43 +// FIXME This test is painfully slow starting up Hazelcast on each test cases,
44 +// turning it off in repository for now.
45 +// FIXME DistributedDeviceStore should have it's own test cases.
46 +/**
47 + * Test codifying the device service & device provider service contracts.
48 + */
49 +@Ignore
50 +public class DistributedDeviceManagerTest {
51 +
52 + private static final ProviderId PID = new ProviderId("of", "foo");
53 + private static final DeviceId DID1 = deviceId("of:foo");
54 + private static final DeviceId DID2 = deviceId("of:bar");
55 + private static final String MFR = "whitebox";
56 + private static final String HW = "1.1.x";
57 + private static final String SW1 = "3.8.1";
58 + private static final String SW2 = "3.9.5";
59 + private static final String SN = "43311-12345";
60 +
61 + private static final PortNumber P1 = PortNumber.portNumber(1);
62 + private static final PortNumber P2 = PortNumber.portNumber(2);
63 + private static final PortNumber P3 = PortNumber.portNumber(3);
64 +
65 + private DeviceManager mgr;
66 +
67 + protected DeviceService service;
68 + protected DeviceAdminService admin;
69 + protected DeviceProviderRegistry registry;
70 + protected DeviceProviderService providerService;
71 + protected TestProvider provider;
72 + protected TestListener listener = new TestListener();
73 + private DistributedDeviceStore dstore;
74 +
75 + @Before
76 + public void setUp() {
77 + mgr = new DeviceManager();
78 + service = mgr;
79 + admin = mgr;
80 + registry = mgr;
81 + dstore = new DistributedDeviceStore();
82 + // FIXME should be reading the hazelcast.xml
83 + Config config = new Config();
84 + // avoid accidentally joining other cluster
85 + config.getGroupConfig().setName(UUID.randomUUID().toString());
86 + // quickly form single node cluster
87 + config.getNetworkConfig().getJoin().getMulticastConfig()
88 + .setMulticastTimeoutSeconds(0);
89 + dstore.theInstance = Hazelcast.newHazelcastInstance(config);
90 + dstore.activate();
91 + mgr.store = dstore;
92 + mgr.eventDispatcher = new TestEventDispatcher();
93 + mgr.activate();
94 +
95 + service.addListener(listener);
96 +
97 + provider = new TestProvider();
98 + providerService = registry.register(provider);
99 + assertTrue("provider should be registered",
100 + registry.getProviders().contains(provider.id()));
101 + }
102 +
103 + @After
104 + public void tearDown() {
105 + registry.unregister(provider);
106 + assertFalse("provider should not be registered",
107 + registry.getProviders().contains(provider.id()));
108 + service.removeListener(listener);
109 + mgr.deactivate();
110 +
111 + dstore.deactivate();
112 + dstore.theInstance.shutdown();
113 + }
114 +
115 + private void connectDevice(DeviceId deviceId, String swVersion) {
116 + DeviceDescription description =
117 + new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
118 + HW, swVersion, SN);
119 + providerService.deviceConnected(deviceId, description);
120 + assertNotNull("device should be found", service.getDevice(DID1));
121 + }
122 +
123 + @Test
124 + public void deviceConnected() {
125 + assertNull("device should not be found", service.getDevice(DID1));
126 + connectDevice(DID1, SW1);
127 + validateEvents(DEVICE_ADDED);
128 +
129 + assertEquals("only one device expected", 1, Iterables.size(service.getDevices()));
130 + Iterator<Device> it = service.getDevices().iterator();
131 + assertNotNull("one device expected", it.next());
132 + assertFalse("only one device expected", it.hasNext());
133 +
134 + assertEquals("incorrect device count", 1, service.getDeviceCount());
135 + assertTrue("device should be available", service.isAvailable(DID1));
136 + }
137 +
138 + @Test
139 + public void deviceDisconnected() {
140 + connectDevice(DID1, SW1);
141 + connectDevice(DID2, SW1);
142 + validateEvents(DEVICE_ADDED, DEVICE_ADDED);
143 + assertTrue("device should be available", service.isAvailable(DID1));
144 +
145 + // Disconnect
146 + providerService.deviceDisconnected(DID1);
147 + assertNotNull("device should not be found", service.getDevice(DID1));
148 + assertFalse("device should not be available", service.isAvailable(DID1));
149 + validateEvents(DEVICE_AVAILABILITY_CHANGED);
150 +
151 + // Reconnect
152 + connectDevice(DID1, SW1);
153 + validateEvents(DEVICE_AVAILABILITY_CHANGED);
154 +
155 + assertEquals("incorrect device count", 2, service.getDeviceCount());
156 + }
157 +
158 + @Test
159 + public void deviceUpdated() {
160 + connectDevice(DID1, SW1);
161 + validateEvents(DEVICE_ADDED);
162 +
163 + connectDevice(DID1, SW2);
164 + validateEvents(DEVICE_UPDATED);
165 + }
166 +
167 + @Test
168 + public void getRole() {
169 + connectDevice(DID1, SW1);
170 + assertEquals("incorrect role", MastershipRole.MASTER, service.getRole(DID1));
171 + }
172 +
173 + @Test
174 + public void setRole() throws InterruptedException {
175 + connectDevice(DID1, SW1);
176 + admin.setRole(DID1, MastershipRole.STANDBY);
177 + validateEvents(DEVICE_ADDED, DEVICE_MASTERSHIP_CHANGED);
178 + assertEquals("incorrect role", MastershipRole.STANDBY, service.getRole(DID1));
179 + assertEquals("incorrect device", DID1, provider.deviceReceived.id());
180 + assertEquals("incorrect role", MastershipRole.STANDBY, provider.roleReceived);
181 + }
182 +
183 + @Test
184 + public void updatePorts() {
185 + connectDevice(DID1, SW1);
186 + List<PortDescription> pds = new ArrayList<>();
187 + pds.add(new DefaultPortDescription(P1, true));
188 + pds.add(new DefaultPortDescription(P2, true));
189 + pds.add(new DefaultPortDescription(P3, true));
190 + providerService.updatePorts(DID1, pds);
191 + validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
192 + pds.clear();
193 +
194 + pds.add(new DefaultPortDescription(P1, false));
195 + pds.add(new DefaultPortDescription(P3, true));
196 + providerService.updatePorts(DID1, pds);
197 + validateEvents(PORT_UPDATED, PORT_REMOVED);
198 + }
199 +
200 + @Test
201 + public void updatePortStatus() {
202 + connectDevice(DID1, SW1);
203 + List<PortDescription> pds = new ArrayList<>();
204 + pds.add(new DefaultPortDescription(P1, true));
205 + pds.add(new DefaultPortDescription(P2, true));
206 + providerService.updatePorts(DID1, pds);
207 + validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
208 +
209 + providerService.portStatusChanged(DID1, new DefaultPortDescription(P1, false));
210 + validateEvents(PORT_UPDATED);
211 + providerService.portStatusChanged(DID1, new DefaultPortDescription(P1, false));
212 + assertTrue("no events expected", listener.events.isEmpty());
213 + }
214 +
215 + @Test
216 + public void getPorts() {
217 + connectDevice(DID1, SW1);
218 + List<PortDescription> pds = new ArrayList<>();
219 + pds.add(new DefaultPortDescription(P1, true));
220 + pds.add(new DefaultPortDescription(P2, true));
221 + providerService.updatePorts(DID1, pds);
222 + validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
223 + assertEquals("wrong port count", 2, service.getPorts(DID1).size());
224 +
225 + Port port = service.getPort(DID1, P1);
226 + assertEquals("incorrect port", P1, service.getPort(DID1, P1).number());
227 + assertEquals("incorrect state", true, service.getPort(DID1, P1).isEnabled());
228 + }
229 +
230 + @Test
231 + public void removeDevice() {
232 + connectDevice(DID1, SW1);
233 + connectDevice(DID2, SW2);
234 + assertEquals("incorrect device count", 2, service.getDeviceCount());
235 + admin.removeDevice(DID1);
236 + assertNull("device should not be found", service.getDevice(DID1));
237 + assertNotNull("device should be found", service.getDevice(DID2));
238 + assertEquals("incorrect device count", 1, service.getDeviceCount());
239 +
240 + }
241 +
242 + protected void validateEvents(Enum... types) {
243 + int i = 0;
244 + assertEquals("wrong events received", types.length, listener.events.size());
245 + for (Event event : listener.events) {
246 + assertEquals("incorrect event type", types[i], event.type());
247 + i++;
248 + }
249 + listener.events.clear();
250 + }
251 +
252 +
253 + private class TestProvider extends AbstractProvider implements DeviceProvider {
254 + private Device deviceReceived;
255 + private MastershipRole roleReceived;
256 +
257 + public TestProvider() {
258 + super(PID);
259 + }
260 +
261 + @Override
262 + public void triggerProbe(Device device) {
263 + }
264 +
265 + @Override
266 + public void roleChanged(Device device, MastershipRole newRole) {
267 + deviceReceived = device;
268 + roleReceived = newRole;
269 + }
270 + }
271 +
272 + private static class TestListener implements DeviceListener {
273 + final List<DeviceEvent> events = new ArrayList<>();
274 +
275 + @Override
276 + public void event(DeviceEvent event) {
277 + events.add(event);
278 + }
279 + }
280 +
281 +}
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import java.util.concurrent.Callable;
4 +import java.util.concurrent.ExecutionException;
5 +
6 +import com.google.common.base.Optional;
7 +import com.google.common.cache.ForwardingLoadingCache.SimpleForwardingLoadingCache;
8 +import com.google.common.cache.LoadingCache;
9 +
10 +public class AbsentInvalidatingLoadingCache<K, V> extends
11 + SimpleForwardingLoadingCache<K, Optional<V>> {
12 +
13 + public AbsentInvalidatingLoadingCache(LoadingCache<K, Optional<V>> delegate) {
14 + super(delegate);
15 + }
16 +
17 + @Override
18 + public Optional<V> get(K key) throws ExecutionException {
19 + Optional<V> v = super.get(key);
20 + if (!v.isPresent()) {
21 + invalidate(key);
22 + }
23 + return v;
24 + }
25 +
26 + @Override
27 + public Optional<V> getUnchecked(K key) {
28 + Optional<V> v = super.getUnchecked(key);
29 + if (!v.isPresent()) {
30 + invalidate(key);
31 + }
32 + return v;
33 + }
34 +
35 + @Override
36 + public Optional<V> apply(K key) {
37 + return getUnchecked(key);
38 + }
39 +
40 + @Override
41 + public Optional<V> getIfPresent(Object key) {
42 + Optional<V> v = super.getIfPresent(key);
43 + if (!v.isPresent()) {
44 + invalidate(key);
45 + }
46 + return v;
47 + }
48 +
49 + @Override
50 + public Optional<V> get(K key, Callable<? extends Optional<V>> valueLoader)
51 + throws ExecutionException {
52 +
53 + Optional<V> v = super.get(key, valueLoader);
54 + if (!v.isPresent()) {
55 + invalidate(key);
56 + }
57 + return v;
58 + }
59 +
60 + // TODO should we be also checking getAll, etc.
61 +}
1 package org.onlab.onos.store.device.impl; 1 package org.onlab.onos.store.device.impl;
2 2
3 -import com.google.common.collect.ImmutableList; 3 +import static com.google.common.base.Preconditions.checkArgument;
4 +import static com.google.common.base.Preconditions.checkNotNull;
5 +import static org.onlab.onos.net.device.DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED;
6 +import static org.onlab.onos.net.device.DeviceEvent.Type.DEVICE_MASTERSHIP_CHANGED;
7 +import static org.onlab.onos.net.device.DeviceEvent.Type.DEVICE_REMOVED;
8 +import static org.onlab.onos.net.device.DeviceEvent.Type.PORT_ADDED;
9 +import static org.onlab.onos.net.device.DeviceEvent.Type.PORT_REMOVED;
10 +import static org.onlab.onos.net.device.DeviceEvent.Type.PORT_UPDATED;
11 +import static org.slf4j.LoggerFactory.getLogger;
12 +
13 +import java.net.URI;
14 +import java.util.ArrayList;
15 +import java.util.Collections;
16 +import java.util.HashMap;
17 +import java.util.HashSet;
18 +import java.util.Iterator;
19 +import java.util.List;
20 +import java.util.Map;
21 +import java.util.Objects;
22 +import java.util.Set;
23 +
4 import org.apache.felix.scr.annotations.Activate; 24 import org.apache.felix.scr.annotations.Activate;
5 import org.apache.felix.scr.annotations.Component; 25 import org.apache.felix.scr.annotations.Component;
6 import org.apache.felix.scr.annotations.Deactivate; 26 import org.apache.felix.scr.annotations.Deactivate;
27 +import org.apache.felix.scr.annotations.Reference;
28 +import org.apache.felix.scr.annotations.ReferenceCardinality;
7 import org.apache.felix.scr.annotations.Service; 29 import org.apache.felix.scr.annotations.Service;
8 import org.onlab.onos.net.DefaultDevice; 30 import org.onlab.onos.net.DefaultDevice;
9 import org.onlab.onos.net.DefaultPort; 31 import org.onlab.onos.net.DefaultPort;
10 import org.onlab.onos.net.Device; 32 import org.onlab.onos.net.Device;
11 import org.onlab.onos.net.DeviceId; 33 import org.onlab.onos.net.DeviceId;
34 +import org.onlab.onos.net.Element;
12 import org.onlab.onos.net.MastershipRole; 35 import org.onlab.onos.net.MastershipRole;
13 import org.onlab.onos.net.Port; 36 import org.onlab.onos.net.Port;
14 import org.onlab.onos.net.PortNumber; 37 import org.onlab.onos.net.PortNumber;
...@@ -17,22 +40,25 @@ import org.onlab.onos.net.device.DeviceEvent; ...@@ -17,22 +40,25 @@ import org.onlab.onos.net.device.DeviceEvent;
17 import org.onlab.onos.net.device.DeviceStore; 40 import org.onlab.onos.net.device.DeviceStore;
18 import org.onlab.onos.net.device.PortDescription; 41 import org.onlab.onos.net.device.PortDescription;
19 import org.onlab.onos.net.provider.ProviderId; 42 import org.onlab.onos.net.provider.ProviderId;
43 +import org.onlab.util.KryoPool;
20 import org.slf4j.Logger; 44 import org.slf4j.Logger;
21 45
22 -import java.util.ArrayList; 46 +import com.google.common.base.Optional;
23 -import java.util.Collections; 47 +import com.google.common.cache.CacheBuilder;
24 -import java.util.HashMap; 48 +import com.google.common.cache.CacheLoader;
25 -import java.util.HashSet; 49 +import com.google.common.cache.LoadingCache;
26 -import java.util.Iterator; 50 +import com.google.common.collect.ImmutableList;
27 -import java.util.List; 51 +import com.google.common.collect.ImmutableSet;
28 -import java.util.Map; 52 +import com.google.common.collect.ImmutableSet.Builder;
29 -import java.util.Objects; 53 +import com.hazelcast.core.EntryAdapter;
30 -import java.util.Set; 54 +import com.hazelcast.core.EntryEvent;
31 -import java.util.concurrent.ConcurrentHashMap; 55 +import com.hazelcast.core.HazelcastInstance;
56 +import com.hazelcast.core.IMap;
57 +import com.hazelcast.core.ISet;
58 +import com.hazelcast.core.MapEvent;
59 +
60 +import de.javakaffee.kryoserializers.URISerializer;
32 61
33 -import static com.google.common.base.Preconditions.checkArgument;
34 -import static org.onlab.onos.net.device.DeviceEvent.Type.*;
35 -import static org.slf4j.LoggerFactory.getLogger;
36 62
37 /** 63 /**
38 * Manages inventory of infrastructure devices using Hazelcast-backed map. 64 * Manages inventory of infrastructure devices using Hazelcast-backed map.
...@@ -41,18 +67,167 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -41,18 +67,167 @@ import static org.slf4j.LoggerFactory.getLogger;
41 @Service 67 @Service
42 public class DistributedDeviceStore implements DeviceStore { 68 public class DistributedDeviceStore implements DeviceStore {
43 69
70 + /**
71 + * An IMap EntryListener, which reflects each remote event to cache.
72 + *
73 + * @param <K> IMap key type after deserialization
74 + * @param <V> IMap value type after deserialization
75 + */
76 + public static final class RemoteEventHandler<K, V> extends
77 + EntryAdapter<byte[], byte[]> {
78 +
79 + private LoadingCache<K, Optional<V>> cache;
80 +
81 + /**
82 + * Constructor.
83 + *
84 + * @param cache cache to update
85 + */
86 + public RemoteEventHandler(
87 + LoadingCache<K, Optional<V>> cache) {
88 + this.cache = checkNotNull(cache);
89 + }
90 +
91 + @Override
92 + public void mapCleared(MapEvent event) {
93 + cache.invalidateAll();
94 + }
95 +
96 + @Override
97 + public void entryUpdated(EntryEvent<byte[], byte[]> event) {
98 + cache.put(POOL.<K>deserialize(event.getKey()),
99 + Optional.of(POOL.<V>deserialize(
100 + event.getValue())));
101 + }
102 +
103 + @Override
104 + public void entryRemoved(EntryEvent<byte[], byte[]> event) {
105 + cache.invalidate(POOL.<DeviceId>deserialize(event.getKey()));
106 + }
107 +
108 + @Override
109 + public void entryAdded(EntryEvent<byte[], byte[]> event) {
110 + entryUpdated(event);
111 + }
112 + }
113 +
114 + /**
115 + * CacheLoader to wrap Map value with Optional,
116 + * to handle negative hit on underlying IMap.
117 + *
118 + * @param <K> IMap key type after deserialization
119 + * @param <V> IMap value type after deserialization
120 + */
121 + public static final class OptionalCacheLoader<K, V> extends
122 + CacheLoader<K, Optional<V>> {
123 +
124 + private IMap<byte[], byte[]> rawMap;
125 +
126 + /**
127 + * Constructor.
128 + *
129 + * @param rawMap underlying IMap
130 + */
131 + public OptionalCacheLoader(IMap<byte[], byte[]> rawMap) {
132 + this.rawMap = checkNotNull(rawMap);
133 + }
134 +
135 + @Override
136 + public Optional<V> load(K key) throws Exception {
137 + byte[] keyBytes = serialize(key);
138 + byte[] valBytes = rawMap.get(keyBytes);
139 + if (valBytes == null) {
140 + return Optional.absent();
141 + }
142 + V dev = deserialize(valBytes);
143 + return Optional.of(dev);
144 + }
145 + }
146 +
44 private final Logger log = getLogger(getClass()); 147 private final Logger log = getLogger(getClass());
45 148
46 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; 149 public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
47 150
48 - private final Map<DeviceId, DefaultDevice> devices = new ConcurrentHashMap<>(); 151 + // FIXME Slice out types used in common to separate pool/namespace.
49 - private final Map<DeviceId, MastershipRole> roles = new ConcurrentHashMap<>(); 152 + private static final KryoPool POOL = KryoPool.newBuilder()
50 - private final Set<DeviceId> availableDevices = new HashSet<>(); 153 + .register(URI.class, new URISerializer())
51 - private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = new HashMap<>(); 154 + .register(
155 + ArrayList.class,
156 +
157 + ProviderId.class,
158 + Device.Type.class,
159 +
160 + DeviceId.class,
161 + DefaultDevice.class,
162 + MastershipRole.class,
163 + HashMap.class,
164 + Port.class,
165 + Element.class
166 + )
167 + .register(PortNumber.class, new PortNumberSerializer())
168 + .register(DefaultPort.class, new DefaultPortSerializer())
169 + .build()
170 + .populate(10);
171 +
172 + // private IMap<DeviceId, DefaultDevice> cache;
173 + private IMap<byte[], byte[]> rawDevices;
174 + private LoadingCache<DeviceId, Optional<DefaultDevice>> devices;
175 +
176 + // private IMap<DeviceId, MastershipRole> roles;
177 + private IMap<byte[], byte[]> rawRoles;
178 + private LoadingCache<DeviceId, Optional<MastershipRole>> roles;
179 +
180 + // private ISet<DeviceId> availableDevices;
181 + private ISet<byte[]> availableDevices;
182 +
183 + // TODO DevicePorts is very inefficient consider restructuring.
184 + // private IMap<DeviceId, Map<PortNumber, Port>> devicePorts;
185 + private IMap<byte[], byte[]> rawDevicePorts;
186 + private LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> devicePorts;
187 +
188 + // FIXME change to protected once we remove DistributedDeviceManagerTest.
189 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
190 + /*protected*/public HazelcastInstance theInstance;
191 +
52 192
53 @Activate 193 @Activate
54 public void activate() { 194 public void activate() {
55 log.info("Started"); 195 log.info("Started");
196 +
197 + // IMap event handler needs value
198 + final boolean includeValue = true;
199 +
200 + // TODO decide on Map name scheme to avoid collision
201 + rawDevices = theInstance.getMap("devices");
202 + devices = new AbsentInvalidatingLoadingCache<DeviceId, DefaultDevice>(
203 + CacheBuilder.newBuilder()
204 + .build(new OptionalCacheLoader<DeviceId, DefaultDevice>(rawDevices)));
205 + // refresh/populate cache based on notification from other instance
206 + rawDevices.addEntryListener(
207 + new RemoteEventHandler<DeviceId, DefaultDevice>(devices),
208 + includeValue);
209 +
210 + rawRoles = theInstance.getMap("roles");
211 + roles = new AbsentInvalidatingLoadingCache<DeviceId, MastershipRole>(
212 + CacheBuilder.newBuilder()
213 + .build(new OptionalCacheLoader<DeviceId, MastershipRole>(rawRoles)));
214 + // refresh/populate cache based on notification from other instance
215 + rawRoles.addEntryListener(
216 + new RemoteEventHandler<DeviceId, MastershipRole>(roles),
217 + includeValue);
218 +
219 + // TODO cache avai
220 + availableDevices = theInstance.getSet("availableDevices");
221 +
222 + rawDevicePorts = theInstance.getMap("devicePorts");
223 + devicePorts = new AbsentInvalidatingLoadingCache<DeviceId, Map<PortNumber, Port>>(
224 + CacheBuilder.newBuilder()
225 + .build(new OptionalCacheLoader<DeviceId, Map<PortNumber, Port>>(rawDevicePorts)));
226 + // refresh/populate cache based on notification from other instance
227 + rawDevicePorts.addEntryListener(
228 + new RemoteEventHandler<DeviceId, Map<PortNumber, Port>>(devicePorts),
229 + includeValue);
230 +
56 } 231 }
57 232
58 @Deactivate 233 @Deactivate
...@@ -62,23 +237,42 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -62,23 +237,42 @@ public class DistributedDeviceStore implements DeviceStore {
62 237
63 @Override 238 @Override
64 public int getDeviceCount() { 239 public int getDeviceCount() {
65 - return devices.size(); 240 + // TODO IMap size or cache size?
241 + return rawDevices.size();
66 } 242 }
67 243
68 @Override 244 @Override
69 public Iterable<Device> getDevices() { 245 public Iterable<Device> getDevices() {
70 - return Collections.unmodifiableSet(new HashSet<Device>(devices.values())); 246 +// TODO Revisit if we ever need to do this.
247 +// log.info("{}:{}", rawMap.size(), cache.size());
248 +// if (rawMap.size() != cache.size()) {
249 +// for (Entry<byte[], byte[]> e : rawMap.entrySet()) {
250 +// final DeviceId key = deserialize(e.getKey());
251 +// final DefaultDevice val = deserialize(e.getValue());
252 +// cache.put(key, val);
253 +// }
254 +// }
255 +
256 + // TODO builder v.s. copyOf. Guava semms to be using copyOf?
257 + Builder<Device> builder = ImmutableSet.<Device>builder();
258 + for (Optional<DefaultDevice> e : devices.asMap().values()) {
259 + if (e.isPresent()) {
260 + builder.add(e.get());
261 + }
262 + }
263 + return builder.build();
71 } 264 }
72 265
73 @Override 266 @Override
74 public Device getDevice(DeviceId deviceId) { 267 public Device getDevice(DeviceId deviceId) {
75 - return devices.get(deviceId); 268 + // TODO revisit if ignoring exception is safe.
269 + return devices.getUnchecked(deviceId).orNull();
76 } 270 }
77 271
78 @Override 272 @Override
79 public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, 273 public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
80 DeviceDescription deviceDescription) { 274 DeviceDescription deviceDescription) {
81 - DefaultDevice device = devices.get(deviceId); 275 + DefaultDevice device = devices.getUnchecked(deviceId).orNull();
82 if (device == null) { 276 if (device == null) {
83 return createDevice(providerId, deviceId, deviceDescription); 277 return createDevice(providerId, deviceId, deviceDescription);
84 } 278 }
...@@ -92,12 +286,17 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -92,12 +286,17 @@ public class DistributedDeviceStore implements DeviceStore {
92 desc.manufacturer(), 286 desc.manufacturer(),
93 desc.hwVersion(), desc.swVersion(), 287 desc.hwVersion(), desc.swVersion(),
94 desc.serialNumber()); 288 desc.serialNumber());
289 +
95 synchronized (this) { 290 synchronized (this) {
96 - devices.put(deviceId, device); 291 + final byte[] deviceIdBytes = serialize(deviceId);
97 - availableDevices.add(deviceId); 292 + rawDevices.put(deviceIdBytes, serialize(device));
293 + devices.put(deviceId, Optional.of(device));
294 +
295 + availableDevices.add(deviceIdBytes);
98 296
99 // For now claim the device as a master automatically. 297 // For now claim the device as a master automatically.
100 - roles.put(deviceId, MastershipRole.MASTER); 298 + rawRoles.put(deviceIdBytes, serialize(MastershipRole.MASTER));
299 + roles.put(deviceId, Optional.of(MastershipRole.MASTER));
101 } 300 }
102 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null); 301 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null);
103 } 302 }
...@@ -108,6 +307,7 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -108,6 +307,7 @@ public class DistributedDeviceStore implements DeviceStore {
108 // We allow only certain attributes to trigger update 307 // We allow only certain attributes to trigger update
109 if (!Objects.equals(device.hwVersion(), desc.hwVersion()) || 308 if (!Objects.equals(device.hwVersion(), desc.hwVersion()) ||
110 !Objects.equals(device.swVersion(), desc.swVersion())) { 309 !Objects.equals(device.swVersion(), desc.swVersion())) {
310 +
111 DefaultDevice updated = new DefaultDevice(providerId, device.id(), 311 DefaultDevice updated = new DefaultDevice(providerId, device.id(),
112 desc.type(), 312 desc.type(),
113 desc.manufacturer(), 313 desc.manufacturer(),
...@@ -115,15 +315,15 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -115,15 +315,15 @@ public class DistributedDeviceStore implements DeviceStore {
115 desc.swVersion(), 315 desc.swVersion(),
116 desc.serialNumber()); 316 desc.serialNumber());
117 synchronized (this) { 317 synchronized (this) {
118 - devices.put(device.id(), updated); 318 + devices.put(device.id(), Optional.of(updated));
119 - availableDevices.add(device.id()); 319 + availableDevices.add(serialize(device.id()));
120 } 320 }
121 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, device, null); 321 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, device, null);
122 } 322 }
123 323
124 // Otherwise merely attempt to change availability 324 // Otherwise merely attempt to change availability
125 synchronized (this) { 325 synchronized (this) {
126 - boolean added = availableDevices.add(device.id()); 326 + boolean added = availableDevices.add(serialize(device.id()));
127 return !added ? null : 327 return !added ? null :
128 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); 328 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
129 } 329 }
...@@ -132,8 +332,8 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -132,8 +332,8 @@ public class DistributedDeviceStore implements DeviceStore {
132 @Override 332 @Override
133 public DeviceEvent markOffline(DeviceId deviceId) { 333 public DeviceEvent markOffline(DeviceId deviceId) {
134 synchronized (this) { 334 synchronized (this) {
135 - Device device = devices.get(deviceId); 335 + Device device = devices.getUnchecked(deviceId).orNull();
136 - boolean removed = device != null && availableDevices.remove(deviceId); 336 + boolean removed = device != null && availableDevices.remove(serialize(deviceId));
137 return !removed ? null : 337 return !removed ? null :
138 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null); 338 new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
139 } 339 }
...@@ -144,7 +344,7 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -144,7 +344,7 @@ public class DistributedDeviceStore implements DeviceStore {
144 List<PortDescription> portDescriptions) { 344 List<PortDescription> portDescriptions) {
145 List<DeviceEvent> events = new ArrayList<>(); 345 List<DeviceEvent> events = new ArrayList<>();
146 synchronized (this) { 346 synchronized (this) {
147 - Device device = devices.get(deviceId); 347 + Device device = devices.getUnchecked(deviceId).orNull();
148 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); 348 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
149 Map<PortNumber, Port> ports = getPortMap(deviceId); 349 Map<PortNumber, Port> ports = getPortMap(deviceId);
150 350
...@@ -158,6 +358,8 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -158,6 +358,8 @@ public class DistributedDeviceStore implements DeviceStore {
158 processed.add(portDescription.portNumber()); 358 processed.add(portDescription.portNumber());
159 } 359 }
160 360
361 + updatePortMap(deviceId, ports);
362 +
161 events.addAll(pruneOldPorts(device, ports, processed)); 363 events.addAll(pruneOldPorts(device, ports, processed));
162 } 364 }
163 return events; 365 return events;
...@@ -165,16 +367,19 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -165,16 +367,19 @@ public class DistributedDeviceStore implements DeviceStore {
165 367
166 // Creates a new port based on the port description adds it to the map and 368 // Creates a new port based on the port description adds it to the map and
167 // Returns corresponding event. 369 // Returns corresponding event.
370 + //@GuardedBy("this")
168 private DeviceEvent createPort(Device device, PortDescription portDescription, 371 private DeviceEvent createPort(Device device, PortDescription portDescription,
169 Map<PortNumber, Port> ports) { 372 Map<PortNumber, Port> ports) {
170 DefaultPort port = new DefaultPort(device, portDescription.portNumber(), 373 DefaultPort port = new DefaultPort(device, portDescription.portNumber(),
171 portDescription.isEnabled()); 374 portDescription.isEnabled());
172 ports.put(port.number(), port); 375 ports.put(port.number(), port);
376 + updatePortMap(device.id(), ports);
173 return new DeviceEvent(PORT_ADDED, device, port); 377 return new DeviceEvent(PORT_ADDED, device, port);
174 } 378 }
175 379
176 - // CHecks if the specified port requires update and if so, it replaces the 380 + // Checks if the specified port requires update and if so, it replaces the
177 // existing entry in the map and returns corresponding event. 381 // existing entry in the map and returns corresponding event.
382 + //@GuardedBy("this")
178 private DeviceEvent updatePort(Device device, Port port, 383 private DeviceEvent updatePort(Device device, Port port,
179 PortDescription portDescription, 384 PortDescription portDescription,
180 Map<PortNumber, Port> ports) { 385 Map<PortNumber, Port> ports) {
...@@ -183,6 +388,7 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -183,6 +388,7 @@ public class DistributedDeviceStore implements DeviceStore {
183 new DefaultPort(device, portDescription.portNumber(), 388 new DefaultPort(device, portDescription.portNumber(),
184 portDescription.isEnabled()); 389 portDescription.isEnabled());
185 ports.put(port.number(), updatedPort); 390 ports.put(port.number(), updatedPort);
391 + updatePortMap(device.id(), ports);
186 return new DeviceEvent(PORT_UPDATED, device, port); 392 return new DeviceEvent(PORT_UPDATED, device, port);
187 } 393 }
188 return null; 394 return null;
...@@ -190,6 +396,7 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -190,6 +396,7 @@ public class DistributedDeviceStore implements DeviceStore {
190 396
191 // Prunes the specified list of ports based on which ports are in the 397 // Prunes the specified list of ports based on which ports are in the
192 // processed list and returns list of corresponding events. 398 // processed list and returns list of corresponding events.
399 + //@GuardedBy("this")
193 private List<DeviceEvent> pruneOldPorts(Device device, 400 private List<DeviceEvent> pruneOldPorts(Device device,
194 Map<PortNumber, Port> ports, 401 Map<PortNumber, Port> ports,
195 Set<PortNumber> processed) { 402 Set<PortNumber> processed) {
...@@ -203,25 +410,38 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -203,25 +410,38 @@ public class DistributedDeviceStore implements DeviceStore {
203 iterator.remove(); 410 iterator.remove();
204 } 411 }
205 } 412 }
413 + if (!events.isEmpty()) {
414 + updatePortMap(device.id(), ports);
415 + }
206 return events; 416 return events;
207 } 417 }
208 418
209 // Gets the map of ports for the specified device; if one does not already 419 // Gets the map of ports for the specified device; if one does not already
210 // exist, it creates and registers a new one. 420 // exist, it creates and registers a new one.
421 + // WARN: returned value is a copy, changes made to the Map
422 + // needs to be written back using updatePortMap
423 + //@GuardedBy("this")
211 private Map<PortNumber, Port> getPortMap(DeviceId deviceId) { 424 private Map<PortNumber, Port> getPortMap(DeviceId deviceId) {
212 - Map<PortNumber, Port> ports = devicePorts.get(deviceId); 425 + Map<PortNumber, Port> ports = devicePorts.getUnchecked(deviceId).orNull();
213 if (ports == null) { 426 if (ports == null) {
214 ports = new HashMap<>(); 427 ports = new HashMap<>();
215 - devicePorts.put(deviceId, ports); 428 + // this probably is waste of time in most cases.
429 + updatePortMap(deviceId, ports);
216 } 430 }
217 return ports; 431 return ports;
218 } 432 }
219 433
434 + //@GuardedBy("this")
435 + private void updatePortMap(DeviceId deviceId, Map<PortNumber, Port> ports) {
436 + rawDevicePorts.put(serialize(deviceId), serialize(ports));
437 + devicePorts.put(deviceId, Optional.of(ports));
438 + }
439 +
220 @Override 440 @Override
221 public DeviceEvent updatePortStatus(DeviceId deviceId, 441 public DeviceEvent updatePortStatus(DeviceId deviceId,
222 PortDescription portDescription) { 442 PortDescription portDescription) {
223 synchronized (this) { 443 synchronized (this) {
224 - Device device = devices.get(deviceId); 444 + Device device = devices.getUnchecked(deviceId).orNull();
225 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); 445 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
226 Map<PortNumber, Port> ports = getPortMap(deviceId); 446 Map<PortNumber, Port> ports = getPortMap(deviceId);
227 Port port = ports.get(portDescription.portNumber()); 447 Port port = ports.get(portDescription.portNumber());
...@@ -231,24 +451,24 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -231,24 +451,24 @@ public class DistributedDeviceStore implements DeviceStore {
231 451
232 @Override 452 @Override
233 public List<Port> getPorts(DeviceId deviceId) { 453 public List<Port> getPorts(DeviceId deviceId) {
234 - Map<PortNumber, Port> ports = devicePorts.get(deviceId); 454 + Map<PortNumber, Port> ports = devicePorts.getUnchecked(deviceId).orNull();
235 - return ports == null ? new ArrayList<Port>() : ImmutableList.copyOf(ports.values()); 455 + return ports == null ? Collections.<Port>emptyList() : ImmutableList.copyOf(ports.values());
236 } 456 }
237 457
238 @Override 458 @Override
239 public Port getPort(DeviceId deviceId, PortNumber portNumber) { 459 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
240 - Map<PortNumber, Port> ports = devicePorts.get(deviceId); 460 + Map<PortNumber, Port> ports = devicePorts.getUnchecked(deviceId).orNull();
241 return ports == null ? null : ports.get(portNumber); 461 return ports == null ? null : ports.get(portNumber);
242 } 462 }
243 463
244 @Override 464 @Override
245 public boolean isAvailable(DeviceId deviceId) { 465 public boolean isAvailable(DeviceId deviceId) {
246 - return availableDevices.contains(deviceId); 466 + return availableDevices.contains(serialize(deviceId));
247 } 467 }
248 468
249 @Override 469 @Override
250 public MastershipRole getRole(DeviceId deviceId) { 470 public MastershipRole getRole(DeviceId deviceId) {
251 - MastershipRole role = roles.get(deviceId); 471 + MastershipRole role = roles.getUnchecked(deviceId).orNull();
252 return role != null ? role : MastershipRole.NONE; 472 return role != null ? role : MastershipRole.NONE;
253 } 473 }
254 474
...@@ -257,7 +477,9 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -257,7 +477,9 @@ public class DistributedDeviceStore implements DeviceStore {
257 synchronized (this) { 477 synchronized (this) {
258 Device device = getDevice(deviceId); 478 Device device = getDevice(deviceId);
259 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); 479 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
260 - MastershipRole oldRole = roles.put(deviceId, role); 480 + MastershipRole oldRole = deserialize(
481 + rawRoles.put(serialize(deviceId), serialize(role)));
482 + roles.put(deviceId, Optional.of(role));
261 return oldRole == role ? null : 483 return oldRole == role ? null :
262 new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device, null); 484 new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device, null);
263 } 485 }
...@@ -266,10 +488,29 @@ public class DistributedDeviceStore implements DeviceStore { ...@@ -266,10 +488,29 @@ public class DistributedDeviceStore implements DeviceStore {
266 @Override 488 @Override
267 public DeviceEvent removeDevice(DeviceId deviceId) { 489 public DeviceEvent removeDevice(DeviceId deviceId) {
268 synchronized (this) { 490 synchronized (this) {
269 - roles.remove(deviceId); 491 + byte[] deviceIdBytes = serialize(deviceId);
270 - Device device = devices.remove(deviceId); 492 + rawRoles.remove(deviceIdBytes);
493 + roles.invalidate(deviceId);
494 +
495 + // TODO conditional remove?
496 + Device device = deserialize(rawDevices.remove(deviceIdBytes));
497 + devices.invalidate(deviceId);
271 return device == null ? null : 498 return device == null ? null :
272 new DeviceEvent(DEVICE_REMOVED, device, null); 499 new DeviceEvent(DEVICE_REMOVED, device, null);
273 } 500 }
274 } 501 }
502 +
503 + // TODO cache serialized DeviceID if we suffer from serialization cost
504 +
505 + private static byte[] serialize(final Object obj) {
506 + return POOL.serialize(obj);
507 + }
508 +
509 + private static <T> T deserialize(final byte[] bytes) {
510 + if (bytes == null) {
511 + return null;
512 + }
513 + return POOL.deserialize(bytes);
514 + }
515 +
275 } 516 }
......