Yuta HIGUCHI

implemented GossipDeviceStore with multi-provider, annotation support

Change-Id: I1953bdc37b28af79703ebcfc9201a71a2af49ab2
...@@ -2,7 +2,15 @@ package org.onlab.onos.store; ...@@ -2,7 +2,15 @@ package org.onlab.onos.store;
2 2
3 /** 3 /**
4 * Opaque version structure. 4 * Opaque version structure.
5 + * <p>
6 + * Classes implementing this interface must also implement
7 + * {@link #hashCode()} and {@link #equals(Object)}.
5 */ 8 */
6 public interface Timestamp extends Comparable<Timestamp> { 9 public interface Timestamp extends Comparable<Timestamp> {
7 10
11 + @Override
12 + public abstract int hashCode();
13 +
14 + @Override
15 + public abstract boolean equals(Object obj);
8 } 16 }
......
1 +package org.onlab.onos.store.common.impl;
2 +
3 +import static com.google.common.base.Preconditions.checkNotNull;
4 +
5 +import java.util.Objects;
6 +
7 +import org.onlab.onos.store.Timestamp;
8 +
9 +/**
10 + * Wrapper class to store Timestamped value.
11 + * @param <T>
12 + */
13 +public final class Timestamped<T> {
14 +
15 + private final Timestamp timestamp;
16 + private final T value;
17 +
18 + /**
19 + * Creates a time stamped value.
20 + *
21 + * @param value to be timestamp
22 + * @param timestamp the timestamp
23 + */
24 + public Timestamped(T value, Timestamp timestamp) {
25 + this.value = checkNotNull(value);
26 + this.timestamp = checkNotNull(timestamp);
27 + }
28 +
29 + /**
30 + * Returns the value.
31 + * @return value
32 + */
33 + public T value() {
34 + return value;
35 + }
36 +
37 + /**
38 + * Returns the time stamp.
39 + * @return time stamp
40 + */
41 + public Timestamp timestamp() {
42 + return timestamp;
43 + }
44 +
45 + /**
46 + * Tests if this timestamped value is newer than the other.
47 + *
48 + * @param other timestamped value
49 + * @return true if this instance is newer.
50 + */
51 + public boolean isNewer(Timestamped<T> other) {
52 + return this.timestamp.compareTo(checkNotNull(other).timestamp()) > 0;
53 + }
54 +
55 + @Override
56 + public int hashCode() {
57 + return timestamp.hashCode();
58 + }
59 +
60 + @Override
61 + public boolean equals(Object obj) {
62 + if (this == obj) {
63 + return true;
64 + }
65 + if (!(obj instanceof Timestamped)) {
66 + return false;
67 + }
68 + @SuppressWarnings("unchecked")
69 + Timestamped<T> that = (Timestamped<T>) obj;
70 + return Objects.equals(this.timestamp, that.timestamp);
71 + }
72 +
73 + // Default constructor for serialization
74 + @Deprecated
75 + protected Timestamped() {
76 + this.value = null;
77 + this.timestamp = null;
78 + }
79 +}
1 +/**
2 + * Common abstractions and facilities for implementing distributed store
3 + * using gossip protocol.
4 + */
5 +package org.onlab.onos.store.common.impl;
...@@ -17,9 +17,12 @@ import org.onlab.onos.store.Timestamp; ...@@ -17,9 +17,12 @@ import org.onlab.onos.store.Timestamp;
17 import org.onlab.onos.store.impl.OnosTimestamp; 17 import org.onlab.onos.store.impl.OnosTimestamp;
18 import org.slf4j.Logger; 18 import org.slf4j.Logger;
19 19
20 +/**
21 + * Clock service to issue Timestamp based on Device Mastership.
22 + */
20 @Component(immediate = true) 23 @Component(immediate = true)
21 @Service 24 @Service
22 -public class OnosClockService implements ClockService { 25 +public class DeviceClockManager implements ClockService {
23 26
24 private final Logger log = getLogger(getClass()); 27 private final Logger log = getLogger(getClass());
25 28
......
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import com.google.common.collect.FluentIterable;
4 +import com.google.common.collect.ImmutableList;
5 +
6 +import org.apache.commons.lang3.concurrent.ConcurrentException;
7 +import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
8 +import org.apache.felix.scr.annotations.Activate;
9 +import org.apache.felix.scr.annotations.Component;
10 +import org.apache.felix.scr.annotations.Deactivate;
11 +import org.apache.felix.scr.annotations.Reference;
12 +import org.apache.felix.scr.annotations.ReferenceCardinality;
13 +import org.apache.felix.scr.annotations.Service;
14 +import org.onlab.onos.net.Annotations;
15 +import org.onlab.onos.net.DefaultAnnotations;
16 +import org.onlab.onos.net.DefaultDevice;
17 +import org.onlab.onos.net.DefaultPort;
18 +import org.onlab.onos.net.Device;
19 +import org.onlab.onos.net.Device.Type;
20 +import org.onlab.onos.net.DeviceId;
21 +import org.onlab.onos.net.Port;
22 +import org.onlab.onos.net.PortNumber;
23 +import org.onlab.onos.net.SparseAnnotations;
24 +import org.onlab.onos.net.device.DefaultDeviceDescription;
25 +import org.onlab.onos.net.device.DefaultPortDescription;
26 +import org.onlab.onos.net.device.DeviceDescription;
27 +import org.onlab.onos.net.device.DeviceEvent;
28 +import org.onlab.onos.net.device.DeviceStore;
29 +import org.onlab.onos.net.device.DeviceStoreDelegate;
30 +import org.onlab.onos.net.device.PortDescription;
31 +import org.onlab.onos.net.provider.ProviderId;
32 +import org.onlab.onos.store.AbstractStore;
33 +import org.onlab.onos.store.ClockService;
34 +import org.onlab.onos.store.Timestamp;
35 +import org.onlab.onos.store.common.impl.Timestamped;
36 +import org.slf4j.Logger;
37 +
38 +import java.util.ArrayList;
39 +import java.util.Collections;
40 +import java.util.HashSet;
41 +import java.util.Iterator;
42 +import java.util.List;
43 +import java.util.Map;
44 +import java.util.Map.Entry;
45 +import java.util.Objects;
46 +import java.util.Set;
47 +import java.util.concurrent.ConcurrentHashMap;
48 +import java.util.concurrent.ConcurrentMap;
49 +import java.util.concurrent.atomic.AtomicReference;
50 +
51 +import static com.google.common.base.Preconditions.checkArgument;
52 +import static com.google.common.base.Preconditions.checkNotNull;
53 +import static com.google.common.base.Predicates.notNull;
54 +import static org.onlab.onos.net.device.DeviceEvent.Type.*;
55 +import static org.slf4j.LoggerFactory.getLogger;
56 +import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
57 +import static org.onlab.onos.net.DefaultAnnotations.merge;
58 +import static com.google.common.base.Verify.verify;
59 +
60 +// TODO: implement remove event handling and call *Internal
61 +/**
62 + * Manages inventory of infrastructure devices using gossip protocol to distribute
63 + * information.
64 + */
65 +@Component(immediate = true)
66 +@Service
67 +public class GossipDeviceStore
68 + extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
69 + implements DeviceStore {
70 +
71 + private final Logger log = getLogger(getClass());
72 +
73 + public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
74 +
75 + // TODO: Check if inner Map can be replaced with plain Map
76 + // innerMap is used to lock a Device, thus instance should never be replaced.
77 + // collection of Description given from various providers
78 + private final ConcurrentMap<DeviceId,
79 + ConcurrentMap<ProviderId, DeviceDescriptions>>
80 + deviceDescs = new ConcurrentHashMap<>();
81 +
82 + // cache of Device and Ports generated by compositing descriptions from providers
83 + private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
84 + private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
85 +
86 + // available(=UP) devices
87 + private final Set<DeviceId> availableDevices = new HashSet<>();
88 +
89 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 + protected ClockService clockService;
91 +
92 + @Activate
93 + public void activate() {
94 + log.info("Started");
95 + }
96 +
97 + @Deactivate
98 + public void deactivate() {
99 + deviceDescs.clear();
100 + devices.clear();
101 + devicePorts.clear();
102 + availableDevices.clear();
103 + log.info("Stopped");
104 + }
105 +
106 + @Override
107 + public int getDeviceCount() {
108 + return devices.size();
109 + }
110 +
111 + @Override
112 + public Iterable<Device> getDevices() {
113 + return Collections.unmodifiableCollection(devices.values());
114 + }
115 +
116 + @Override
117 + public Device getDevice(DeviceId deviceId) {
118 + return devices.get(deviceId);
119 + }
120 +
121 + @Override
122 + public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
123 + DeviceDescription deviceDescription) {
124 + Timestamp newTimestamp = clockService.getTimestamp(deviceId);
125 + final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
126 + DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
127 + if (event != null) {
128 + // FIXME: broadcast deltaDesc, UP
129 + log.debug("broadcast deltaDesc");
130 + }
131 + return event;
132 + }
133 +
134 + private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, DeviceId deviceId,
135 + Timestamped<DeviceDescription> deltaDesc) {
136 +
137 + // Collection of DeviceDescriptions for a Device
138 + ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
139 + = createIfAbsentUnchecked(deviceDescs, deviceId,
140 + new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
141 +
142 +
143 + DeviceDescriptions descs
144 + = createIfAbsentUnchecked(providerDescs, providerId,
145 + new InitDeviceDescs(deltaDesc));
146 +
147 + // update description
148 + synchronized (providerDescs) {
149 + // locking per device
150 +
151 + final Device oldDevice = devices.get(deviceId);
152 + final Device newDevice;
153 +
154 + if (deltaDesc == descs.getDeviceDesc() ||
155 + deltaDesc.isNewer(descs.getDeviceDesc())) {
156 + // on new device or valid update
157 + descs.putDeviceDesc(deltaDesc);
158 + newDevice = composeDevice(deviceId, providerDescs);
159 + } else {
160 + // outdated event, ignored.
161 + return null;
162 + }
163 + if (oldDevice == null) {
164 + // ADD
165 + return createDevice(providerId, newDevice);
166 + } else {
167 + // UPDATE or ignore (no change or stale)
168 + return updateDevice(providerId, oldDevice, newDevice);
169 + }
170 + }
171 + }
172 +
173 + // Creates the device and returns the appropriate event if necessary.
174 + // Guarded by deviceDescs value (=locking Device)
175 + private DeviceEvent createDevice(ProviderId providerId,
176 + Device newDevice) {
177 +
178 + // update composed device cache
179 + Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
180 + verify(oldDevice == null,
181 + "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
182 + providerId, oldDevice, newDevice);
183 +
184 + if (!providerId.isAncillary()) {
185 + availableDevices.add(newDevice.id());
186 + }
187 +
188 + return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
189 + }
190 +
191 + // Updates the device and returns the appropriate event if necessary.
192 + // Guarded by deviceDescs value (=locking Device)
193 + private DeviceEvent updateDevice(ProviderId providerId,
194 + Device oldDevice, Device newDevice) {
195 +
196 + // We allow only certain attributes to trigger update
197 + if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
198 + !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
199 + !isAnnotationsEqual(oldDevice.annotations(), newDevice.annotations())) {
200 +
201 + boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
202 + if (!replaced) {
203 + verify(replaced,
204 + "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
205 + providerId, oldDevice, devices.get(newDevice.id())
206 + , newDevice);
207 + }
208 + if (!providerId.isAncillary()) {
209 + availableDevices.add(newDevice.id());
210 + }
211 + return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
212 + }
213 +
214 + // Otherwise merely attempt to change availability if primary provider
215 + if (!providerId.isAncillary()) {
216 + boolean added = availableDevices.add(newDevice.id());
217 + return !added ? null :
218 + new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
219 + }
220 + return null;
221 + }
222 +
223 + @Override
224 + public DeviceEvent markOffline(DeviceId deviceId) {
225 + ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
226 + = createIfAbsentUnchecked(deviceDescs, deviceId,
227 + new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
228 +
229 + // locking device
230 + synchronized (providerDescs) {
231 + Device device = devices.get(deviceId);
232 + if (device == null) {
233 + return null;
234 + }
235 + boolean removed = availableDevices.remove(deviceId);
236 + if (removed) {
237 + // TODO: broadcast ... DOWN only?
238 + return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
239 +
240 + }
241 + return null;
242 + }
243 + }
244 +
245 + @Override
246 + public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
247 + List<PortDescription> portDescriptions) {
248 + Timestamp newTimestamp = clockService.getTimestamp(deviceId);
249 +
250 + List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
251 + for (PortDescription e : portDescriptions) {
252 + deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
253 + }
254 +
255 + List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, deltaDescs);
256 + if (!events.isEmpty()) {
257 + // FIXME: broadcast deltaDesc, UP
258 + log.debug("broadcast deltaDesc");
259 + }
260 + return events;
261 +
262 + }
263 +
264 + private List<DeviceEvent> updatePortsInternal(ProviderId providerId, DeviceId deviceId,
265 + List<Timestamped<PortDescription>> deltaDescs) {
266 +
267 + Device device = devices.get(deviceId);
268 + checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
269 +
270 + ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
271 + checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
272 +
273 + DeviceDescriptions descs = descsMap.get(providerId);
274 + // every provider must provide DeviceDescription.
275 + checkArgument(descs != null,
276 + "Device description for Device ID %s from Provider %s was not found",
277 + deviceId, providerId);
278 +
279 + List<DeviceEvent> events = new ArrayList<>();
280 + synchronized (descsMap) {
281 + Map<PortNumber, Port> ports = getPortMap(deviceId);
282 +
283 + // Add new ports
284 + Set<PortNumber> processed = new HashSet<>();
285 + for (Timestamped<PortDescription> deltaDesc : deltaDescs) {
286 + final PortNumber number = deltaDesc.value().portNumber();
287 + final Port oldPort = ports.get(number);
288 + final Port newPort;
289 +
290 + final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
291 + if (existingPortDesc == null ||
292 + deltaDesc == existingPortDesc ||
293 + deltaDesc.isNewer(existingPortDesc)) {
294 + // on new port or valid update
295 + // update description
296 + descs.putPortDesc(deltaDesc);
297 + newPort = composePort(device, number, descsMap);
298 + } else {
299 + // outdated event, ignored.
300 + continue;
301 + }
302 +
303 + events.add(oldPort == null ?
304 + createPort(device, newPort, ports) :
305 + updatePort(device, oldPort, newPort, ports));
306 + processed.add(number);
307 + }
308 +
309 + events.addAll(pruneOldPorts(device, ports, processed));
310 + }
311 + return FluentIterable.from(events).filter(notNull()).toList();
312 + }
313 +
314 + // Creates a new port based on the port description adds it to the map and
315 + // Returns corresponding event.
316 + // Guarded by deviceDescs value (=locking Device)
317 + private DeviceEvent createPort(Device device, Port newPort,
318 + Map<PortNumber, Port> ports) {
319 + ports.put(newPort.number(), newPort);
320 + return new DeviceEvent(PORT_ADDED, device, newPort);
321 + }
322 +
323 + // Checks if the specified port requires update and if so, it replaces the
324 + // existing entry in the map and returns corresponding event.
325 + // Guarded by deviceDescs value (=locking Device)
326 + private DeviceEvent updatePort(Device device, Port oldPort,
327 + Port newPort,
328 + Map<PortNumber, Port> ports) {
329 + if (oldPort.isEnabled() != newPort.isEnabled() ||
330 + !isAnnotationsEqual(oldPort.annotations(), newPort.annotations())) {
331 +
332 + ports.put(oldPort.number(), newPort);
333 + return new DeviceEvent(PORT_UPDATED, device, newPort);
334 + }
335 + return null;
336 + }
337 +
338 + // Prunes the specified list of ports based on which ports are in the
339 + // processed list and returns list of corresponding events.
340 + // Guarded by deviceDescs value (=locking Device)
341 + private List<DeviceEvent> pruneOldPorts(Device device,
342 + Map<PortNumber, Port> ports,
343 + Set<PortNumber> processed) {
344 + List<DeviceEvent> events = new ArrayList<>();
345 + Iterator<PortNumber> iterator = ports.keySet().iterator();
346 + while (iterator.hasNext()) {
347 + PortNumber portNumber = iterator.next();
348 + if (!processed.contains(portNumber)) {
349 + events.add(new DeviceEvent(PORT_REMOVED, device,
350 + ports.get(portNumber)));
351 + iterator.remove();
352 + }
353 + }
354 + return events;
355 + }
356 +
357 + // Gets the map of ports for the specified device; if one does not already
358 + // exist, it creates and registers a new one.
359 + private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
360 + return createIfAbsentUnchecked(devicePorts, deviceId,
361 + new InitConcurrentHashMap<PortNumber, Port>());
362 + }
363 +
364 + @Override
365 + public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
366 + PortDescription portDescription) {
367 + Timestamp newTimestamp = clockService.getTimestamp(deviceId);
368 + final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
369 + DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
370 + if (event != null) {
371 + // FIXME: broadcast deltaDesc
372 + log.debug("broadcast deltaDesc");
373 + }
374 + return event;
375 + }
376 +
377 + private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
378 + Timestamped<PortDescription> deltaDesc) {
379 +
380 + Device device = devices.get(deviceId);
381 + checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
382 +
383 + ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
384 + checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
385 +
386 + DeviceDescriptions descs = descsMap.get(providerId);
387 + // assuming all providers must to give DeviceDescription
388 + checkArgument(descs != null,
389 + "Device description for Device ID %s from Provider %s was not found",
390 + deviceId, providerId);
391 +
392 + synchronized (descsMap) {
393 + ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
394 + final PortNumber number = deltaDesc.value().portNumber();
395 + final Port oldPort = ports.get(number);
396 + final Port newPort;
397 +
398 + final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
399 + if (existingPortDesc == null ||
400 + deltaDesc == existingPortDesc ||
401 + deltaDesc.isNewer(existingPortDesc)) {
402 + // on new port or valid update
403 + // update description
404 + descs.putPortDesc(deltaDesc);
405 + newPort = composePort(device, number, descsMap);
406 + } else {
407 + // outdated event, ignored.
408 + return null;
409 + }
410 +
411 + if (oldPort == null) {
412 + return createPort(device, newPort, ports);
413 + } else {
414 + return updatePort(device, oldPort, newPort, ports);
415 + }
416 + }
417 + }
418 +
419 + @Override
420 + public List<Port> getPorts(DeviceId deviceId) {
421 + Map<PortNumber, Port> ports = devicePorts.get(deviceId);
422 + if (ports == null) {
423 + return Collections.emptyList();
424 + }
425 + return ImmutableList.copyOf(ports.values());
426 + }
427 +
428 + @Override
429 + public Port getPort(DeviceId deviceId, PortNumber portNumber) {
430 + Map<PortNumber, Port> ports = devicePorts.get(deviceId);
431 + return ports == null ? null : ports.get(portNumber);
432 + }
433 +
434 + @Override
435 + public boolean isAvailable(DeviceId deviceId) {
436 + return availableDevices.contains(deviceId);
437 + }
438 +
439 + @Override
440 + public DeviceEvent removeDevice(DeviceId deviceId) {
441 + synchronized (this) {
442 + Device device = devices.remove(deviceId);
443 + return device == null ? null :
444 + new DeviceEvent(DEVICE_REMOVED, device, null);
445 + }
446 + }
447 +
448 + private static boolean isAnnotationsEqual(Annotations lhs, Annotations rhs) {
449 + if (lhs == rhs) {
450 + return true;
451 + }
452 + if (lhs == null || rhs == null) {
453 + return false;
454 + }
455 +
456 + if (!lhs.keys().equals(rhs.keys())) {
457 + return false;
458 + }
459 +
460 + for (String key : lhs.keys()) {
461 + if (!lhs.value(key).equals(rhs.value(key))) {
462 + return false;
463 + }
464 + }
465 + return true;
466 + }
467 +
468 + /**
469 + * Returns a Device, merging description given from multiple Providers.
470 + *
471 + * @param deviceId device identifier
472 + * @param providerDescs Collection of Descriptions from multiple providers
473 + * @return Device instance
474 + */
475 + private Device composeDevice(DeviceId deviceId,
476 + ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
477 +
478 + checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
479 +
480 + ProviderId primary = pickPrimaryPID(providerDescs);
481 +
482 + DeviceDescriptions desc = providerDescs.get(primary);
483 +
484 + DeviceDescription base = desc.getDeviceDesc().value();
485 + Type type = base.type();
486 + String manufacturer = base.manufacturer();
487 + String hwVersion = base.hwVersion();
488 + String swVersion = base.swVersion();
489 + String serialNumber = base.serialNumber();
490 + DefaultAnnotations annotations = DefaultAnnotations.builder().build();
491 + annotations = merge(annotations, base.annotations());
492 +
493 + for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
494 + if (e.getKey().equals(primary)) {
495 + continue;
496 + }
497 + // TODO: should keep track of Description timestamp
498 + // and only merge conflicting keys when timestamp is newer
499 + // Currently assuming there will never be a key conflict between
500 + // providers
501 +
502 + // annotation merging. not so efficient, should revisit later
503 + annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
504 + }
505 +
506 + return new DefaultDevice(primary, deviceId , type, manufacturer,
507 + hwVersion, swVersion, serialNumber, annotations);
508 + }
509 +
510 + /**
511 + * Returns a Port, merging description given from multiple Providers.
512 + *
513 + * @param device device the port is on
514 + * @param number port number
515 + * @param providerDescs Collection of Descriptions from multiple providers
516 + * @return Port instance
517 + */
518 + private Port composePort(Device device, PortNumber number,
519 + ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
520 +
521 + ProviderId primary = pickPrimaryPID(providerDescs);
522 + DeviceDescriptions primDescs = providerDescs.get(primary);
523 + // if no primary, assume not enabled
524 + // TODO: revisit this default port enabled/disabled behavior
525 + boolean isEnabled = false;
526 + DefaultAnnotations annotations = DefaultAnnotations.builder().build();
527 +
528 + final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
529 + if (portDesc != null) {
530 + isEnabled = portDesc.value().isEnabled();
531 + annotations = merge(annotations, portDesc.value().annotations());
532 + }
533 +
534 + for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
535 + if (e.getKey().equals(primary)) {
536 + continue;
537 + }
538 + // TODO: should keep track of Description timestamp
539 + // and only merge conflicting keys when timestamp is newer
540 + // Currently assuming there will never be a key conflict between
541 + // providers
542 +
543 + // annotation merging. not so efficient, should revisit later
544 + final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
545 + if (otherPortDesc != null) {
546 + annotations = merge(annotations, otherPortDesc.value().annotations());
547 + }
548 + }
549 +
550 + return new DefaultPort(device, number, isEnabled, annotations);
551 + }
552 +
553 + /**
554 + * @return primary ProviderID, or randomly chosen one if none exists
555 + */
556 + private ProviderId pickPrimaryPID(
557 + ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
558 + ProviderId fallBackPrimary = null;
559 + for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
560 + if (!e.getKey().isAncillary()) {
561 + return e.getKey();
562 + } else if (fallBackPrimary == null) {
563 + // pick randomly as a fallback in case there is no primary
564 + fallBackPrimary = e.getKey();
565 + }
566 + }
567 + return fallBackPrimary;
568 + }
569 +
570 + private static final class InitConcurrentHashMap<K, V> implements
571 + ConcurrentInitializer<ConcurrentMap<K, V>> {
572 + @Override
573 + public ConcurrentMap<K, V> get() throws ConcurrentException {
574 + return new ConcurrentHashMap<>();
575 + }
576 + }
577 +
578 + public static final class InitDeviceDescs
579 + implements ConcurrentInitializer<DeviceDescriptions> {
580 +
581 + private final Timestamped<DeviceDescription> deviceDesc;
582 +
583 + public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
584 + this.deviceDesc = checkNotNull(deviceDesc);
585 + }
586 + @Override
587 + public DeviceDescriptions get() throws ConcurrentException {
588 + return new DeviceDescriptions(deviceDesc);
589 + }
590 + }
591 +
592 +
593 + /**
594 + * Collection of Description of a Device and it's Ports given from a Provider.
595 + */
596 + public static class DeviceDescriptions {
597 +
598 + private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
599 + private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
600 +
601 + public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
602 + this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
603 + this.portDescs = new ConcurrentHashMap<>();
604 + }
605 +
606 + public Timestamped<DeviceDescription> getDeviceDesc() {
607 + return deviceDesc.get();
608 + }
609 +
610 + public Timestamped<PortDescription> getPortDesc(PortNumber number) {
611 + return portDescs.get(number);
612 + }
613 +
614 + /**
615 + * Puts DeviceDescription, merging annotations as necessary.
616 + *
617 + * @param newDesc new DeviceDescription
618 + * @return previous DeviceDescription
619 + */
620 + public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
621 + Timestamped<DeviceDescription> oldOne = deviceDesc.get();
622 + Timestamped<DeviceDescription> newOne = newDesc;
623 + if (oldOne != null) {
624 + SparseAnnotations merged = merge(oldOne.value().annotations(),
625 + newDesc.value().annotations());
626 + newOne = new Timestamped<DeviceDescription>(
627 + new DefaultDeviceDescription(newDesc.value(), merged),
628 + newDesc.timestamp());
629 + }
630 + return deviceDesc.getAndSet(newOne);
631 + }
632 +
633 + /**
634 + * Puts PortDescription, merging annotations as necessary.
635 + *
636 + * @param newDesc new PortDescription
637 + * @return previous PortDescription
638 + */
639 + public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
640 + Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
641 + Timestamped<PortDescription> newOne = newDesc;
642 + if (oldOne != null) {
643 + SparseAnnotations merged = merge(oldOne.value().annotations(),
644 + newDesc.value().annotations());
645 + newOne = new Timestamped<PortDescription>(
646 + new DefaultPortDescription(newDesc.value(), merged),
647 + newDesc.timestamp());
648 + }
649 + return portDescs.put(newOne.value().portNumber(), newOne);
650 + }
651 + }
652 +}
1 -package org.onlab.onos.store.device.impl;
2 -
3 -import static com.google.common.base.Predicates.notNull;
4 -import static com.google.common.base.Preconditions.checkState;
5 -
6 -import com.google.common.collect.FluentIterable;
7 -import com.google.common.collect.ImmutableSet;
8 -import com.google.common.collect.ImmutableSet.Builder;
9 -
10 -import org.apache.felix.scr.annotations.Activate;
11 -import org.apache.felix.scr.annotations.Component;
12 -import org.apache.felix.scr.annotations.Deactivate;
13 -import org.apache.felix.scr.annotations.Reference;
14 -import org.apache.felix.scr.annotations.ReferenceCardinality;
15 -import org.apache.felix.scr.annotations.Service;
16 -import org.onlab.onos.net.DefaultDevice;
17 -import org.onlab.onos.net.DefaultPort;
18 -import org.onlab.onos.net.Device;
19 -import org.onlab.onos.net.DeviceId;
20 -import org.onlab.onos.net.Port;
21 -import org.onlab.onos.net.PortNumber;
22 -import org.onlab.onos.net.device.DeviceDescription;
23 -import org.onlab.onos.net.device.DeviceEvent;
24 -import org.onlab.onos.net.device.DeviceStore;
25 -import org.onlab.onos.net.device.DeviceStoreDelegate;
26 -import org.onlab.onos.net.device.PortDescription;
27 -import org.onlab.onos.net.provider.ProviderId;
28 -import org.onlab.onos.store.AbstractStore;
29 -import org.onlab.onos.store.ClockService;
30 -import org.onlab.onos.store.Timestamp;
31 -import org.slf4j.Logger;
32 -
33 -import java.util.ArrayList;
34 -import java.util.Collections;
35 -import java.util.HashMap;
36 -import java.util.HashSet;
37 -import java.util.Iterator;
38 -import java.util.List;
39 -import java.util.Map;
40 -import java.util.Objects;
41 -import java.util.Set;
42 -import java.util.concurrent.ConcurrentHashMap;
43 -import java.util.concurrent.ConcurrentMap;
44 -
45 -import static com.google.common.base.Preconditions.checkArgument;
46 -import static org.onlab.onos.net.device.DeviceEvent.Type.*;
47 -import static org.slf4j.LoggerFactory.getLogger;
48 -
49 -//TODO: Add support for multiple provider and annotations
50 -/**
51 - * Manages inventory of infrastructure devices using a protocol that takes into consideration
52 - * the order in which device events occur.
53 - */
54 -@Component(immediate = true)
55 -@Service
56 -public class OnosDistributedDeviceStore
57 - extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
58 - implements DeviceStore {
59 -
60 - private final Logger log = getLogger(getClass());
61 -
62 - public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
63 -
64 - private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
65 - private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
66 -
67 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 - protected ClockService clockService;
69 -
70 - @Activate
71 - public void activate() {
72 -
73 - devices = new ConcurrentHashMap<>();
74 - devicePorts = new ConcurrentHashMap<>();
75 -
76 - log.info("Started");
77 - }
78 -
79 - @Deactivate
80 - public void deactivate() {
81 - log.info("Stopped");
82 - }
83 -
84 - @Override
85 - public int getDeviceCount() {
86 - return devices.size();
87 - }
88 -
89 - @Override
90 - public Iterable<Device> getDevices() {
91 - Builder<Device> builder = ImmutableSet.builder();
92 - synchronized (this) {
93 - for (VersionedValue<Device> device : devices.values()) {
94 - builder.add(device.entity());
95 - }
96 - return builder.build();
97 - }
98 - }
99 -
100 - @Override
101 - public Device getDevice(DeviceId deviceId) {
102 - VersionedValue<Device> device = devices.get(deviceId);
103 - checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
104 - return device.entity();
105 - }
106 -
107 - @Override
108 - public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
109 - DeviceDescription deviceDescription) {
110 - Timestamp newTimestamp = clockService.getTimestamp(deviceId);
111 - VersionedValue<Device> device = devices.get(deviceId);
112 -
113 - if (device == null) {
114 - return createDevice(providerId, deviceId, deviceDescription, newTimestamp);
115 - }
116 -
117 - checkState(newTimestamp.compareTo(device.timestamp()) > 0,
118 - "Existing device has a timestamp in the future!");
119 -
120 - return updateDevice(providerId, device.entity(), deviceDescription, newTimestamp);
121 - }
122 -
123 - // Creates the device and returns the appropriate event if necessary.
124 - private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
125 - DeviceDescription desc, Timestamp timestamp) {
126 - Device device = new DefaultDevice(providerId, deviceId, desc.type(),
127 - desc.manufacturer(),
128 - desc.hwVersion(), desc.swVersion(),
129 - desc.serialNumber());
130 -
131 - devices.put(deviceId, new VersionedValue<>(device, true, timestamp));
132 - // TODO,FIXME: broadcast a message telling peers of a device event.
133 - return new DeviceEvent(DEVICE_ADDED, device, null);
134 - }
135 -
136 - // Updates the device and returns the appropriate event if necessary.
137 - private DeviceEvent updateDevice(ProviderId providerId, Device device,
138 - DeviceDescription desc, Timestamp timestamp) {
139 - // We allow only certain attributes to trigger update
140 - if (!Objects.equals(device.hwVersion(), desc.hwVersion()) ||
141 - !Objects.equals(device.swVersion(), desc.swVersion())) {
142 -
143 - Device updated = new DefaultDevice(providerId, device.id(),
144 - desc.type(),
145 - desc.manufacturer(),
146 - desc.hwVersion(),
147 - desc.swVersion(),
148 - desc.serialNumber());
149 - devices.put(device.id(), new VersionedValue<Device>(updated, true, timestamp));
150 - // FIXME: broadcast a message telling peers of a device event.
151 - return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
152 - }
153 -
154 - // Otherwise merely attempt to change availability
155 - Device updated = new DefaultDevice(providerId, device.id(),
156 - desc.type(),
157 - desc.manufacturer(),
158 - desc.hwVersion(),
159 - desc.swVersion(),
160 - desc.serialNumber());
161 -
162 - VersionedValue<Device> oldDevice = devices.put(device.id(),
163 - new VersionedValue<Device>(updated, true, timestamp));
164 - if (!oldDevice.isUp()) {
165 - return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
166 - } else {
167 - return null;
168 - }
169 - }
170 -
171 - @Override
172 - public DeviceEvent markOffline(DeviceId deviceId) {
173 - VersionedValue<Device> device = devices.get(deviceId);
174 - boolean willRemove = device != null && device.isUp();
175 - if (!willRemove) {
176 - return null;
177 - }
178 - Timestamp timestamp = clockService.getTimestamp(deviceId);
179 - if (replaceIfLatest(device.entity(), false, timestamp)) {
180 - return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device.entity(), null);
181 - }
182 - return null;
183 - }
184 -
185 - // Replace existing value if its timestamp is older.
186 - private synchronized boolean replaceIfLatest(Device device, boolean isUp, Timestamp timestamp) {
187 - VersionedValue<Device> existingValue = devices.get(device.id());
188 - if (timestamp.compareTo(existingValue.timestamp()) > 0) {
189 - devices.put(device.id(), new VersionedValue<Device>(device, isUp, timestamp));
190 - return true;
191 - }
192 - return false;
193 - }
194 -
195 - @Override
196 - public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
197 - List<PortDescription> portDescriptions) {
198 - List<DeviceEvent> events = new ArrayList<>();
199 - synchronized (this) {
200 - VersionedValue<Device> device = devices.get(deviceId);
201 - checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
202 - Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
203 - Timestamp newTimestamp = clockService.getTimestamp(deviceId);
204 -
205 - // Add new ports
206 - Set<PortNumber> processed = new HashSet<>();
207 - for (PortDescription portDescription : portDescriptions) {
208 - VersionedValue<Port> port = ports.get(portDescription.portNumber());
209 - if (port == null) {
210 - events.add(createPort(device, portDescription, ports, newTimestamp));
211 - }
212 - checkState(newTimestamp.compareTo(port.timestamp()) > 0,
213 - "Existing port state has a timestamp in the future!");
214 - events.add(updatePort(device.entity(), port.entity(), portDescription, ports, newTimestamp));
215 - processed.add(portDescription.portNumber());
216 - }
217 -
218 - updatePortMap(deviceId, ports);
219 -
220 - events.addAll(pruneOldPorts(device.entity(), ports, processed));
221 - }
222 - return FluentIterable.from(events).filter(notNull()).toList();
223 - }
224 -
225 - // Creates a new port based on the port description adds it to the map and
226 - // Returns corresponding event.
227 - //@GuardedBy("this")
228 - private DeviceEvent createPort(VersionedValue<Device> device, PortDescription portDescription,
229 - Map<PortNumber, VersionedValue<Port>> ports, Timestamp timestamp) {
230 - Port port = new DefaultPort(device.entity(), portDescription.portNumber(),
231 - portDescription.isEnabled());
232 - ports.put(port.number(), new VersionedValue<Port>(port, true, timestamp));
233 - updatePortMap(device.entity().id(), ports);
234 - return new DeviceEvent(PORT_ADDED, device.entity(), port);
235 - }
236 -
237 - // Checks if the specified port requires update and if so, it replaces the
238 - // existing entry in the map and returns corresponding event.
239 - //@GuardedBy("this")
240 - private DeviceEvent updatePort(Device device, Port port,
241 - PortDescription portDescription,
242 - Map<PortNumber, VersionedValue<Port>> ports,
243 - Timestamp timestamp) {
244 - if (port.isEnabled() != portDescription.isEnabled()) {
245 - VersionedValue<Port> updatedPort = new VersionedValue<Port>(
246 - new DefaultPort(device, portDescription.portNumber(),
247 - portDescription.isEnabled()),
248 - portDescription.isEnabled(),
249 - timestamp);
250 - ports.put(port.number(), updatedPort);
251 - updatePortMap(device.id(), ports);
252 - return new DeviceEvent(PORT_UPDATED, device, updatedPort.entity());
253 - }
254 - return null;
255 - }
256 -
257 - // Prunes the specified list of ports based on which ports are in the
258 - // processed list and returns list of corresponding events.
259 - //@GuardedBy("this")
260 - private List<DeviceEvent> pruneOldPorts(Device device,
261 - Map<PortNumber, VersionedValue<Port>> ports,
262 - Set<PortNumber> processed) {
263 - List<DeviceEvent> events = new ArrayList<>();
264 - Iterator<PortNumber> iterator = ports.keySet().iterator();
265 - while (iterator.hasNext()) {
266 - PortNumber portNumber = iterator.next();
267 - if (!processed.contains(portNumber)) {
268 - events.add(new DeviceEvent(PORT_REMOVED, device,
269 - ports.get(portNumber).entity()));
270 - iterator.remove();
271 - }
272 - }
273 - if (!events.isEmpty()) {
274 - updatePortMap(device.id(), ports);
275 - }
276 - return events;
277 - }
278 -
279 - // Gets the map of ports for the specified device; if one does not already
280 - // exist, it creates and registers a new one.
281 - // WARN: returned value is a copy, changes made to the Map
282 - // needs to be written back using updatePortMap
283 - //@GuardedBy("this")
284 - private Map<PortNumber, VersionedValue<Port>> getPortMap(DeviceId deviceId) {
285 - Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
286 - if (ports == null) {
287 - ports = new HashMap<>();
288 - // this probably is waste of time in most cases.
289 - updatePortMap(deviceId, ports);
290 - }
291 - return ports;
292 - }
293 -
294 - //@GuardedBy("this")
295 - private void updatePortMap(DeviceId deviceId, Map<PortNumber, VersionedValue<Port>> ports) {
296 - devicePorts.put(deviceId, ports);
297 - }
298 -
299 - @Override
300 - public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
301 - PortDescription portDescription) {
302 - VersionedValue<Device> device = devices.get(deviceId);
303 - checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
304 - Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
305 - VersionedValue<Port> port = ports.get(portDescription.portNumber());
306 - Timestamp timestamp = clockService.getTimestamp(deviceId);
307 - return updatePort(device.entity(), port.entity(), portDescription, ports, timestamp);
308 - }
309 -
310 - @Override
311 - public List<Port> getPorts(DeviceId deviceId) {
312 - Map<PortNumber, VersionedValue<Port>> versionedPorts = devicePorts.get(deviceId);
313 - if (versionedPorts == null) {
314 - return Collections.emptyList();
315 - }
316 - List<Port> ports = new ArrayList<>();
317 - for (VersionedValue<Port> port : versionedPorts.values()) {
318 - ports.add(port.entity());
319 - }
320 - return ports;
321 - }
322 -
323 - @Override
324 - public Port getPort(DeviceId deviceId, PortNumber portNumber) {
325 - Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
326 - return ports == null ? null : ports.get(portNumber).entity();
327 - }
328 -
329 - @Override
330 - public boolean isAvailable(DeviceId deviceId) {
331 - return devices.get(deviceId).isUp();
332 - }
333 -
334 - @Override
335 - public DeviceEvent removeDevice(DeviceId deviceId) {
336 - VersionedValue<Device> previousDevice = devices.remove(deviceId);
337 - return previousDevice == null ? null :
338 - new DeviceEvent(DEVICE_REMOVED, previousDevice.entity(), null);
339 - }
340 -}
...@@ -84,4 +84,11 @@ public final class OnosTimestamp implements Timestamp { ...@@ -84,4 +84,11 @@ public final class OnosTimestamp implements Timestamp {
84 public int sequenceNumber() { 84 public int sequenceNumber() {
85 return sequenceNumber; 85 return sequenceNumber;
86 } 86 }
87 +
88 + // Default constructor for serialization
89 + @Deprecated
90 + protected OnosTimestamp() {
91 + this.termNumber = -1;
92 + this.sequenceNumber = -1;
93 + }
87 } 94 }
......
1 +package org.onlab.onos.store.common.impl;
2 +
3 +import static org.junit.Assert.*;
4 +
5 +import java.nio.ByteBuffer;
6 +
7 +import org.junit.After;
8 +import org.junit.AfterClass;
9 +import org.junit.Before;
10 +import org.junit.BeforeClass;
11 +import org.junit.Test;
12 +import org.onlab.onos.store.Timestamp;
13 +import org.onlab.onos.store.impl.OnosTimestamp;
14 +import org.onlab.util.KryoPool;
15 +
16 +import com.google.common.testing.EqualsTester;
17 +
18 +public class TimestampedTest {
19 +
20 + private static final Timestamp TS_1_1 = new OnosTimestamp(1, 1);
21 + private static final Timestamp TS_1_2 = new OnosTimestamp(1, 2);
22 + private static final Timestamp TS_2_1 = new OnosTimestamp(2, 1);
23 +
24 + @BeforeClass
25 + public static void setUpBeforeClass() throws Exception {
26 + }
27 +
28 + @AfterClass
29 + public static void tearDownAfterClass() throws Exception {
30 + }
31 +
32 + @Before
33 + public void setUp() throws Exception {
34 + }
35 +
36 + @After
37 + public void tearDown() throws Exception {
38 + }
39 +
40 + @Test
41 + public final void testHashCode() {
42 + Timestamped<String> a = new Timestamped<>("a", TS_1_1);
43 + Timestamped<String> b = new Timestamped<>("b", TS_1_1);
44 + assertTrue("value does not impact hashCode",
45 + a.hashCode() == b.hashCode());
46 + }
47 +
48 + @Test
49 + public final void testEquals() {
50 + Timestamped<String> a = new Timestamped<>("a", TS_1_1);
51 + Timestamped<String> b = new Timestamped<>("b", TS_1_1);
52 + assertTrue("value does not impact equality",
53 + a.equals(b));
54 +
55 + new EqualsTester()
56 + .addEqualityGroup(new Timestamped<>("a", TS_1_1),
57 + new Timestamped<>("b", TS_1_1),
58 + new Timestamped<>("c", TS_1_1))
59 + .addEqualityGroup(new Timestamped<>("a", TS_1_2),
60 + new Timestamped<>("b", TS_1_2),
61 + new Timestamped<>("c", TS_1_2))
62 + .addEqualityGroup(new Timestamped<>("a", TS_2_1),
63 + new Timestamped<>("b", TS_2_1),
64 + new Timestamped<>("c", TS_2_1))
65 + .testEquals();
66 +
67 + }
68 +
69 + @Test
70 + public final void testValue() {
71 + final Integer n = Integer.valueOf(42);
72 + Timestamped<Integer> tsv = new Timestamped<>(n, TS_1_1);
73 + assertSame(n, tsv.value());
74 +
75 + }
76 +
77 + @Test(expected = NullPointerException.class)
78 + public final void testValueNonNull() {
79 + new Timestamped<>(null, TS_1_1);
80 + }
81 +
82 + @Test(expected = NullPointerException.class)
83 + public final void testTimestampNonNull() {
84 + new Timestamped<>("Foo", null);
85 + }
86 +
87 + @Test
88 + public final void testIsNewer() {
89 + Timestamped<String> a = new Timestamped<>("a", TS_1_2);
90 + Timestamped<String> b = new Timestamped<>("b", TS_1_1);
91 + assertTrue(a.isNewer(b));
92 + assertFalse(b.isNewer(a));
93 + }
94 +
95 + @Test
96 + public final void testKryoSerializable() {
97 + final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
98 + final KryoPool kryos = KryoPool.newBuilder()
99 + .register(Timestamped.class,
100 + OnosTimestamp.class)
101 + .build();
102 +
103 + Timestamped<String> original = new Timestamped<>("foobar", TS_1_1);
104 + kryos.serialize(original, buffer);
105 + buffer.flip();
106 + Timestamped<String> copy = kryos.deserialize(buffer);
107 +
108 + new EqualsTester()
109 + .addEqualityGroup(original, copy)
110 + .testEquals();
111 + }
112 +}
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import static org.junit.Assert.*;
4 +import static org.onlab.onos.net.Device.Type.SWITCH;
5 +import static org.onlab.onos.net.DeviceId.deviceId;
6 +import static org.onlab.onos.net.device.DeviceEvent.Type.*;
7 +
8 +import java.util.Arrays;
9 +import java.util.HashMap;
10 +import java.util.List;
11 +import java.util.Map;
12 +import java.util.Set;
13 +import java.util.concurrent.CountDownLatch;
14 +import java.util.concurrent.TimeUnit;
15 +
16 +import org.junit.After;
17 +import org.junit.AfterClass;
18 +import org.junit.Before;
19 +import org.junit.BeforeClass;
20 +import org.junit.Ignore;
21 +import org.junit.Test;
22 +import org.onlab.onos.cluster.MastershipTerm;
23 +import org.onlab.onos.cluster.NodeId;
24 +import org.onlab.onos.net.Annotations;
25 +import org.onlab.onos.net.DefaultAnnotations;
26 +import org.onlab.onos.net.Device;
27 +import org.onlab.onos.net.DeviceId;
28 +import org.onlab.onos.net.Port;
29 +import org.onlab.onos.net.PortNumber;
30 +import org.onlab.onos.net.SparseAnnotations;
31 +import org.onlab.onos.net.device.DefaultDeviceDescription;
32 +import org.onlab.onos.net.device.DefaultPortDescription;
33 +import org.onlab.onos.net.device.DeviceDescription;
34 +import org.onlab.onos.net.device.DeviceEvent;
35 +import org.onlab.onos.net.device.DeviceStore;
36 +import org.onlab.onos.net.device.DeviceStoreDelegate;
37 +import org.onlab.onos.net.device.PortDescription;
38 +import org.onlab.onos.net.provider.ProviderId;
39 +import org.onlab.onos.store.ClockService;
40 +
41 +import com.google.common.collect.Iterables;
42 +import com.google.common.collect.Sets;
43 +
44 +
45 +// TODO add tests for remote replication
46 +/**
47 + * Test of the gossip based distributed DeviceStore implementation.
48 + */
49 +public class GossipDeviceStoreTest {
50 +
51 + private static final ProviderId PID = new ProviderId("of", "foo");
52 + private static final ProviderId PIDA = new ProviderId("of", "bar", true);
53 + private static final DeviceId DID1 = deviceId("of:foo");
54 + private static final DeviceId DID2 = deviceId("of:bar");
55 + private static final String MFR = "whitebox";
56 + private static final String HW = "1.1.x";
57 + private static final String SW1 = "3.8.1";
58 + private static final String SW2 = "3.9.5";
59 + private static final String SN = "43311-12345";
60 +
61 + private static final PortNumber P1 = PortNumber.portNumber(1);
62 + private static final PortNumber P2 = PortNumber.portNumber(2);
63 + private static final PortNumber P3 = PortNumber.portNumber(3);
64 +
65 + private static final SparseAnnotations A1 = DefaultAnnotations.builder()
66 + .set("A1", "a1")
67 + .set("B1", "b1")
68 + .build();
69 + private static final SparseAnnotations A1_2 = DefaultAnnotations.builder()
70 + .remove("A1")
71 + .set("B3", "b3")
72 + .build();
73 + private static final SparseAnnotations A2 = DefaultAnnotations.builder()
74 + .set("A2", "a2")
75 + .set("B2", "b2")
76 + .build();
77 + private static final SparseAnnotations A2_2 = DefaultAnnotations.builder()
78 + .remove("A2")
79 + .set("B4", "b4")
80 + .build();
81 +
82 + private static final NodeId MYSELF = new NodeId("myself");
83 +
84 + private GossipDeviceStore gossipDeviceStore;
85 + private DeviceStore deviceStore;
86 +
87 + private DeviceClockManager deviceClockManager;
88 + private ClockService clockService;
89 +
90 + @BeforeClass
91 + public static void setUpBeforeClass() throws Exception {
92 + }
93 +
94 + @AfterClass
95 + public static void tearDownAfterClass() throws Exception {
96 + }
97 +
98 +
99 + @Before
100 + public void setUp() throws Exception {
101 + deviceClockManager = new DeviceClockManager();
102 + deviceClockManager.activate();
103 + clockService = deviceClockManager;
104 +
105 + deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
106 + deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
107 +
108 + gossipDeviceStore = new TestGossipDeviceStore(clockService);
109 + gossipDeviceStore.activate();
110 + deviceStore = gossipDeviceStore;
111 + }
112 +
113 + @After
114 + public void tearDown() throws Exception {
115 + gossipDeviceStore.deactivate();
116 + deviceClockManager.deactivate();
117 + }
118 +
119 + private void putDevice(DeviceId deviceId, String swVersion) {
120 + DeviceDescription description =
121 + new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
122 + HW, swVersion, SN);
123 + deviceStore.createOrUpdateDevice(PID, deviceId, description);
124 + }
125 +
126 + private void putDeviceAncillary(DeviceId deviceId, String swVersion) {
127 + DeviceDescription description =
128 + new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
129 + HW, swVersion, SN);
130 + deviceStore.createOrUpdateDevice(PIDA, deviceId, description);
131 + }
132 +
133 + private static void assertDevice(DeviceId id, String swVersion, Device device) {
134 + assertNotNull(device);
135 + assertEquals(id, device.id());
136 + assertEquals(MFR, device.manufacturer());
137 + assertEquals(HW, device.hwVersion());
138 + assertEquals(swVersion, device.swVersion());
139 + assertEquals(SN, device.serialNumber());
140 + }
141 +
142 + /**
143 + * Verifies that Annotations created by merging {@code annotations} is
144 + * equal to actual Annotations.
145 + *
146 + * @param actual Annotations to check
147 + * @param annotations
148 + */
149 + private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
150 + DefaultAnnotations expected = DefaultAnnotations.builder().build();
151 + for (SparseAnnotations a : annotations) {
152 + expected = DefaultAnnotations.merge(expected, a);
153 + }
154 + assertEquals(expected.keys(), actual.keys());
155 + for (String key : expected.keys()) {
156 + assertEquals(expected.value(key), actual.value(key));
157 + }
158 + }
159 +
160 + @Test
161 + public final void testGetDeviceCount() {
162 + assertEquals("initialy empty", 0, deviceStore.getDeviceCount());
163 +
164 + putDevice(DID1, SW1);
165 + putDevice(DID2, SW2);
166 + putDevice(DID1, SW1);
167 +
168 + assertEquals("expect 2 uniq devices", 2, deviceStore.getDeviceCount());
169 + }
170 +
171 + @Test
172 + public final void testGetDevices() {
173 + assertEquals("initialy empty", 0, Iterables.size(deviceStore.getDevices()));
174 +
175 + putDevice(DID1, SW1);
176 + putDevice(DID2, SW2);
177 + putDevice(DID1, SW1);
178 +
179 + assertEquals("expect 2 uniq devices",
180 + 2, Iterables.size(deviceStore.getDevices()));
181 +
182 + Map<DeviceId, Device> devices = new HashMap<>();
183 + for (Device device : deviceStore.getDevices()) {
184 + devices.put(device.id(), device);
185 + }
186 +
187 + assertDevice(DID1, SW1, devices.get(DID1));
188 + assertDevice(DID2, SW2, devices.get(DID2));
189 +
190 + // add case for new node?
191 + }
192 +
193 + @Test
194 + public final void testGetDevice() {
195 +
196 + putDevice(DID1, SW1);
197 +
198 + assertDevice(DID1, SW1, deviceStore.getDevice(DID1));
199 + assertNull("DID2 shouldn't be there", deviceStore.getDevice(DID2));
200 + }
201 +
202 + @Test
203 + public final void testCreateOrUpdateDevice() {
204 + DeviceDescription description =
205 + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
206 + HW, SW1, SN);
207 + DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
208 + assertEquals(DEVICE_ADDED, event.type());
209 + assertDevice(DID1, SW1, event.subject());
210 +
211 + DeviceDescription description2 =
212 + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
213 + HW, SW2, SN);
214 + DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
215 + assertEquals(DEVICE_UPDATED, event2.type());
216 + assertDevice(DID1, SW2, event2.subject());
217 +
218 + assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
219 + }
220 +
221 + @Test
222 + public final void testCreateOrUpdateDeviceAncillary() {
223 + DeviceDescription description =
224 + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
225 + HW, SW1, SN, A2);
226 + DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
227 + assertEquals(DEVICE_ADDED, event.type());
228 + assertDevice(DID1, SW1, event.subject());
229 + assertEquals(PIDA, event.subject().providerId());
230 + assertAnnotationsEquals(event.subject().annotations(), A2);
231 + assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
232 +
233 + DeviceDescription description2 =
234 + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
235 + HW, SW2, SN, A1);
236 + DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
237 + assertEquals(DEVICE_UPDATED, event2.type());
238 + assertDevice(DID1, SW2, event2.subject());
239 + assertEquals(PID, event2.subject().providerId());
240 + assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
241 + assertTrue(deviceStore.isAvailable(DID1));
242 +
243 + assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
244 +
245 + // For now, Ancillary is ignored once primary appears
246 + assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
247 +
248 + // But, Ancillary annotations will be in effect
249 + DeviceDescription description3 =
250 + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
251 + HW, SW1, SN, A2_2);
252 + DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
253 + assertEquals(DEVICE_UPDATED, event3.type());
254 + // basic information will be the one from Primary
255 + assertDevice(DID1, SW2, event3.subject());
256 + assertEquals(PID, event3.subject().providerId());
257 + // but annotation from Ancillary will be merged
258 + assertAnnotationsEquals(event3.subject().annotations(), A1, A2, A2_2);
259 + assertTrue(deviceStore.isAvailable(DID1));
260 + }
261 +
262 +
263 + @Test
264 + public final void testMarkOffline() {
265 +
266 + putDevice(DID1, SW1);
267 + assertTrue(deviceStore.isAvailable(DID1));
268 +
269 + DeviceEvent event = deviceStore.markOffline(DID1);
270 + assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
271 + assertDevice(DID1, SW1, event.subject());
272 + assertFalse(deviceStore.isAvailable(DID1));
273 +
274 + DeviceEvent event2 = deviceStore.markOffline(DID1);
275 + assertNull("No change, no event", event2);
276 +}
277 +
278 + @Test
279 + public final void testUpdatePorts() {
280 + putDevice(DID1, SW1);
281 + List<PortDescription> pds = Arrays.<PortDescription>asList(
282 + new DefaultPortDescription(P1, true),
283 + new DefaultPortDescription(P2, true)
284 + );
285 +
286 + List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
287 +
288 + Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
289 + for (DeviceEvent event : events) {
290 + assertEquals(PORT_ADDED, event.type());
291 + assertDevice(DID1, SW1, event.subject());
292 + assertTrue("PortNumber is one of expected",
293 + expectedPorts.remove(event.port().number()));
294 + assertTrue("Port is enabled", event.port().isEnabled());
295 + }
296 + assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty());
297 +
298 +
299 + List<PortDescription> pds2 = Arrays.<PortDescription>asList(
300 + new DefaultPortDescription(P1, false),
301 + new DefaultPortDescription(P2, true),
302 + new DefaultPortDescription(P3, true)
303 + );
304 +
305 + events = deviceStore.updatePorts(PID, DID1, pds2);
306 + assertFalse("event should be triggered", events.isEmpty());
307 + for (DeviceEvent event : events) {
308 + PortNumber num = event.port().number();
309 + if (P1.equals(num)) {
310 + assertEquals(PORT_UPDATED, event.type());
311 + assertDevice(DID1, SW1, event.subject());
312 + assertFalse("Port is disabled", event.port().isEnabled());
313 + } else if (P2.equals(num)) {
314 + fail("P2 event not expected.");
315 + } else if (P3.equals(num)) {
316 + assertEquals(PORT_ADDED, event.type());
317 + assertDevice(DID1, SW1, event.subject());
318 + assertTrue("Port is enabled", event.port().isEnabled());
319 + } else {
320 + fail("Unknown port number encountered: " + num);
321 + }
322 + }
323 +
324 + List<PortDescription> pds3 = Arrays.<PortDescription>asList(
325 + new DefaultPortDescription(P1, false),
326 + new DefaultPortDescription(P2, true)
327 + );
328 + events = deviceStore.updatePorts(PID, DID1, pds3);
329 + assertFalse("event should be triggered", events.isEmpty());
330 + for (DeviceEvent event : events) {
331 + PortNumber num = event.port().number();
332 + if (P1.equals(num)) {
333 + fail("P1 event not expected.");
334 + } else if (P2.equals(num)) {
335 + fail("P2 event not expected.");
336 + } else if (P3.equals(num)) {
337 + assertEquals(PORT_REMOVED, event.type());
338 + assertDevice(DID1, SW1, event.subject());
339 + assertTrue("Port was enabled", event.port().isEnabled());
340 + } else {
341 + fail("Unknown port number encountered: " + num);
342 + }
343 + }
344 +
345 + }
346 +
347 + @Test
348 + public final void testUpdatePortStatus() {
349 + putDevice(DID1, SW1);
350 + List<PortDescription> pds = Arrays.<PortDescription>asList(
351 + new DefaultPortDescription(P1, true)
352 + );
353 + deviceStore.updatePorts(PID, DID1, pds);
354 +
355 + DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
356 + new DefaultPortDescription(P1, false));
357 + assertEquals(PORT_UPDATED, event.type());
358 + assertDevice(DID1, SW1, event.subject());
359 + assertEquals(P1, event.port().number());
360 + assertFalse("Port is disabled", event.port().isEnabled());
361 +
362 + }
363 + @Test
364 + public final void testUpdatePortStatusAncillary() {
365 + putDeviceAncillary(DID1, SW1);
366 + putDevice(DID1, SW1);
367 + List<PortDescription> pds = Arrays.<PortDescription>asList(
368 + new DefaultPortDescription(P1, true, A1)
369 + );
370 + deviceStore.updatePorts(PID, DID1, pds);
371 +
372 + DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
373 + new DefaultPortDescription(P1, false, A1_2));
374 + assertEquals(PORT_UPDATED, event.type());
375 + assertDevice(DID1, SW1, event.subject());
376 + assertEquals(P1, event.port().number());
377 + assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
378 + assertFalse("Port is disabled", event.port().isEnabled());
379 +
380 + DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1,
381 + new DefaultPortDescription(P1, true));
382 + assertNull("Ancillary is ignored if primary exists", event2);
383 +
384 + // but, Ancillary annotation update will be notified
385 + DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1,
386 + new DefaultPortDescription(P1, true, A2));
387 + assertEquals(PORT_UPDATED, event3.type());
388 + assertDevice(DID1, SW1, event3.subject());
389 + assertEquals(P1, event3.port().number());
390 + assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
391 + assertFalse("Port is disabled", event3.port().isEnabled());
392 +
393 + // port only reported from Ancillary will be notified as down
394 + DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1,
395 + new DefaultPortDescription(P2, true));
396 + assertEquals(PORT_ADDED, event4.type());
397 + assertDevice(DID1, SW1, event4.subject());
398 + assertEquals(P2, event4.port().number());
399 + assertAnnotationsEquals(event4.port().annotations());
400 + assertFalse("Port is disabled if not given from primary provider",
401 + event4.port().isEnabled());
402 + }
403 +
404 + @Test
405 + public final void testGetPorts() {
406 + putDevice(DID1, SW1);
407 + putDevice(DID2, SW1);
408 + List<PortDescription> pds = Arrays.<PortDescription>asList(
409 + new DefaultPortDescription(P1, true),
410 + new DefaultPortDescription(P2, true)
411 + );
412 + deviceStore.updatePorts(PID, DID1, pds);
413 +
414 + Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
415 + List<Port> ports = deviceStore.getPorts(DID1);
416 + for (Port port : ports) {
417 + assertTrue("Port is enabled", port.isEnabled());
418 + assertTrue("PortNumber is one of expected",
419 + expectedPorts.remove(port.number()));
420 + }
421 + assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty());
422 +
423 +
424 + assertTrue("DID2 has no ports", deviceStore.getPorts(DID2).isEmpty());
425 + }
426 +
427 + @Test
428 + public final void testGetPort() {
429 + putDevice(DID1, SW1);
430 + putDevice(DID2, SW1);
431 + List<PortDescription> pds = Arrays.<PortDescription>asList(
432 + new DefaultPortDescription(P1, true),
433 + new DefaultPortDescription(P2, false)
434 + );
435 + deviceStore.updatePorts(PID, DID1, pds);
436 +
437 + Port port1 = deviceStore.getPort(DID1, P1);
438 + assertEquals(P1, port1.number());
439 + assertTrue("Port is enabled", port1.isEnabled());
440 +
441 + Port port2 = deviceStore.getPort(DID1, P2);
442 + assertEquals(P2, port2.number());
443 + assertFalse("Port is disabled", port2.isEnabled());
444 +
445 + Port port3 = deviceStore.getPort(DID1, P3);
446 + assertNull("P3 not expected", port3);
447 + }
448 +
449 + @Test
450 + public final void testRemoveDevice() {
451 + putDevice(DID1, SW1);
452 + putDevice(DID2, SW1);
453 +
454 + assertEquals(2, deviceStore.getDeviceCount());
455 +
456 + DeviceEvent event = deviceStore.removeDevice(DID1);
457 + assertEquals(DEVICE_REMOVED, event.type());
458 + assertDevice(DID1, SW1, event.subject());
459 +
460 + assertEquals(1, deviceStore.getDeviceCount());
461 + }
462 +
463 + // If Delegates should be called only on remote events,
464 + // then Simple* should never call them, thus not test required.
465 + // TODO add test for Port events when we have them
466 + @Ignore("Ignore until Delegate spec. is clear.")
467 + @Test
468 + public final void testEvents() throws InterruptedException {
469 + final CountDownLatch addLatch = new CountDownLatch(1);
470 + DeviceStoreDelegate checkAdd = new DeviceStoreDelegate() {
471 + @Override
472 + public void notify(DeviceEvent event) {
473 + assertEquals(DEVICE_ADDED, event.type());
474 + assertDevice(DID1, SW1, event.subject());
475 + addLatch.countDown();
476 + }
477 + };
478 + final CountDownLatch updateLatch = new CountDownLatch(1);
479 + DeviceStoreDelegate checkUpdate = new DeviceStoreDelegate() {
480 + @Override
481 + public void notify(DeviceEvent event) {
482 + assertEquals(DEVICE_UPDATED, event.type());
483 + assertDevice(DID1, SW2, event.subject());
484 + updateLatch.countDown();
485 + }
486 + };
487 + final CountDownLatch removeLatch = new CountDownLatch(1);
488 + DeviceStoreDelegate checkRemove = new DeviceStoreDelegate() {
489 + @Override
490 + public void notify(DeviceEvent event) {
491 + assertEquals(DEVICE_REMOVED, event.type());
492 + assertDevice(DID1, SW2, event.subject());
493 + removeLatch.countDown();
494 + }
495 + };
496 +
497 + DeviceDescription description =
498 + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
499 + HW, SW1, SN);
500 + deviceStore.setDelegate(checkAdd);
501 + deviceStore.createOrUpdateDevice(PID, DID1, description);
502 + assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS));
503 +
504 +
505 + DeviceDescription description2 =
506 + new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
507 + HW, SW2, SN);
508 + deviceStore.unsetDelegate(checkAdd);
509 + deviceStore.setDelegate(checkUpdate);
510 + deviceStore.createOrUpdateDevice(PID, DID1, description2);
511 + assertTrue("Update event fired", updateLatch.await(1, TimeUnit.SECONDS));
512 +
513 + deviceStore.unsetDelegate(checkUpdate);
514 + deviceStore.setDelegate(checkRemove);
515 + deviceStore.removeDevice(DID1);
516 + assertTrue("Remove event fired", removeLatch.await(1, TimeUnit.SECONDS));
517 + }
518 +
519 + private static final class TestGossipDeviceStore extends GossipDeviceStore {
520 +
521 + public TestGossipDeviceStore(ClockService clockService) {
522 + this.clockService = clockService;
523 + }
524 + }
525 +}