tom

Merge remote-tracking branch 'origin/master'

...@@ -86,46 +86,48 @@ public class OnosDistributedDeviceStore ...@@ -86,46 +86,48 @@ public class OnosDistributedDeviceStore
86 86
87 @Override 87 @Override
88 public Iterable<Device> getDevices() { 88 public Iterable<Device> getDevices() {
89 - // TODO builder v.s. copyOf. Guava semms to be using copyOf?
90 - // FIXME: synchronize.
91 Builder<Device> builder = ImmutableSet.builder(); 89 Builder<Device> builder = ImmutableSet.builder();
92 - for (VersionedValue<? extends Device> device : devices.values()) { 90 + synchronized (this) {
93 - builder.add(device.entity()); 91 + for (VersionedValue<Device> device : devices.values()) {
92 + builder.add(device.entity());
93 + }
94 + return builder.build();
94 } 95 }
95 - return builder.build();
96 } 96 }
97 97
98 @Override 98 @Override
99 public Device getDevice(DeviceId deviceId) { 99 public Device getDevice(DeviceId deviceId) {
100 - return devices.get(deviceId).entity(); 100 + VersionedValue<Device> device = devices.get(deviceId);
101 + checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
102 + return device.entity();
101 } 103 }
102 104
103 @Override 105 @Override
104 public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, 106 public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
105 DeviceDescription deviceDescription) { 107 DeviceDescription deviceDescription) {
106 - Timestamp now = clockService.getTimestamp(deviceId); 108 + Timestamp newTimestamp = clockService.getTimestamp(deviceId);
107 VersionedValue<Device> device = devices.get(deviceId); 109 VersionedValue<Device> device = devices.get(deviceId);
108 110
109 if (device == null) { 111 if (device == null) {
110 - return createDevice(providerId, deviceId, deviceDescription, now); 112 + return createDevice(providerId, deviceId, deviceDescription, newTimestamp);
111 } 113 }
112 114
113 - checkState(now.compareTo(device.timestamp()) > 0, 115 + checkState(newTimestamp.compareTo(device.timestamp()) > 0,
114 "Existing device has a timestamp in the future!"); 116 "Existing device has a timestamp in the future!");
115 117
116 - return updateDevice(providerId, device.entity(), deviceDescription, now); 118 + return updateDevice(providerId, device.entity(), deviceDescription, newTimestamp);
117 } 119 }
118 120
119 // Creates the device and returns the appropriate event if necessary. 121 // Creates the device and returns the appropriate event if necessary.
120 private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId, 122 private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
121 DeviceDescription desc, Timestamp timestamp) { 123 DeviceDescription desc, Timestamp timestamp) {
122 - DefaultDevice device = new DefaultDevice(providerId, deviceId, desc.type(), 124 + Device device = new DefaultDevice(providerId, deviceId, desc.type(),
123 desc.manufacturer(), 125 desc.manufacturer(),
124 desc.hwVersion(), desc.swVersion(), 126 desc.hwVersion(), desc.swVersion(),
125 desc.serialNumber()); 127 desc.serialNumber());
126 128
127 - devices.put(deviceId, new VersionedValue<Device>(device, true, timestamp)); 129 + devices.put(deviceId, new VersionedValue<>(device, true, timestamp));
128 - // FIXME: broadcast a message telling peers of a device event. 130 + // TODO,FIXME: broadcast a message telling peers of a device event.
129 return new DeviceEvent(DEVICE_ADDED, device, null); 131 return new DeviceEvent(DEVICE_ADDED, device, null);
130 } 132 }
131 133
...@@ -148,7 +150,7 @@ public class OnosDistributedDeviceStore ...@@ -148,7 +150,7 @@ public class OnosDistributedDeviceStore
148 } 150 }
149 151
150 // Otherwise merely attempt to change availability 152 // Otherwise merely attempt to change availability
151 - DefaultDevice updated = new DefaultDevice(providerId, device.id(), 153 + Device updated = new DefaultDevice(providerId, device.id(),
152 desc.type(), 154 desc.type(),
153 desc.manufacturer(), 155 desc.manufacturer(),
154 desc.hwVersion(), 156 desc.hwVersion(),
...@@ -196,18 +198,18 @@ public class OnosDistributedDeviceStore ...@@ -196,18 +198,18 @@ public class OnosDistributedDeviceStore
196 VersionedValue<Device> device = devices.get(deviceId); 198 VersionedValue<Device> device = devices.get(deviceId);
197 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); 199 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
198 Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId); 200 Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
199 - Timestamp timestamp = clockService.getTimestamp(deviceId); 201 + Timestamp newTimestamp = clockService.getTimestamp(deviceId);
200 202
201 // Add new ports 203 // Add new ports
202 Set<PortNumber> processed = new HashSet<>(); 204 Set<PortNumber> processed = new HashSet<>();
203 for (PortDescription portDescription : portDescriptions) { 205 for (PortDescription portDescription : portDescriptions) {
204 VersionedValue<Port> port = ports.get(portDescription.portNumber()); 206 VersionedValue<Port> port = ports.get(portDescription.portNumber());
205 if (port == null) { 207 if (port == null) {
206 - events.add(createPort(device, portDescription, ports, timestamp)); 208 + events.add(createPort(device, portDescription, ports, newTimestamp));
207 } 209 }
208 - checkState(timestamp.compareTo(port.timestamp()) > 0, 210 + checkState(newTimestamp.compareTo(port.timestamp()) > 0,
209 "Existing port state has a timestamp in the future!"); 211 "Existing port state has a timestamp in the future!");
210 - events.add(updatePort(device, port, portDescription, ports, timestamp)); 212 + events.add(updatePort(device.entity(), port.entity(), portDescription, ports, newTimestamp));
211 processed.add(portDescription.portNumber()); 213 processed.add(portDescription.portNumber());
212 } 214 }
213 215
...@@ -233,19 +235,19 @@ public class OnosDistributedDeviceStore ...@@ -233,19 +235,19 @@ public class OnosDistributedDeviceStore
233 // Checks if the specified port requires update and if so, it replaces the 235 // Checks if the specified port requires update and if so, it replaces the
234 // existing entry in the map and returns corresponding event. 236 // existing entry in the map and returns corresponding event.
235 //@GuardedBy("this") 237 //@GuardedBy("this")
236 - private DeviceEvent updatePort(VersionedValue<Device> device, VersionedValue<Port> port, 238 + private DeviceEvent updatePort(Device device, Port port,
237 PortDescription portDescription, 239 PortDescription portDescription,
238 Map<PortNumber, VersionedValue<Port>> ports, 240 Map<PortNumber, VersionedValue<Port>> ports,
239 Timestamp timestamp) { 241 Timestamp timestamp) {
240 - if (port.entity().isEnabled() != portDescription.isEnabled()) { 242 + if (port.isEnabled() != portDescription.isEnabled()) {
241 VersionedValue<Port> updatedPort = new VersionedValue<Port>( 243 VersionedValue<Port> updatedPort = new VersionedValue<Port>(
242 - new DefaultPort(device.entity(), portDescription.portNumber(), 244 + new DefaultPort(device, portDescription.portNumber(),
243 portDescription.isEnabled()), 245 portDescription.isEnabled()),
244 portDescription.isEnabled(), 246 portDescription.isEnabled(),
245 timestamp); 247 timestamp);
246 - ports.put(port.entity().number(), updatedPort); 248 + ports.put(port.number(), updatedPort);
247 - updatePortMap(device.entity().id(), ports); 249 + updatePortMap(device.id(), ports);
248 - return new DeviceEvent(PORT_UPDATED, device.entity(), updatedPort.entity()); 250 + return new DeviceEvent(PORT_UPDATED, device, updatedPort.entity());
249 } 251 }
250 return null; 252 return null;
251 } 253 }
...@@ -300,7 +302,7 @@ public class OnosDistributedDeviceStore ...@@ -300,7 +302,7 @@ public class OnosDistributedDeviceStore
300 Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId); 302 Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
301 VersionedValue<Port> port = ports.get(portDescription.portNumber()); 303 VersionedValue<Port> port = ports.get(portDescription.portNumber());
302 Timestamp timestamp = clockService.getTimestamp(deviceId); 304 Timestamp timestamp = clockService.getTimestamp(deviceId);
303 - return updatePort(device, port, portDescription, ports, timestamp); 305 + return updatePort(device.entity(), port.entity(), portDescription, ports, timestamp);
304 } 306 }
305 307
306 @Override 308 @Override
......
1 +package org.onlab.onos.store.link.impl;
2 +
3 +import static org.onlab.onos.net.Link.Type.DIRECT;
4 +import static org.onlab.onos.net.Link.Type.INDIRECT;
5 +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
6 +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
7 +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED;
8 +import static org.slf4j.LoggerFactory.getLogger;
9 +
10 +import java.util.HashSet;
11 +import java.util.Set;
12 +import java.util.concurrent.ConcurrentHashMap;
13 +import java.util.concurrent.ConcurrentMap;
14 +
15 +import org.apache.felix.scr.annotations.Activate;
16 +import org.apache.felix.scr.annotations.Component;
17 +import org.apache.felix.scr.annotations.Deactivate;
18 +import org.apache.felix.scr.annotations.Reference;
19 +import org.apache.felix.scr.annotations.ReferenceCardinality;
20 +import org.apache.felix.scr.annotations.Service;
21 +import org.onlab.onos.net.ConnectPoint;
22 +import org.onlab.onos.net.DefaultLink;
23 +import org.onlab.onos.net.DeviceId;
24 +import org.onlab.onos.net.Link;
25 +import org.onlab.onos.net.LinkKey;
26 +import org.onlab.onos.net.link.LinkDescription;
27 +import org.onlab.onos.net.link.LinkEvent;
28 +import org.onlab.onos.net.link.LinkStore;
29 +import org.onlab.onos.net.link.LinkStoreDelegate;
30 +import org.onlab.onos.net.provider.ProviderId;
31 +import org.onlab.onos.store.AbstractStore;
32 +import org.onlab.onos.store.ClockService;
33 +import org.onlab.onos.store.Timestamp;
34 +import org.onlab.onos.store.device.impl.VersionedValue;
35 +import org.slf4j.Logger;
36 +
37 +import com.google.common.collect.HashMultimap;
38 +import com.google.common.collect.ImmutableSet;
39 +import com.google.common.collect.Multimap;
40 +import com.google.common.collect.ImmutableSet.Builder;
41 +
42 +import static com.google.common.base.Preconditions.checkArgument;
43 +import static com.google.common.base.Preconditions.checkState;
44 +
45 +/**
46 + * Manages inventory of infrastructure links using a protocol that takes into consideration
47 + * the order in which events occur.
48 + */
49 +// FIXME: This does not yet implement the full protocol.
50 +// The full protocol requires the sender of LLDP message to include the
51 +// version information of src device/port and the receiver to
52 +// take that into account when figuring out if a more recent src
53 +// device/port down event renders the link discovery obsolete.
54 +@Component(immediate = true)
55 +@Service
56 +public class OnosDistributedLinkStore
57 + extends AbstractStore<LinkEvent, LinkStoreDelegate>
58 + implements LinkStore {
59 +
60 + private final Logger log = getLogger(getClass());
61 +
62 + // Link inventory
63 + private ConcurrentMap<LinkKey, VersionedValue<Link>> links;
64 +
65 + public static final String LINK_NOT_FOUND = "Link between %s and %s not found";
66 +
67 + // TODO synchronize?
68 + // Egress and ingress link sets
69 + private final Multimap<DeviceId, VersionedValue<Link>> srcLinks = HashMultimap.create();
70 + private final Multimap<DeviceId, VersionedValue<Link>> dstLinks = HashMultimap.create();
71 +
72 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 + protected ClockService clockService;
74 +
75 + @Activate
76 + public void activate() {
77 +
78 + links = new ConcurrentHashMap<>();
79 +
80 + log.info("Started");
81 + }
82 +
83 + @Deactivate
84 + public void deactivate() {
85 + log.info("Stopped");
86 + }
87 +
88 + @Override
89 + public int getLinkCount() {
90 + return links.size();
91 + }
92 +
93 + @Override
94 + public Iterable<Link> getLinks() {
95 + Builder<Link> builder = ImmutableSet.builder();
96 + synchronized (this) {
97 + for (VersionedValue<Link> link : links.values()) {
98 + builder.add(link.entity());
99 + }
100 + return builder.build();
101 + }
102 + }
103 +
104 + @Override
105 + public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
106 + Set<VersionedValue<Link>> egressLinks = ImmutableSet.copyOf(srcLinks.get(deviceId));
107 + Set<Link> rawEgressLinks = new HashSet<>();
108 + for (VersionedValue<Link> link : egressLinks) {
109 + rawEgressLinks.add(link.entity());
110 + }
111 + return rawEgressLinks;
112 + }
113 +
114 + @Override
115 + public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
116 + Set<VersionedValue<Link>> ingressLinks = ImmutableSet.copyOf(dstLinks.get(deviceId));
117 + Set<Link> rawIngressLinks = new HashSet<>();
118 + for (VersionedValue<Link> link : ingressLinks) {
119 + rawIngressLinks.add(link.entity());
120 + }
121 + return rawIngressLinks;
122 + }
123 +
124 + @Override
125 + public Link getLink(ConnectPoint src, ConnectPoint dst) {
126 + VersionedValue<Link> link = links.get(new LinkKey(src, dst));
127 + checkArgument(link != null, "LINK_NOT_FOUND", src, dst);
128 + return link.entity();
129 + }
130 +
131 + @Override
132 + public Set<Link> getEgressLinks(ConnectPoint src) {
133 + Set<Link> egressLinks = new HashSet<>();
134 + for (VersionedValue<Link> link : srcLinks.get(src.deviceId())) {
135 + if (link.entity().src().equals(src)) {
136 + egressLinks.add(link.entity());
137 + }
138 + }
139 + return egressLinks;
140 + }
141 +
142 + @Override
143 + public Set<Link> getIngressLinks(ConnectPoint dst) {
144 + Set<Link> ingressLinks = new HashSet<>();
145 + for (VersionedValue<Link> link : dstLinks.get(dst.deviceId())) {
146 + if (link.entity().dst().equals(dst)) {
147 + ingressLinks.add(link.entity());
148 + }
149 + }
150 + return ingressLinks;
151 + }
152 +
153 + @Override
154 + public LinkEvent createOrUpdateLink(ProviderId providerId,
155 + LinkDescription linkDescription) {
156 +
157 + final DeviceId destinationDeviceId = linkDescription.dst().deviceId();
158 + final Timestamp newTimestamp = clockService.getTimestamp(destinationDeviceId);
159 +
160 + LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
161 + VersionedValue<Link> link = links.get(key);
162 + if (link == null) {
163 + return createLink(providerId, key, linkDescription, newTimestamp);
164 + }
165 +
166 + checkState(newTimestamp.compareTo(link.timestamp()) > 0,
167 + "Existing Link has a timestamp in the future!");
168 +
169 + return updateLink(providerId, link, key, linkDescription, newTimestamp);
170 + }
171 +
172 + // Creates and stores the link and returns the appropriate event.
173 + private LinkEvent createLink(ProviderId providerId, LinkKey key,
174 + LinkDescription linkDescription, Timestamp timestamp) {
175 + VersionedValue<Link> link = new VersionedValue<Link>(new DefaultLink(providerId, key.src(), key.dst(),
176 + linkDescription.type()), true, timestamp);
177 + synchronized (this) {
178 + links.put(key, link);
179 + addNewLink(link, timestamp);
180 + }
181 + // FIXME: notify peers.
182 + return new LinkEvent(LINK_ADDED, link.entity());
183 + }
184 +
185 + // update Egress and ingress link sets
186 + private void addNewLink(VersionedValue<Link> link, Timestamp timestamp) {
187 + Link rawLink = link.entity();
188 + synchronized (this) {
189 + srcLinks.put(rawLink.src().deviceId(), link);
190 + dstLinks.put(rawLink.dst().deviceId(), link);
191 + }
192 + }
193 +
194 + // Updates, if necessary the specified link and returns the appropriate event.
195 + private LinkEvent updateLink(ProviderId providerId, VersionedValue<Link> existingLink,
196 + LinkKey key, LinkDescription linkDescription, Timestamp timestamp) {
197 + // FIXME confirm Link update condition is OK
198 + if (existingLink.entity().type() == INDIRECT && linkDescription.type() == DIRECT) {
199 + synchronized (this) {
200 +
201 + VersionedValue<Link> updatedLink = new VersionedValue<Link>(
202 + new DefaultLink(providerId, existingLink.entity().src(), existingLink.entity().dst(),
203 + linkDescription.type()), true, timestamp);
204 + links.replace(key, existingLink, updatedLink);
205 +
206 + replaceLink(existingLink, updatedLink);
207 + // FIXME: notify peers.
208 + return new LinkEvent(LINK_UPDATED, updatedLink.entity());
209 + }
210 + }
211 + return null;
212 + }
213 +
214 + // update Egress and ingress link sets
215 + private void replaceLink(VersionedValue<Link> current, VersionedValue<Link> updated) {
216 + synchronized (this) {
217 + srcLinks.remove(current.entity().src().deviceId(), current);
218 + dstLinks.remove(current.entity().dst().deviceId(), current);
219 +
220 + srcLinks.put(current.entity().src().deviceId(), updated);
221 + dstLinks.put(current.entity().dst().deviceId(), updated);
222 + }
223 + }
224 +
225 + @Override
226 + public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
227 + synchronized (this) {
228 + LinkKey key = new LinkKey(src, dst);
229 + VersionedValue<Link> link = links.remove(key);
230 + if (link != null) {
231 + removeLink(link);
232 + // notify peers
233 + return new LinkEvent(LINK_REMOVED, link.entity());
234 + }
235 + return null;
236 + }
237 + }
238 +
239 + // update Egress and ingress link sets
240 + private void removeLink(VersionedValue<Link> link) {
241 + synchronized (this) {
242 + srcLinks.remove(link.entity().src().deviceId(), link);
243 + dstLinks.remove(link.entity().dst().deviceId(), link);
244 + }
245 + }
246 +}