Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
Showing
7 changed files
with
597 additions
and
325 deletions
1 | +package org.onlab.onos.store.link.impl; | ||
2 | + | ||
3 | +import com.google.common.base.Function; | ||
4 | +import com.google.common.base.Predicate; | ||
5 | +import com.google.common.collect.FluentIterable; | ||
6 | +import com.google.common.collect.HashMultimap; | ||
7 | +import com.google.common.collect.Maps; | ||
8 | +import com.google.common.collect.SetMultimap; | ||
9 | + | ||
10 | +import org.apache.commons.lang3.concurrent.ConcurrentUtils; | ||
11 | +import org.apache.felix.scr.annotations.Activate; | ||
12 | +import org.apache.felix.scr.annotations.Component; | ||
13 | +import org.apache.felix.scr.annotations.Deactivate; | ||
14 | +import org.apache.felix.scr.annotations.Reference; | ||
15 | +import org.apache.felix.scr.annotations.ReferenceCardinality; | ||
16 | +import org.apache.felix.scr.annotations.Service; | ||
17 | +import org.onlab.onos.cluster.ClusterService; | ||
18 | +import org.onlab.onos.net.AnnotationsUtil; | ||
19 | +import org.onlab.onos.net.ConnectPoint; | ||
20 | +import org.onlab.onos.net.DefaultAnnotations; | ||
21 | +import org.onlab.onos.net.DefaultLink; | ||
22 | +import org.onlab.onos.net.DeviceId; | ||
23 | +import org.onlab.onos.net.Link; | ||
24 | +import org.onlab.onos.net.SparseAnnotations; | ||
25 | +import org.onlab.onos.net.Link.Type; | ||
26 | +import org.onlab.onos.net.LinkKey; | ||
27 | +import org.onlab.onos.net.Provided; | ||
28 | +import org.onlab.onos.net.link.DefaultLinkDescription; | ||
29 | +import org.onlab.onos.net.link.LinkDescription; | ||
30 | +import org.onlab.onos.net.link.LinkEvent; | ||
31 | +import org.onlab.onos.net.link.LinkStore; | ||
32 | +import org.onlab.onos.net.link.LinkStoreDelegate; | ||
33 | +import org.onlab.onos.net.provider.ProviderId; | ||
34 | +import org.onlab.onos.store.AbstractStore; | ||
35 | +import org.onlab.onos.store.ClockService; | ||
36 | +import org.onlab.onos.store.Timestamp; | ||
37 | +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; | ||
38 | +import org.onlab.onos.store.cluster.messaging.ClusterMessage; | ||
39 | +import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; | ||
40 | +import org.onlab.onos.store.cluster.messaging.MessageSubject; | ||
41 | +import org.onlab.onos.store.common.impl.Timestamped; | ||
42 | +import org.onlab.onos.store.serializers.DistributedStoreSerializers; | ||
43 | +import org.onlab.onos.store.serializers.KryoSerializer; | ||
44 | +import org.onlab.util.KryoPool; | ||
45 | +import org.onlab.util.NewConcurrentHashMap; | ||
46 | +import org.slf4j.Logger; | ||
47 | + | ||
48 | +import java.io.IOException; | ||
49 | +import java.util.Collections; | ||
50 | +import java.util.HashSet; | ||
51 | +import java.util.Map; | ||
52 | +import java.util.Set; | ||
53 | +import java.util.Map.Entry; | ||
54 | +import java.util.concurrent.ConcurrentHashMap; | ||
55 | +import java.util.concurrent.ConcurrentMap; | ||
56 | + | ||
57 | +import static org.onlab.onos.net.DefaultAnnotations.union; | ||
58 | +import static org.onlab.onos.net.DefaultAnnotations.merge; | ||
59 | +import static org.onlab.onos.net.Link.Type.DIRECT; | ||
60 | +import static org.onlab.onos.net.Link.Type.INDIRECT; | ||
61 | +import static org.onlab.onos.net.link.LinkEvent.Type.*; | ||
62 | +import static org.slf4j.LoggerFactory.getLogger; | ||
63 | +import static com.google.common.collect.Multimaps.synchronizedSetMultimap; | ||
64 | +import static com.google.common.base.Predicates.notNull; | ||
65 | + | ||
66 | +/** | ||
67 | + * Manages inventory of infrastructure links in distributed data store | ||
68 | + * that uses optimistic replication and gossip based techniques. | ||
69 | + */ | ||
70 | +@Component(immediate = true) | ||
71 | +@Service | ||
72 | +public class GossipLinkStore | ||
73 | + extends AbstractStore<LinkEvent, LinkStoreDelegate> | ||
74 | + implements LinkStore { | ||
75 | + | ||
76 | + private final Logger log = getLogger(getClass()); | ||
77 | + | ||
78 | + // Link inventory | ||
79 | + private final ConcurrentMap<LinkKey, ConcurrentMap<ProviderId, Timestamped<LinkDescription>>> linkDescs = | ||
80 | + new ConcurrentHashMap<>(); | ||
81 | + | ||
82 | + // Link instance cache | ||
83 | + private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>(); | ||
84 | + | ||
85 | + // Egress and ingress link sets | ||
86 | + private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap(); | ||
87 | + private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap(); | ||
88 | + | ||
89 | + // Remove links | ||
90 | + private final Map<LinkKey, Timestamp> removedLinks = Maps.newHashMap(); | ||
91 | + | ||
92 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
93 | + protected ClockService clockService; | ||
94 | + | ||
95 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
96 | + protected ClusterCommunicationService clusterCommunicator; | ||
97 | + | ||
98 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
99 | + protected ClusterService clusterService; | ||
100 | + | ||
101 | + private static final KryoSerializer SERIALIZER = new KryoSerializer() { | ||
102 | + @Override | ||
103 | + protected void setupKryoPool() { | ||
104 | + serializerPool = KryoPool.newBuilder() | ||
105 | + .register(DistributedStoreSerializers.COMMON) | ||
106 | + .register(InternalLinkEvent.class) | ||
107 | + .register(InternalLinkRemovedEvent.class) | ||
108 | + .build() | ||
109 | + .populate(1); | ||
110 | + } | ||
111 | + }; | ||
112 | + | ||
113 | + @Activate | ||
114 | + public void activate() { | ||
115 | + | ||
116 | + clusterCommunicator.addSubscriber( | ||
117 | + GossipLinkStoreMessageSubjects.LINK_UPDATE, new InternalLinkEventListener()); | ||
118 | + clusterCommunicator.addSubscriber( | ||
119 | + GossipLinkStoreMessageSubjects.LINK_REMOVED, new InternalLinkRemovedEventListener()); | ||
120 | + | ||
121 | + log.info("Started"); | ||
122 | + } | ||
123 | + | ||
124 | + @Deactivate | ||
125 | + public void deactivate() { | ||
126 | + linkDescs.clear(); | ||
127 | + links.clear(); | ||
128 | + srcLinks.clear(); | ||
129 | + dstLinks.clear(); | ||
130 | + log.info("Stopped"); | ||
131 | + } | ||
132 | + | ||
133 | + @Override | ||
134 | + public int getLinkCount() { | ||
135 | + return links.size(); | ||
136 | + } | ||
137 | + | ||
138 | + @Override | ||
139 | + public Iterable<Link> getLinks() { | ||
140 | + return Collections.unmodifiableCollection(links.values()); | ||
141 | + } | ||
142 | + | ||
143 | + @Override | ||
144 | + public Set<Link> getDeviceEgressLinks(DeviceId deviceId) { | ||
145 | + // lock for iteration | ||
146 | + synchronized (srcLinks) { | ||
147 | + return FluentIterable.from(srcLinks.get(deviceId)) | ||
148 | + .transform(lookupLink()) | ||
149 | + .filter(notNull()) | ||
150 | + .toSet(); | ||
151 | + } | ||
152 | + } | ||
153 | + | ||
154 | + @Override | ||
155 | + public Set<Link> getDeviceIngressLinks(DeviceId deviceId) { | ||
156 | + // lock for iteration | ||
157 | + synchronized (dstLinks) { | ||
158 | + return FluentIterable.from(dstLinks.get(deviceId)) | ||
159 | + .transform(lookupLink()) | ||
160 | + .filter(notNull()) | ||
161 | + .toSet(); | ||
162 | + } | ||
163 | + } | ||
164 | + | ||
165 | + @Override | ||
166 | + public Link getLink(ConnectPoint src, ConnectPoint dst) { | ||
167 | + return links.get(new LinkKey(src, dst)); | ||
168 | + } | ||
169 | + | ||
170 | + @Override | ||
171 | + public Set<Link> getEgressLinks(ConnectPoint src) { | ||
172 | + Set<Link> egress = new HashSet<>(); | ||
173 | + for (LinkKey linkKey : srcLinks.get(src.deviceId())) { | ||
174 | + if (linkKey.src().equals(src)) { | ||
175 | + egress.add(links.get(linkKey)); | ||
176 | + } | ||
177 | + } | ||
178 | + return egress; | ||
179 | + } | ||
180 | + | ||
181 | + @Override | ||
182 | + public Set<Link> getIngressLinks(ConnectPoint dst) { | ||
183 | + Set<Link> ingress = new HashSet<>(); | ||
184 | + for (LinkKey linkKey : dstLinks.get(dst.deviceId())) { | ||
185 | + if (linkKey.dst().equals(dst)) { | ||
186 | + ingress.add(links.get(linkKey)); | ||
187 | + } | ||
188 | + } | ||
189 | + return ingress; | ||
190 | + } | ||
191 | + | ||
192 | + @Override | ||
193 | + public LinkEvent createOrUpdateLink(ProviderId providerId, | ||
194 | + LinkDescription linkDescription) { | ||
195 | + | ||
196 | + DeviceId dstDeviceId = linkDescription.dst().deviceId(); | ||
197 | + Timestamp newTimestamp = clockService.getTimestamp(dstDeviceId); | ||
198 | + | ||
199 | + final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp); | ||
200 | + | ||
201 | + LinkEvent event = createOrUpdateLinkInternal(providerId, deltaDesc); | ||
202 | + | ||
203 | + if (event != null) { | ||
204 | + log.info("Notifying peers of a link update topology event from providerId: " | ||
205 | + + "{} between src: {} and dst: {}", | ||
206 | + providerId, linkDescription.src(), linkDescription.dst()); | ||
207 | + try { | ||
208 | + notifyPeers(new InternalLinkEvent(providerId, deltaDesc)); | ||
209 | + } catch (IOException e) { | ||
210 | + log.info("Failed to notify peers of a link update topology event from providerId: " | ||
211 | + + "{} between src: {} and dst: {}", | ||
212 | + providerId, linkDescription.src(), linkDescription.dst()); | ||
213 | + } | ||
214 | + } | ||
215 | + return event; | ||
216 | + } | ||
217 | + | ||
218 | + private LinkEvent createOrUpdateLinkInternal( | ||
219 | + ProviderId providerId, | ||
220 | + Timestamped<LinkDescription> linkDescription) { | ||
221 | + | ||
222 | + LinkKey key = new LinkKey(linkDescription.value().src(), linkDescription.value().dst()); | ||
223 | + ConcurrentMap<ProviderId, Timestamped<LinkDescription>> descs = getLinkDescriptions(key); | ||
224 | + | ||
225 | + synchronized (descs) { | ||
226 | + // if the link was previously removed, we should proceed if and | ||
227 | + // only if this request is more recent. | ||
228 | + Timestamp linkRemovedTimestamp = removedLinks.get(key); | ||
229 | + if (linkRemovedTimestamp != null) { | ||
230 | + if (linkDescription.isNewer(linkRemovedTimestamp)) { | ||
231 | + removedLinks.remove(key); | ||
232 | + } else { | ||
233 | + return null; | ||
234 | + } | ||
235 | + } | ||
236 | + | ||
237 | + final Link oldLink = links.get(key); | ||
238 | + // update description | ||
239 | + createOrUpdateLinkDescription(descs, providerId, linkDescription); | ||
240 | + final Link newLink = composeLink(descs); | ||
241 | + if (oldLink == null) { | ||
242 | + return createLink(key, newLink); | ||
243 | + } | ||
244 | + return updateLink(key, oldLink, newLink); | ||
245 | + } | ||
246 | + } | ||
247 | + | ||
248 | + // Guarded by linkDescs value (=locking each Link) | ||
249 | + private Timestamped<LinkDescription> createOrUpdateLinkDescription( | ||
250 | + ConcurrentMap<ProviderId, Timestamped<LinkDescription>> existingLinkDescriptions, | ||
251 | + ProviderId providerId, | ||
252 | + Timestamped<LinkDescription> linkDescription) { | ||
253 | + | ||
254 | + // merge existing attributes and merge | ||
255 | + Timestamped<LinkDescription> existingLinkDescription = existingLinkDescriptions.get(providerId); | ||
256 | + if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) { | ||
257 | + return null; | ||
258 | + } | ||
259 | + Timestamped<LinkDescription> newLinkDescription = linkDescription; | ||
260 | + if (existingLinkDescription != null) { | ||
261 | + SparseAnnotations merged = union(existingLinkDescription.value().annotations(), | ||
262 | + linkDescription.value().annotations()); | ||
263 | + newLinkDescription = new Timestamped<LinkDescription>( | ||
264 | + new DefaultLinkDescription( | ||
265 | + linkDescription.value().src(), | ||
266 | + linkDescription.value().dst(), | ||
267 | + linkDescription.value().type(), merged), | ||
268 | + linkDescription.timestamp()); | ||
269 | + } | ||
270 | + return existingLinkDescriptions.put(providerId, newLinkDescription); | ||
271 | + } | ||
272 | + | ||
273 | + // Creates and stores the link and returns the appropriate event. | ||
274 | + // Guarded by linkDescs value (=locking each Link) | ||
275 | + private LinkEvent createLink(LinkKey key, Link newLink) { | ||
276 | + | ||
277 | + if (newLink.providerId().isAncillary()) { | ||
278 | + // TODO: revisit ancillary only Link handling | ||
279 | + | ||
280 | + // currently treating ancillary only as down (not visible outside) | ||
281 | + return null; | ||
282 | + } | ||
283 | + | ||
284 | + links.put(key, newLink); | ||
285 | + srcLinks.put(newLink.src().deviceId(), key); | ||
286 | + dstLinks.put(newLink.dst().deviceId(), key); | ||
287 | + return new LinkEvent(LINK_ADDED, newLink); | ||
288 | + } | ||
289 | + | ||
290 | + // Updates, if necessary the specified link and returns the appropriate event. | ||
291 | + // Guarded by linkDescs value (=locking each Link) | ||
292 | + private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) { | ||
293 | + | ||
294 | + if (newLink.providerId().isAncillary()) { | ||
295 | + // TODO: revisit ancillary only Link handling | ||
296 | + | ||
297 | + // currently treating ancillary only as down (not visible outside) | ||
298 | + return null; | ||
299 | + } | ||
300 | + | ||
301 | + if ((oldLink.type() == INDIRECT && newLink.type() == DIRECT) || | ||
302 | + !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) { | ||
303 | + | ||
304 | + links.put(key, newLink); | ||
305 | + // strictly speaking following can be ommitted | ||
306 | + srcLinks.put(oldLink.src().deviceId(), key); | ||
307 | + dstLinks.put(oldLink.dst().deviceId(), key); | ||
308 | + return new LinkEvent(LINK_UPDATED, newLink); | ||
309 | + } | ||
310 | + return null; | ||
311 | + } | ||
312 | + | ||
313 | + @Override | ||
314 | + public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) { | ||
315 | + final LinkKey key = new LinkKey(src, dst); | ||
316 | + | ||
317 | + DeviceId dstDeviceId = dst.deviceId(); | ||
318 | + Timestamp timestamp = clockService.getTimestamp(dstDeviceId); | ||
319 | + | ||
320 | + LinkEvent event = removeLinkInternal(key, timestamp); | ||
321 | + | ||
322 | + if (event != null) { | ||
323 | + log.info("Notifying peers of a link removed topology event for a link " | ||
324 | + + "between src: {} and dst: {}", src, dst); | ||
325 | + try { | ||
326 | + notifyPeers(new InternalLinkRemovedEvent(key, timestamp)); | ||
327 | + } catch (IOException e) { | ||
328 | + log.error("Failed to notify peers of a link removed topology event for a link " | ||
329 | + + "between src: {} and dst: {}", src, dst); | ||
330 | + } | ||
331 | + } | ||
332 | + return event; | ||
333 | + } | ||
334 | + | ||
335 | + private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) { | ||
336 | + ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDescriptions = | ||
337 | + getLinkDescriptions(key); | ||
338 | + synchronized (linkDescriptions) { | ||
339 | + // accept removal request if given timestamp is newer than | ||
340 | + // the latest Timestamp from Primary provider | ||
341 | + ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions); | ||
342 | + if (linkDescriptions.get(primaryProviderId).isNewer(timestamp)) { | ||
343 | + return null; | ||
344 | + } | ||
345 | + removedLinks.put(key, timestamp); | ||
346 | + Link link = links.remove(key); | ||
347 | + linkDescriptions.clear(); | ||
348 | + if (link != null) { | ||
349 | + srcLinks.remove(link.src().deviceId(), key); | ||
350 | + dstLinks.remove(link.dst().deviceId(), key); | ||
351 | + return new LinkEvent(LINK_REMOVED, link); | ||
352 | + } | ||
353 | + return null; | ||
354 | + } | ||
355 | + } | ||
356 | + | ||
357 | + private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() { | ||
358 | + return synchronizedSetMultimap(HashMultimap.<K, V>create()); | ||
359 | + } | ||
360 | + | ||
361 | + /** | ||
362 | + * @return primary ProviderID, or randomly chosen one if none exists | ||
363 | + */ | ||
364 | + private ProviderId pickPrimaryProviderId( | ||
365 | + ConcurrentMap<ProviderId, Timestamped<LinkDescription>> providerDescs) { | ||
366 | + | ||
367 | + ProviderId fallBackPrimary = null; | ||
368 | + for (Entry<ProviderId, Timestamped<LinkDescription>> e : providerDescs.entrySet()) { | ||
369 | + if (!e.getKey().isAncillary()) { | ||
370 | + return e.getKey(); | ||
371 | + } else if (fallBackPrimary == null) { | ||
372 | + // pick randomly as a fallback in case there is no primary | ||
373 | + fallBackPrimary = e.getKey(); | ||
374 | + } | ||
375 | + } | ||
376 | + return fallBackPrimary; | ||
377 | + } | ||
378 | + | ||
379 | + private Link composeLink(ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDescriptions) { | ||
380 | + ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions); | ||
381 | + Timestamped<LinkDescription> base = linkDescriptions.get(primaryProviderId); | ||
382 | + | ||
383 | + ConnectPoint src = base.value().src(); | ||
384 | + ConnectPoint dst = base.value().dst(); | ||
385 | + Type type = base.value().type(); | ||
386 | + DefaultAnnotations annotations = DefaultAnnotations.builder().build(); | ||
387 | + annotations = merge(annotations, base.value().annotations()); | ||
388 | + | ||
389 | + for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) { | ||
390 | + if (primaryProviderId.equals(e.getKey())) { | ||
391 | + continue; | ||
392 | + } | ||
393 | + | ||
394 | + // TODO: should keep track of Description timestamp | ||
395 | + // and only merge conflicting keys when timestamp is newer | ||
396 | + // Currently assuming there will never be a key conflict between | ||
397 | + // providers | ||
398 | + | ||
399 | + // annotation merging. not so efficient, should revisit later | ||
400 | + annotations = merge(annotations, e.getValue().value().annotations()); | ||
401 | + } | ||
402 | + | ||
403 | + return new DefaultLink(primaryProviderId , src, dst, type, annotations); | ||
404 | + } | ||
405 | + | ||
406 | + private ConcurrentMap<ProviderId, Timestamped<LinkDescription>> getLinkDescriptions(LinkKey key) { | ||
407 | + return ConcurrentUtils.createIfAbsentUnchecked(linkDescs, key, | ||
408 | + NewConcurrentHashMap.<ProviderId, Timestamped<LinkDescription>>ifNeeded()); | ||
409 | + } | ||
410 | + | ||
411 | + private final Function<LinkKey, Link> lookupLink = new LookupLink(); | ||
412 | + private Function<LinkKey, Link> lookupLink() { | ||
413 | + return lookupLink; | ||
414 | + } | ||
415 | + | ||
416 | + private final class LookupLink implements Function<LinkKey, Link> { | ||
417 | + @Override | ||
418 | + public Link apply(LinkKey input) { | ||
419 | + return links.get(input); | ||
420 | + } | ||
421 | + } | ||
422 | + | ||
423 | + private static final Predicate<Provided> IS_PRIMARY = new IsPrimary(); | ||
424 | + private static final Predicate<Provided> isPrimary() { | ||
425 | + return IS_PRIMARY; | ||
426 | + } | ||
427 | + | ||
428 | + private static final class IsPrimary implements Predicate<Provided> { | ||
429 | + | ||
430 | + @Override | ||
431 | + public boolean apply(Provided input) { | ||
432 | + return !input.providerId().isAncillary(); | ||
433 | + } | ||
434 | + } | ||
435 | + | ||
436 | + private void notifyDelegateIfNotNull(LinkEvent event) { | ||
437 | + if (event != null) { | ||
438 | + notifyDelegate(event); | ||
439 | + } | ||
440 | + } | ||
441 | + | ||
442 | + // TODO: should we be throwing exception? | ||
443 | + private void broadcastMessage(MessageSubject subject, Object event) throws IOException { | ||
444 | + ClusterMessage message = new ClusterMessage( | ||
445 | + clusterService.getLocalNode().id(), | ||
446 | + subject, | ||
447 | + SERIALIZER.encode(event)); | ||
448 | + clusterCommunicator.broadcast(message); | ||
449 | + } | ||
450 | + | ||
451 | + private void notifyPeers(InternalLinkEvent event) throws IOException { | ||
452 | + broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event); | ||
453 | + } | ||
454 | + | ||
455 | + private void notifyPeers(InternalLinkRemovedEvent event) throws IOException { | ||
456 | + broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event); | ||
457 | + } | ||
458 | + | ||
459 | + private class InternalLinkEventListener implements ClusterMessageHandler { | ||
460 | + @Override | ||
461 | + public void handle(ClusterMessage message) { | ||
462 | + | ||
463 | + log.info("Received link event from peer: {}", message.sender()); | ||
464 | + InternalLinkEvent event = (InternalLinkEvent) SERIALIZER.decode(message.payload()); | ||
465 | + | ||
466 | + ProviderId providerId = event.providerId(); | ||
467 | + Timestamped<LinkDescription> linkDescription = event.linkDescription(); | ||
468 | + | ||
469 | + notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription)); | ||
470 | + } | ||
471 | + } | ||
472 | + | ||
473 | + private class InternalLinkRemovedEventListener implements ClusterMessageHandler { | ||
474 | + @Override | ||
475 | + public void handle(ClusterMessage message) { | ||
476 | + | ||
477 | + log.info("Received link removed event from peer: {}", message.sender()); | ||
478 | + InternalLinkRemovedEvent event = (InternalLinkRemovedEvent) SERIALIZER.decode(message.payload()); | ||
479 | + | ||
480 | + LinkKey linkKey = event.linkKey(); | ||
481 | + Timestamp timestamp = event.timestamp(); | ||
482 | + | ||
483 | + notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp)); | ||
484 | + } | ||
485 | + } | ||
486 | +} |
core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStoreMessageSubjects.java
0 → 100644
1 | +package org.onlab.onos.store.link.impl; | ||
2 | + | ||
3 | +import org.onlab.onos.store.cluster.messaging.MessageSubject; | ||
4 | + | ||
5 | +/** | ||
6 | + * MessageSubjects used by GossipLinkStore peer-peer communication. | ||
7 | + */ | ||
8 | +public final class GossipLinkStoreMessageSubjects { | ||
9 | + | ||
10 | + private GossipLinkStoreMessageSubjects() {} | ||
11 | + | ||
12 | + public static final MessageSubject LINK_UPDATE = new MessageSubject("peer-link-update"); | ||
13 | + public static final MessageSubject LINK_REMOVED = new MessageSubject("peer-link-removed"); | ||
14 | +} |
1 | +package org.onlab.onos.store.link.impl; | ||
2 | + | ||
3 | +import com.google.common.base.MoreObjects; | ||
4 | + | ||
5 | +import org.onlab.onos.net.link.LinkDescription; | ||
6 | +import org.onlab.onos.net.provider.ProviderId; | ||
7 | +import org.onlab.onos.store.common.impl.Timestamped; | ||
8 | + | ||
9 | +/** | ||
10 | + * Information published by GossipDeviceStore to notify peers of a device | ||
11 | + * change event. | ||
12 | + */ | ||
13 | +public class InternalLinkEvent { | ||
14 | + | ||
15 | + private final ProviderId providerId; | ||
16 | + private final Timestamped<LinkDescription> linkDescription; | ||
17 | + | ||
18 | + protected InternalLinkEvent( | ||
19 | + ProviderId providerId, | ||
20 | + Timestamped<LinkDescription> linkDescription) { | ||
21 | + this.providerId = providerId; | ||
22 | + this.linkDescription = linkDescription; | ||
23 | + } | ||
24 | + | ||
25 | + public ProviderId providerId() { | ||
26 | + return providerId; | ||
27 | + } | ||
28 | + | ||
29 | + public Timestamped<LinkDescription> linkDescription() { | ||
30 | + return linkDescription; | ||
31 | + } | ||
32 | + | ||
33 | + @Override | ||
34 | + public String toString() { | ||
35 | + return MoreObjects.toStringHelper(getClass()) | ||
36 | + .add("providerId", providerId) | ||
37 | + .add("linkDescription", linkDescription) | ||
38 | + .toString(); | ||
39 | + } | ||
40 | + | ||
41 | + // for serializer | ||
42 | + protected InternalLinkEvent() { | ||
43 | + this.providerId = null; | ||
44 | + this.linkDescription = null; | ||
45 | + } | ||
46 | +} |
core/store/dist/src/main/java/org/onlab/onos/store/link/impl/InternalLinkRemovedEvent.java
0 → 100644
1 | +package org.onlab.onos.store.link.impl; | ||
2 | + | ||
3 | +import org.onlab.onos.net.LinkKey; | ||
4 | +import org.onlab.onos.store.Timestamp; | ||
5 | + | ||
6 | +import com.google.common.base.MoreObjects; | ||
7 | + | ||
8 | +/** | ||
9 | + * Information published by GossipLinkStore to notify peers of a link | ||
10 | + * being removed. | ||
11 | + */ | ||
12 | +public class InternalLinkRemovedEvent { | ||
13 | + | ||
14 | + private final LinkKey linkKey; | ||
15 | + private final Timestamp timestamp; | ||
16 | + | ||
17 | + /** | ||
18 | + * Creates a InternalLinkRemovedEvent. | ||
19 | + * @param linkKey identifier of the removed link. | ||
20 | + * @param timestamp timestamp of when the link was removed. | ||
21 | + */ | ||
22 | + public InternalLinkRemovedEvent(LinkKey linkKey, Timestamp timestamp) { | ||
23 | + this.linkKey = linkKey; | ||
24 | + this.timestamp = timestamp; | ||
25 | + } | ||
26 | + | ||
27 | + public LinkKey linkKey() { | ||
28 | + return linkKey; | ||
29 | + } | ||
30 | + | ||
31 | + public Timestamp timestamp() { | ||
32 | + return timestamp; | ||
33 | + } | ||
34 | + | ||
35 | + @Override | ||
36 | + public String toString() { | ||
37 | + return MoreObjects.toStringHelper(getClass()) | ||
38 | + .add("linkKey", linkKey) | ||
39 | + .add("timestamp", timestamp) | ||
40 | + .toString(); | ||
41 | + } | ||
42 | + | ||
43 | + // for serializer | ||
44 | + @SuppressWarnings("unused") | ||
45 | + private InternalLinkRemovedEvent() { | ||
46 | + linkKey = null; | ||
47 | + timestamp = null; | ||
48 | + } | ||
49 | +} | ||
... | \ No newline at end of file | ... | \ No newline at end of file |
core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
deleted
100644 → 0
1 | -package org.onlab.onos.store.link.impl; | ||
2 | - | ||
3 | -import static org.onlab.onos.net.Link.Type.DIRECT; | ||
4 | -import static org.onlab.onos.net.Link.Type.INDIRECT; | ||
5 | -import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED; | ||
6 | -import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED; | ||
7 | -import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED; | ||
8 | -import static org.slf4j.LoggerFactory.getLogger; | ||
9 | - | ||
10 | -import java.util.HashSet; | ||
11 | -import java.util.Set; | ||
12 | -import java.util.concurrent.ConcurrentHashMap; | ||
13 | -import java.util.concurrent.ConcurrentMap; | ||
14 | - | ||
15 | -import org.apache.felix.scr.annotations.Activate; | ||
16 | -import org.apache.felix.scr.annotations.Component; | ||
17 | -import org.apache.felix.scr.annotations.Deactivate; | ||
18 | -import org.apache.felix.scr.annotations.Reference; | ||
19 | -import org.apache.felix.scr.annotations.ReferenceCardinality; | ||
20 | -import org.apache.felix.scr.annotations.Service; | ||
21 | -import org.onlab.onos.net.ConnectPoint; | ||
22 | -import org.onlab.onos.net.DefaultLink; | ||
23 | -import org.onlab.onos.net.DeviceId; | ||
24 | -import org.onlab.onos.net.Link; | ||
25 | -import org.onlab.onos.net.LinkKey; | ||
26 | -import org.onlab.onos.net.link.LinkDescription; | ||
27 | -import org.onlab.onos.net.link.LinkEvent; | ||
28 | -import org.onlab.onos.net.link.LinkStore; | ||
29 | -import org.onlab.onos.net.link.LinkStoreDelegate; | ||
30 | -import org.onlab.onos.net.provider.ProviderId; | ||
31 | -import org.onlab.onos.store.AbstractStore; | ||
32 | -import org.onlab.onos.store.ClockService; | ||
33 | -import org.onlab.onos.store.Timestamp; | ||
34 | -import org.slf4j.Logger; | ||
35 | - | ||
36 | -import com.google.common.collect.HashMultimap; | ||
37 | -import com.google.common.collect.ImmutableSet; | ||
38 | -import com.google.common.collect.Multimap; | ||
39 | -import com.google.common.collect.ImmutableSet.Builder; | ||
40 | - | ||
41 | -import static com.google.common.base.Preconditions.checkArgument; | ||
42 | -import static com.google.common.base.Preconditions.checkState; | ||
43 | - | ||
44 | -//TODO: Add support for multiple provider and annotations | ||
45 | -/** | ||
46 | - * Manages inventory of infrastructure links using a protocol that takes into consideration | ||
47 | - * the order in which events occur. | ||
48 | - */ | ||
49 | -// FIXME: This does not yet implement the full protocol. | ||
50 | -// The full protocol requires the sender of LLDP message to include the | ||
51 | -// version information of src device/port and the receiver to | ||
52 | -// take that into account when figuring out if a more recent src | ||
53 | -// device/port down event renders the link discovery obsolete. | ||
54 | -@Component(immediate = true) | ||
55 | -@Service | ||
56 | -public class OnosDistributedLinkStore | ||
57 | - extends AbstractStore<LinkEvent, LinkStoreDelegate> | ||
58 | - implements LinkStore { | ||
59 | - | ||
60 | - private final Logger log = getLogger(getClass()); | ||
61 | - | ||
62 | - // Link inventory | ||
63 | - private ConcurrentMap<LinkKey, VersionedValue<Link>> links; | ||
64 | - | ||
65 | - public static final String LINK_NOT_FOUND = "Link between %s and %s not found"; | ||
66 | - | ||
67 | - // TODO synchronize? | ||
68 | - // Egress and ingress link sets | ||
69 | - private final Multimap<DeviceId, VersionedValue<Link>> srcLinks = HashMultimap.create(); | ||
70 | - private final Multimap<DeviceId, VersionedValue<Link>> dstLinks = HashMultimap.create(); | ||
71 | - | ||
72 | - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
73 | - protected ClockService clockService; | ||
74 | - | ||
75 | - @Activate | ||
76 | - public void activate() { | ||
77 | - | ||
78 | - links = new ConcurrentHashMap<>(); | ||
79 | - | ||
80 | - log.info("Started"); | ||
81 | - } | ||
82 | - | ||
83 | - @Deactivate | ||
84 | - public void deactivate() { | ||
85 | - log.info("Stopped"); | ||
86 | - } | ||
87 | - | ||
88 | - @Override | ||
89 | - public int getLinkCount() { | ||
90 | - return links.size(); | ||
91 | - } | ||
92 | - | ||
93 | - @Override | ||
94 | - public Iterable<Link> getLinks() { | ||
95 | - Builder<Link> builder = ImmutableSet.builder(); | ||
96 | - synchronized (this) { | ||
97 | - for (VersionedValue<Link> link : links.values()) { | ||
98 | - builder.add(link.entity()); | ||
99 | - } | ||
100 | - return builder.build(); | ||
101 | - } | ||
102 | - } | ||
103 | - | ||
104 | - @Override | ||
105 | - public Set<Link> getDeviceEgressLinks(DeviceId deviceId) { | ||
106 | - Set<VersionedValue<Link>> egressLinks = ImmutableSet.copyOf(srcLinks.get(deviceId)); | ||
107 | - Set<Link> rawEgressLinks = new HashSet<>(); | ||
108 | - for (VersionedValue<Link> link : egressLinks) { | ||
109 | - rawEgressLinks.add(link.entity()); | ||
110 | - } | ||
111 | - return rawEgressLinks; | ||
112 | - } | ||
113 | - | ||
114 | - @Override | ||
115 | - public Set<Link> getDeviceIngressLinks(DeviceId deviceId) { | ||
116 | - Set<VersionedValue<Link>> ingressLinks = ImmutableSet.copyOf(dstLinks.get(deviceId)); | ||
117 | - Set<Link> rawIngressLinks = new HashSet<>(); | ||
118 | - for (VersionedValue<Link> link : ingressLinks) { | ||
119 | - rawIngressLinks.add(link.entity()); | ||
120 | - } | ||
121 | - return rawIngressLinks; | ||
122 | - } | ||
123 | - | ||
124 | - @Override | ||
125 | - public Link getLink(ConnectPoint src, ConnectPoint dst) { | ||
126 | - VersionedValue<Link> link = links.get(new LinkKey(src, dst)); | ||
127 | - checkArgument(link != null, "LINK_NOT_FOUND", src, dst); | ||
128 | - return link.entity(); | ||
129 | - } | ||
130 | - | ||
131 | - @Override | ||
132 | - public Set<Link> getEgressLinks(ConnectPoint src) { | ||
133 | - Set<Link> egressLinks = new HashSet<>(); | ||
134 | - for (VersionedValue<Link> link : srcLinks.get(src.deviceId())) { | ||
135 | - if (link.entity().src().equals(src)) { | ||
136 | - egressLinks.add(link.entity()); | ||
137 | - } | ||
138 | - } | ||
139 | - return egressLinks; | ||
140 | - } | ||
141 | - | ||
142 | - @Override | ||
143 | - public Set<Link> getIngressLinks(ConnectPoint dst) { | ||
144 | - Set<Link> ingressLinks = new HashSet<>(); | ||
145 | - for (VersionedValue<Link> link : dstLinks.get(dst.deviceId())) { | ||
146 | - if (link.entity().dst().equals(dst)) { | ||
147 | - ingressLinks.add(link.entity()); | ||
148 | - } | ||
149 | - } | ||
150 | - return ingressLinks; | ||
151 | - } | ||
152 | - | ||
153 | - @Override | ||
154 | - public LinkEvent createOrUpdateLink(ProviderId providerId, | ||
155 | - LinkDescription linkDescription) { | ||
156 | - | ||
157 | - final DeviceId destinationDeviceId = linkDescription.dst().deviceId(); | ||
158 | - final Timestamp newTimestamp = clockService.getTimestamp(destinationDeviceId); | ||
159 | - | ||
160 | - LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst()); | ||
161 | - VersionedValue<Link> link = links.get(key); | ||
162 | - if (link == null) { | ||
163 | - return createLink(providerId, key, linkDescription, newTimestamp); | ||
164 | - } | ||
165 | - | ||
166 | - checkState(newTimestamp.compareTo(link.timestamp()) > 0, | ||
167 | - "Existing Link has a timestamp in the future!"); | ||
168 | - | ||
169 | - return updateLink(providerId, link, key, linkDescription, newTimestamp); | ||
170 | - } | ||
171 | - | ||
172 | - // Creates and stores the link and returns the appropriate event. | ||
173 | - private LinkEvent createLink(ProviderId providerId, LinkKey key, | ||
174 | - LinkDescription linkDescription, Timestamp timestamp) { | ||
175 | - VersionedValue<Link> link = new VersionedValue<Link>(new DefaultLink(providerId, key.src(), key.dst(), | ||
176 | - linkDescription.type()), true, timestamp); | ||
177 | - synchronized (this) { | ||
178 | - links.put(key, link); | ||
179 | - addNewLink(link, timestamp); | ||
180 | - } | ||
181 | - // FIXME: notify peers. | ||
182 | - return new LinkEvent(LINK_ADDED, link.entity()); | ||
183 | - } | ||
184 | - | ||
185 | - // update Egress and ingress link sets | ||
186 | - private void addNewLink(VersionedValue<Link> link, Timestamp timestamp) { | ||
187 | - Link rawLink = link.entity(); | ||
188 | - synchronized (this) { | ||
189 | - srcLinks.put(rawLink.src().deviceId(), link); | ||
190 | - dstLinks.put(rawLink.dst().deviceId(), link); | ||
191 | - } | ||
192 | - } | ||
193 | - | ||
194 | - // Updates, if necessary the specified link and returns the appropriate event. | ||
195 | - private LinkEvent updateLink(ProviderId providerId, VersionedValue<Link> existingLink, | ||
196 | - LinkKey key, LinkDescription linkDescription, Timestamp timestamp) { | ||
197 | - // FIXME confirm Link update condition is OK | ||
198 | - if (existingLink.entity().type() == INDIRECT && linkDescription.type() == DIRECT) { | ||
199 | - synchronized (this) { | ||
200 | - | ||
201 | - VersionedValue<Link> updatedLink = new VersionedValue<Link>( | ||
202 | - new DefaultLink(providerId, existingLink.entity().src(), existingLink.entity().dst(), | ||
203 | - linkDescription.type()), true, timestamp); | ||
204 | - links.replace(key, existingLink, updatedLink); | ||
205 | - | ||
206 | - replaceLink(existingLink, updatedLink); | ||
207 | - // FIXME: notify peers. | ||
208 | - return new LinkEvent(LINK_UPDATED, updatedLink.entity()); | ||
209 | - } | ||
210 | - } | ||
211 | - return null; | ||
212 | - } | ||
213 | - | ||
214 | - // update Egress and ingress link sets | ||
215 | - private void replaceLink(VersionedValue<Link> current, VersionedValue<Link> updated) { | ||
216 | - synchronized (this) { | ||
217 | - srcLinks.remove(current.entity().src().deviceId(), current); | ||
218 | - dstLinks.remove(current.entity().dst().deviceId(), current); | ||
219 | - | ||
220 | - srcLinks.put(current.entity().src().deviceId(), updated); | ||
221 | - dstLinks.put(current.entity().dst().deviceId(), updated); | ||
222 | - } | ||
223 | - } | ||
224 | - | ||
225 | - @Override | ||
226 | - public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) { | ||
227 | - synchronized (this) { | ||
228 | - LinkKey key = new LinkKey(src, dst); | ||
229 | - VersionedValue<Link> link = links.remove(key); | ||
230 | - if (link != null) { | ||
231 | - removeLink(link); | ||
232 | - // notify peers | ||
233 | - return new LinkEvent(LINK_REMOVED, link.entity()); | ||
234 | - } | ||
235 | - return null; | ||
236 | - } | ||
237 | - } | ||
238 | - | ||
239 | - // update Egress and ingress link sets | ||
240 | - private void removeLink(VersionedValue<Link> link) { | ||
241 | - synchronized (this) { | ||
242 | - srcLinks.remove(link.entity().src().deviceId(), link); | ||
243 | - dstLinks.remove(link.entity().dst().deviceId(), link); | ||
244 | - } | ||
245 | - } | ||
246 | -} |
1 | -package org.onlab.onos.store.link.impl; | ||
2 | - | ||
3 | -import java.util.Objects; | ||
4 | - | ||
5 | -import org.onlab.onos.store.Timestamp; | ||
6 | - | ||
7 | -// TODO: remove once we stop using this | ||
8 | -/** | ||
9 | - * Wrapper class for a entity that is versioned | ||
10 | - * and can either be up or down. | ||
11 | - * | ||
12 | - * @param <T> type of the value. | ||
13 | - */ | ||
14 | -public class VersionedValue<T> { | ||
15 | - private final T entity; | ||
16 | - private final Timestamp timestamp; | ||
17 | - private final boolean isUp; | ||
18 | - | ||
19 | - public VersionedValue(T entity, boolean isUp, Timestamp timestamp) { | ||
20 | - this.entity = entity; | ||
21 | - this.isUp = isUp; | ||
22 | - this.timestamp = timestamp; | ||
23 | - } | ||
24 | - | ||
25 | - /** | ||
26 | - * Returns the value. | ||
27 | - * @return value. | ||
28 | - */ | ||
29 | - public T entity() { | ||
30 | - return entity; | ||
31 | - } | ||
32 | - | ||
33 | - /** | ||
34 | - * Tells whether the entity is up or down. | ||
35 | - * @return true if up, false otherwise. | ||
36 | - */ | ||
37 | - public boolean isUp() { | ||
38 | - return isUp; | ||
39 | - } | ||
40 | - | ||
41 | - /** | ||
42 | - * Returns the timestamp (version) associated with this entity. | ||
43 | - * @return timestamp. | ||
44 | - */ | ||
45 | - public Timestamp timestamp() { | ||
46 | - return timestamp; | ||
47 | - } | ||
48 | - | ||
49 | - | ||
50 | - @Override | ||
51 | - public int hashCode() { | ||
52 | - return Objects.hash(entity, timestamp, isUp); | ||
53 | - } | ||
54 | - | ||
55 | - @Override | ||
56 | - public boolean equals(Object obj) { | ||
57 | - if (this == obj) { | ||
58 | - return true; | ||
59 | - } | ||
60 | - if (obj == null) { | ||
61 | - return false; | ||
62 | - } | ||
63 | - if (getClass() != obj.getClass()) { | ||
64 | - return false; | ||
65 | - } | ||
66 | - @SuppressWarnings("unchecked") | ||
67 | - VersionedValue<T> that = (VersionedValue<T>) obj; | ||
68 | - return Objects.equals(this.entity, that.entity) && | ||
69 | - Objects.equals(this.timestamp, that.timestamp) && | ||
70 | - Objects.equals(this.isUp, that.isUp); | ||
71 | - } | ||
72 | - | ||
73 | - // Default constructor for serializer | ||
74 | - protected VersionedValue() { | ||
75 | - this.entity = null; | ||
76 | - this.isUp = false; | ||
77 | - this.timestamp = null; | ||
78 | - } | ||
79 | -} |
... | @@ -24,6 +24,7 @@ import org.onlab.onos.net.Port; | ... | @@ -24,6 +24,7 @@ import org.onlab.onos.net.Port; |
24 | import org.onlab.onos.net.PortNumber; | 24 | import org.onlab.onos.net.PortNumber; |
25 | import org.onlab.onos.net.device.DefaultDeviceDescription; | 25 | import org.onlab.onos.net.device.DefaultDeviceDescription; |
26 | import org.onlab.onos.net.device.DefaultPortDescription; | 26 | import org.onlab.onos.net.device.DefaultPortDescription; |
27 | +import org.onlab.onos.net.link.DefaultLinkDescription; | ||
27 | import org.onlab.onos.net.provider.ProviderId; | 28 | import org.onlab.onos.net.provider.ProviderId; |
28 | import org.onlab.onos.store.Timestamp; | 29 | import org.onlab.onos.store.Timestamp; |
29 | import org.onlab.packet.IpAddress; | 30 | import org.onlab.packet.IpAddress; |
... | @@ -58,6 +59,7 @@ public final class KryoPoolUtil { | ... | @@ -58,6 +59,7 @@ public final class KryoPoolUtil { |
58 | DefaultControllerNode.class, | 59 | DefaultControllerNode.class, |
59 | DefaultDevice.class, | 60 | DefaultDevice.class, |
60 | DefaultDeviceDescription.class, | 61 | DefaultDeviceDescription.class, |
62 | + DefaultLinkDescription.class, | ||
61 | MastershipRole.class, | 63 | MastershipRole.class, |
62 | Port.class, | 64 | Port.class, |
63 | DefaultPortDescription.class, | 65 | DefaultPortDescription.class, | ... | ... |
-
Please register or login to post a comment