Distributed Link store implementation that takes event versioning into consideration
Showing
2 changed files
with
273 additions
and
25 deletions
... | @@ -86,46 +86,48 @@ public class OnosDistributedDeviceStore | ... | @@ -86,46 +86,48 @@ public class OnosDistributedDeviceStore |
86 | 86 | ||
87 | @Override | 87 | @Override |
88 | public Iterable<Device> getDevices() { | 88 | public Iterable<Device> getDevices() { |
89 | - // TODO builder v.s. copyOf. Guava semms to be using copyOf? | ||
90 | - // FIXME: synchronize. | ||
91 | Builder<Device> builder = ImmutableSet.builder(); | 89 | Builder<Device> builder = ImmutableSet.builder(); |
92 | - for (VersionedValue<? extends Device> device : devices.values()) { | 90 | + synchronized (this) { |
93 | - builder.add(device.entity()); | 91 | + for (VersionedValue<Device> device : devices.values()) { |
92 | + builder.add(device.entity()); | ||
93 | + } | ||
94 | + return builder.build(); | ||
94 | } | 95 | } |
95 | - return builder.build(); | ||
96 | } | 96 | } |
97 | 97 | ||
98 | @Override | 98 | @Override |
99 | public Device getDevice(DeviceId deviceId) { | 99 | public Device getDevice(DeviceId deviceId) { |
100 | - return devices.get(deviceId).entity(); | 100 | + VersionedValue<Device> device = devices.get(deviceId); |
101 | + checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); | ||
102 | + return device.entity(); | ||
101 | } | 103 | } |
102 | 104 | ||
103 | @Override | 105 | @Override |
104 | public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, | 106 | public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, |
105 | DeviceDescription deviceDescription) { | 107 | DeviceDescription deviceDescription) { |
106 | - Timestamp now = clockService.getTimestamp(deviceId); | 108 | + Timestamp newTimestamp = clockService.getTimestamp(deviceId); |
107 | VersionedValue<Device> device = devices.get(deviceId); | 109 | VersionedValue<Device> device = devices.get(deviceId); |
108 | 110 | ||
109 | if (device == null) { | 111 | if (device == null) { |
110 | - return createDevice(providerId, deviceId, deviceDescription, now); | 112 | + return createDevice(providerId, deviceId, deviceDescription, newTimestamp); |
111 | } | 113 | } |
112 | 114 | ||
113 | - checkState(now.compareTo(device.timestamp()) > 0, | 115 | + checkState(newTimestamp.compareTo(device.timestamp()) > 0, |
114 | "Existing device has a timestamp in the future!"); | 116 | "Existing device has a timestamp in the future!"); |
115 | 117 | ||
116 | - return updateDevice(providerId, device.entity(), deviceDescription, now); | 118 | + return updateDevice(providerId, device.entity(), deviceDescription, newTimestamp); |
117 | } | 119 | } |
118 | 120 | ||
119 | // Creates the device and returns the appropriate event if necessary. | 121 | // Creates the device and returns the appropriate event if necessary. |
120 | private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId, | 122 | private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId, |
121 | DeviceDescription desc, Timestamp timestamp) { | 123 | DeviceDescription desc, Timestamp timestamp) { |
122 | - DefaultDevice device = new DefaultDevice(providerId, deviceId, desc.type(), | 124 | + Device device = new DefaultDevice(providerId, deviceId, desc.type(), |
123 | desc.manufacturer(), | 125 | desc.manufacturer(), |
124 | desc.hwVersion(), desc.swVersion(), | 126 | desc.hwVersion(), desc.swVersion(), |
125 | desc.serialNumber()); | 127 | desc.serialNumber()); |
126 | 128 | ||
127 | - devices.put(deviceId, new VersionedValue<Device>(device, true, timestamp)); | 129 | + devices.put(deviceId, new VersionedValue<>(device, true, timestamp)); |
128 | - // FIXME: broadcast a message telling peers of a device event. | 130 | + // TODO,FIXME: broadcast a message telling peers of a device event. |
129 | return new DeviceEvent(DEVICE_ADDED, device, null); | 131 | return new DeviceEvent(DEVICE_ADDED, device, null); |
130 | } | 132 | } |
131 | 133 | ||
... | @@ -148,7 +150,7 @@ public class OnosDistributedDeviceStore | ... | @@ -148,7 +150,7 @@ public class OnosDistributedDeviceStore |
148 | } | 150 | } |
149 | 151 | ||
150 | // Otherwise merely attempt to change availability | 152 | // Otherwise merely attempt to change availability |
151 | - DefaultDevice updated = new DefaultDevice(providerId, device.id(), | 153 | + Device updated = new DefaultDevice(providerId, device.id(), |
152 | desc.type(), | 154 | desc.type(), |
153 | desc.manufacturer(), | 155 | desc.manufacturer(), |
154 | desc.hwVersion(), | 156 | desc.hwVersion(), |
... | @@ -196,18 +198,18 @@ public class OnosDistributedDeviceStore | ... | @@ -196,18 +198,18 @@ public class OnosDistributedDeviceStore |
196 | VersionedValue<Device> device = devices.get(deviceId); | 198 | VersionedValue<Device> device = devices.get(deviceId); |
197 | checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); | 199 | checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); |
198 | Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId); | 200 | Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId); |
199 | - Timestamp timestamp = clockService.getTimestamp(deviceId); | 201 | + Timestamp newTimestamp = clockService.getTimestamp(deviceId); |
200 | 202 | ||
201 | // Add new ports | 203 | // Add new ports |
202 | Set<PortNumber> processed = new HashSet<>(); | 204 | Set<PortNumber> processed = new HashSet<>(); |
203 | for (PortDescription portDescription : portDescriptions) { | 205 | for (PortDescription portDescription : portDescriptions) { |
204 | VersionedValue<Port> port = ports.get(portDescription.portNumber()); | 206 | VersionedValue<Port> port = ports.get(portDescription.portNumber()); |
205 | if (port == null) { | 207 | if (port == null) { |
206 | - events.add(createPort(device, portDescription, ports, timestamp)); | 208 | + events.add(createPort(device, portDescription, ports, newTimestamp)); |
207 | } | 209 | } |
208 | - checkState(timestamp.compareTo(port.timestamp()) > 0, | 210 | + checkState(newTimestamp.compareTo(port.timestamp()) > 0, |
209 | "Existing port state has a timestamp in the future!"); | 211 | "Existing port state has a timestamp in the future!"); |
210 | - events.add(updatePort(device, port, portDescription, ports, timestamp)); | 212 | + events.add(updatePort(device.entity(), port.entity(), portDescription, ports, newTimestamp)); |
211 | processed.add(portDescription.portNumber()); | 213 | processed.add(portDescription.portNumber()); |
212 | } | 214 | } |
213 | 215 | ||
... | @@ -233,19 +235,19 @@ public class OnosDistributedDeviceStore | ... | @@ -233,19 +235,19 @@ public class OnosDistributedDeviceStore |
233 | // Checks if the specified port requires update and if so, it replaces the | 235 | // Checks if the specified port requires update and if so, it replaces the |
234 | // existing entry in the map and returns corresponding event. | 236 | // existing entry in the map and returns corresponding event. |
235 | //@GuardedBy("this") | 237 | //@GuardedBy("this") |
236 | - private DeviceEvent updatePort(VersionedValue<Device> device, VersionedValue<Port> port, | 238 | + private DeviceEvent updatePort(Device device, Port port, |
237 | PortDescription portDescription, | 239 | PortDescription portDescription, |
238 | Map<PortNumber, VersionedValue<Port>> ports, | 240 | Map<PortNumber, VersionedValue<Port>> ports, |
239 | Timestamp timestamp) { | 241 | Timestamp timestamp) { |
240 | - if (port.entity().isEnabled() != portDescription.isEnabled()) { | 242 | + if (port.isEnabled() != portDescription.isEnabled()) { |
241 | VersionedValue<Port> updatedPort = new VersionedValue<Port>( | 243 | VersionedValue<Port> updatedPort = new VersionedValue<Port>( |
242 | - new DefaultPort(device.entity(), portDescription.portNumber(), | 244 | + new DefaultPort(device, portDescription.portNumber(), |
243 | portDescription.isEnabled()), | 245 | portDescription.isEnabled()), |
244 | portDescription.isEnabled(), | 246 | portDescription.isEnabled(), |
245 | timestamp); | 247 | timestamp); |
246 | - ports.put(port.entity().number(), updatedPort); | 248 | + ports.put(port.number(), updatedPort); |
247 | - updatePortMap(device.entity().id(), ports); | 249 | + updatePortMap(device.id(), ports); |
248 | - return new DeviceEvent(PORT_UPDATED, device.entity(), updatedPort.entity()); | 250 | + return new DeviceEvent(PORT_UPDATED, device, updatedPort.entity()); |
249 | } | 251 | } |
250 | return null; | 252 | return null; |
251 | } | 253 | } |
... | @@ -300,7 +302,7 @@ public class OnosDistributedDeviceStore | ... | @@ -300,7 +302,7 @@ public class OnosDistributedDeviceStore |
300 | Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId); | 302 | Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId); |
301 | VersionedValue<Port> port = ports.get(portDescription.portNumber()); | 303 | VersionedValue<Port> port = ports.get(portDescription.portNumber()); |
302 | Timestamp timestamp = clockService.getTimestamp(deviceId); | 304 | Timestamp timestamp = clockService.getTimestamp(deviceId); |
303 | - return updatePort(device, port, portDescription, ports, timestamp); | 305 | + return updatePort(device.entity(), port.entity(), portDescription, ports, timestamp); |
304 | } | 306 | } |
305 | 307 | ||
306 | @Override | 308 | @Override | ... | ... |
core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
0 → 100644
1 | +package org.onlab.onos.store.link.impl; | ||
2 | + | ||
3 | +import static org.onlab.onos.net.Link.Type.DIRECT; | ||
4 | +import static org.onlab.onos.net.Link.Type.INDIRECT; | ||
5 | +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED; | ||
6 | +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED; | ||
7 | +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED; | ||
8 | +import static org.slf4j.LoggerFactory.getLogger; | ||
9 | + | ||
10 | +import java.util.HashSet; | ||
11 | +import java.util.Set; | ||
12 | +import java.util.concurrent.ConcurrentHashMap; | ||
13 | +import java.util.concurrent.ConcurrentMap; | ||
14 | + | ||
15 | +import org.apache.felix.scr.annotations.Activate; | ||
16 | +import org.apache.felix.scr.annotations.Component; | ||
17 | +import org.apache.felix.scr.annotations.Deactivate; | ||
18 | +import org.apache.felix.scr.annotations.Reference; | ||
19 | +import org.apache.felix.scr.annotations.ReferenceCardinality; | ||
20 | +import org.apache.felix.scr.annotations.Service; | ||
21 | +import org.onlab.onos.net.ConnectPoint; | ||
22 | +import org.onlab.onos.net.DefaultLink; | ||
23 | +import org.onlab.onos.net.DeviceId; | ||
24 | +import org.onlab.onos.net.Link; | ||
25 | +import org.onlab.onos.net.LinkKey; | ||
26 | +import org.onlab.onos.net.link.LinkDescription; | ||
27 | +import org.onlab.onos.net.link.LinkEvent; | ||
28 | +import org.onlab.onos.net.link.LinkStore; | ||
29 | +import org.onlab.onos.net.link.LinkStoreDelegate; | ||
30 | +import org.onlab.onos.net.provider.ProviderId; | ||
31 | +import org.onlab.onos.store.AbstractStore; | ||
32 | +import org.onlab.onos.store.ClockService; | ||
33 | +import org.onlab.onos.store.Timestamp; | ||
34 | +import org.onlab.onos.store.device.impl.VersionedValue; | ||
35 | +import org.slf4j.Logger; | ||
36 | + | ||
37 | +import com.google.common.collect.HashMultimap; | ||
38 | +import com.google.common.collect.ImmutableSet; | ||
39 | +import com.google.common.collect.Multimap; | ||
40 | +import com.google.common.collect.ImmutableSet.Builder; | ||
41 | + | ||
42 | +import static com.google.common.base.Preconditions.checkArgument; | ||
43 | +import static com.google.common.base.Preconditions.checkState; | ||
44 | + | ||
45 | +/** | ||
46 | + * Manages inventory of infrastructure links using a protocol that takes into consideration | ||
47 | + * the order in which events occur. | ||
48 | + */ | ||
49 | +// FIXME: This does not yet implement the full protocol. | ||
50 | +// The full protocol requires the sender of LLDP message to include the | ||
51 | +// version information of src device/port and the receiver to | ||
52 | +// take that into account when figuring out if a more recent src | ||
53 | +// device/port down event renders the link discovery obsolete. | ||
54 | +@Component(immediate = true) | ||
55 | +@Service | ||
56 | +public class OnosDistributedLinkStore | ||
57 | + extends AbstractStore<LinkEvent, LinkStoreDelegate> | ||
58 | + implements LinkStore { | ||
59 | + | ||
60 | + private final Logger log = getLogger(getClass()); | ||
61 | + | ||
62 | + // Link inventory | ||
63 | + private ConcurrentMap<LinkKey, VersionedValue<Link>> links; | ||
64 | + | ||
65 | + public static final String LINK_NOT_FOUND = "Link between %s and %s not found"; | ||
66 | + | ||
67 | + // TODO synchronize? | ||
68 | + // Egress and ingress link sets | ||
69 | + private final Multimap<DeviceId, VersionedValue<Link>> srcLinks = HashMultimap.create(); | ||
70 | + private final Multimap<DeviceId, VersionedValue<Link>> dstLinks = HashMultimap.create(); | ||
71 | + | ||
72 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
73 | + protected ClockService clockService; | ||
74 | + | ||
75 | + @Activate | ||
76 | + public void activate() { | ||
77 | + | ||
78 | + links = new ConcurrentHashMap<>(); | ||
79 | + | ||
80 | + log.info("Started"); | ||
81 | + } | ||
82 | + | ||
83 | + @Deactivate | ||
84 | + public void deactivate() { | ||
85 | + log.info("Stopped"); | ||
86 | + } | ||
87 | + | ||
88 | + @Override | ||
89 | + public int getLinkCount() { | ||
90 | + return links.size(); | ||
91 | + } | ||
92 | + | ||
93 | + @Override | ||
94 | + public Iterable<Link> getLinks() { | ||
95 | + Builder<Link> builder = ImmutableSet.builder(); | ||
96 | + synchronized (this) { | ||
97 | + for (VersionedValue<Link> link : links.values()) { | ||
98 | + builder.add(link.entity()); | ||
99 | + } | ||
100 | + return builder.build(); | ||
101 | + } | ||
102 | + } | ||
103 | + | ||
104 | + @Override | ||
105 | + public Set<Link> getDeviceEgressLinks(DeviceId deviceId) { | ||
106 | + Set<VersionedValue<Link>> egressLinks = ImmutableSet.copyOf(srcLinks.get(deviceId)); | ||
107 | + Set<Link> rawEgressLinks = new HashSet<>(); | ||
108 | + for (VersionedValue<Link> link : egressLinks) { | ||
109 | + rawEgressLinks.add(link.entity()); | ||
110 | + } | ||
111 | + return rawEgressLinks; | ||
112 | + } | ||
113 | + | ||
114 | + @Override | ||
115 | + public Set<Link> getDeviceIngressLinks(DeviceId deviceId) { | ||
116 | + Set<VersionedValue<Link>> ingressLinks = ImmutableSet.copyOf(dstLinks.get(deviceId)); | ||
117 | + Set<Link> rawIngressLinks = new HashSet<>(); | ||
118 | + for (VersionedValue<Link> link : ingressLinks) { | ||
119 | + rawIngressLinks.add(link.entity()); | ||
120 | + } | ||
121 | + return rawIngressLinks; | ||
122 | + } | ||
123 | + | ||
124 | + @Override | ||
125 | + public Link getLink(ConnectPoint src, ConnectPoint dst) { | ||
126 | + VersionedValue<Link> link = links.get(new LinkKey(src, dst)); | ||
127 | + checkArgument(link != null, "LINK_NOT_FOUND", src, dst); | ||
128 | + return link.entity(); | ||
129 | + } | ||
130 | + | ||
131 | + @Override | ||
132 | + public Set<Link> getEgressLinks(ConnectPoint src) { | ||
133 | + Set<Link> egressLinks = new HashSet<>(); | ||
134 | + for (VersionedValue<Link> link : srcLinks.get(src.deviceId())) { | ||
135 | + if (link.entity().src().equals(src)) { | ||
136 | + egressLinks.add(link.entity()); | ||
137 | + } | ||
138 | + } | ||
139 | + return egressLinks; | ||
140 | + } | ||
141 | + | ||
142 | + @Override | ||
143 | + public Set<Link> getIngressLinks(ConnectPoint dst) { | ||
144 | + Set<Link> ingressLinks = new HashSet<>(); | ||
145 | + for (VersionedValue<Link> link : dstLinks.get(dst.deviceId())) { | ||
146 | + if (link.entity().dst().equals(dst)) { | ||
147 | + ingressLinks.add(link.entity()); | ||
148 | + } | ||
149 | + } | ||
150 | + return ingressLinks; | ||
151 | + } | ||
152 | + | ||
153 | + @Override | ||
154 | + public LinkEvent createOrUpdateLink(ProviderId providerId, | ||
155 | + LinkDescription linkDescription) { | ||
156 | + | ||
157 | + final DeviceId destinationDeviceId = linkDescription.dst().deviceId(); | ||
158 | + final Timestamp newTimestamp = clockService.getTimestamp(destinationDeviceId); | ||
159 | + | ||
160 | + LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst()); | ||
161 | + VersionedValue<Link> link = links.get(key); | ||
162 | + if (link == null) { | ||
163 | + return createLink(providerId, key, linkDescription, newTimestamp); | ||
164 | + } | ||
165 | + | ||
166 | + checkState(newTimestamp.compareTo(link.timestamp()) > 0, | ||
167 | + "Existing Link has a timestamp in the future!"); | ||
168 | + | ||
169 | + return updateLink(providerId, link, key, linkDescription, newTimestamp); | ||
170 | + } | ||
171 | + | ||
172 | + // Creates and stores the link and returns the appropriate event. | ||
173 | + private LinkEvent createLink(ProviderId providerId, LinkKey key, | ||
174 | + LinkDescription linkDescription, Timestamp timestamp) { | ||
175 | + VersionedValue<Link> link = new VersionedValue<Link>(new DefaultLink(providerId, key.src(), key.dst(), | ||
176 | + linkDescription.type()), true, timestamp); | ||
177 | + synchronized (this) { | ||
178 | + links.put(key, link); | ||
179 | + addNewLink(link, timestamp); | ||
180 | + } | ||
181 | + // FIXME: notify peers. | ||
182 | + return new LinkEvent(LINK_ADDED, link.entity()); | ||
183 | + } | ||
184 | + | ||
185 | + // update Egress and ingress link sets | ||
186 | + private void addNewLink(VersionedValue<Link> link, Timestamp timestamp) { | ||
187 | + Link rawLink = link.entity(); | ||
188 | + synchronized (this) { | ||
189 | + srcLinks.put(rawLink.src().deviceId(), link); | ||
190 | + dstLinks.put(rawLink.dst().deviceId(), link); | ||
191 | + } | ||
192 | + } | ||
193 | + | ||
194 | + // Updates, if necessary the specified link and returns the appropriate event. | ||
195 | + private LinkEvent updateLink(ProviderId providerId, VersionedValue<Link> existingLink, | ||
196 | + LinkKey key, LinkDescription linkDescription, Timestamp timestamp) { | ||
197 | + // FIXME confirm Link update condition is OK | ||
198 | + if (existingLink.entity().type() == INDIRECT && linkDescription.type() == DIRECT) { | ||
199 | + synchronized (this) { | ||
200 | + | ||
201 | + VersionedValue<Link> updatedLink = new VersionedValue<Link>( | ||
202 | + new DefaultLink(providerId, existingLink.entity().src(), existingLink.entity().dst(), | ||
203 | + linkDescription.type()), true, timestamp); | ||
204 | + links.replace(key, existingLink, updatedLink); | ||
205 | + | ||
206 | + replaceLink(existingLink, updatedLink); | ||
207 | + // FIXME: notify peers. | ||
208 | + return new LinkEvent(LINK_UPDATED, updatedLink.entity()); | ||
209 | + } | ||
210 | + } | ||
211 | + return null; | ||
212 | + } | ||
213 | + | ||
214 | + // update Egress and ingress link sets | ||
215 | + private void replaceLink(VersionedValue<Link> current, VersionedValue<Link> updated) { | ||
216 | + synchronized (this) { | ||
217 | + srcLinks.remove(current.entity().src().deviceId(), current); | ||
218 | + dstLinks.remove(current.entity().dst().deviceId(), current); | ||
219 | + | ||
220 | + srcLinks.put(current.entity().src().deviceId(), updated); | ||
221 | + dstLinks.put(current.entity().dst().deviceId(), updated); | ||
222 | + } | ||
223 | + } | ||
224 | + | ||
225 | + @Override | ||
226 | + public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) { | ||
227 | + synchronized (this) { | ||
228 | + LinkKey key = new LinkKey(src, dst); | ||
229 | + VersionedValue<Link> link = links.remove(key); | ||
230 | + if (link != null) { | ||
231 | + removeLink(link); | ||
232 | + // notify peers | ||
233 | + return new LinkEvent(LINK_REMOVED, link.entity()); | ||
234 | + } | ||
235 | + return null; | ||
236 | + } | ||
237 | + } | ||
238 | + | ||
239 | + // update Egress and ingress link sets | ||
240 | + private void removeLink(VersionedValue<Link> link) { | ||
241 | + synchronized (this) { | ||
242 | + srcLinks.remove(link.entity().src().deviceId(), link); | ||
243 | + dstLinks.remove(link.entity().dst().deviceId(), link); | ||
244 | + } | ||
245 | + } | ||
246 | +} |
-
Please register or login to post a comment