alshabib

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 19 changed files with 592 additions and 38 deletions
...@@ -56,6 +56,13 @@ public interface MastershipService { ...@@ -56,6 +56,13 @@ public interface MastershipService {
56 Set<DeviceId> getDevicesOf(NodeId nodeId); 56 Set<DeviceId> getDevicesOf(NodeId nodeId);
57 57
58 /** 58 /**
59 + * Returns the mastership term service for getting term information.
60 + *
61 + * @return the MastershipTermService for this mastership manager
62 + */
63 + MastershipTermService requestTermService();
64 +
65 + /**
59 * Adds the specified mastership change listener. 66 * Adds the specified mastership change listener.
60 * 67 *
61 * @param listener the mastership listener 68 * @param listener the mastership listener
......
...@@ -55,4 +55,13 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD ...@@ -55,4 +55,13 @@ public interface MastershipStore extends Store<MastershipEvent, MastershipStoreD
55 * @return a mastership event 55 * @return a mastership event
56 */ 56 */
57 MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId); 57 MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId);
58 +
59 + /**
60 + * Returns the current master and number of past mastership hand-offs
61 + * (terms) for a device.
62 + *
63 + * @param deviceId the device identifier
64 + * @return the current master's ID and the term value for device, or null
65 + */
66 + MastershipTerm getTermFor(DeviceId deviceId);
58 } 67 }
......
1 package org.onlab.onos.cluster; 1 package org.onlab.onos.cluster;
2 2
3 -public class MastershipTerm { 3 +import java.util.Objects;
4 - private final NodeId master = null; 4 +
5 +public final class MastershipTerm {
6 +
7 + private final NodeId master;
5 private int termNumber; 8 private int termNumber;
9 +
10 + private MastershipTerm(NodeId master, int term) {
11 + this.master = master;
12 + this.termNumber = term;
13 + }
14 +
15 + public static MastershipTerm of(NodeId master, int term) {
16 + return new MastershipTerm(master, term);
17 + }
18 +
19 + public NodeId master() {
20 + return master;
21 + }
22 +
23 + public int termNumber() {
24 + return termNumber;
25 + }
26 +
27 + @Override
28 + public int hashCode() {
29 + return Objects.hash(master, termNumber);
30 + }
31 +
32 + @Override
33 + public boolean equals(Object other) {
34 + if (this == other) {
35 + return true;
36 + }
37 + if (other instanceof MastershipTerm) {
38 + MastershipTerm that = (MastershipTerm) other;
39 + if (!this.master.equals(that.master)) {
40 + return false;
41 + }
42 + if (this.termNumber != that.termNumber) {
43 + return false;
44 + }
45 + return true;
46 + }
47 + return false;
48 + }
6 } 49 }
......
1 +package org.onlab.onos.net;
2 +
3 +import java.util.Objects;
4 +
5 +import com.google.common.base.MoreObjects;
6 +
7 +// TODO Consider renaming.
8 +// it's an identifier for a Link, but it's not ElementId, so not using LinkId.
9 +/**
10 + * Immutable representation of a link identity.
11 + */
12 +public class LinkKey {
13 +
14 + private final ConnectPoint src;
15 + private final ConnectPoint dst;
16 +
17 + /**
18 + * Returns source connection point.
19 + *
20 + * @return source connection point
21 + */
22 + public ConnectPoint src() {
23 + return src;
24 + }
25 +
26 + /**
27 + * Returns destination connection point.
28 + *
29 + * @return destination connection point
30 + */
31 + public ConnectPoint dst() {
32 + return dst;
33 + }
34 +
35 + /**
36 + * Creates a link identifier with source and destination connection point.
37 + *
38 + * @param src source connection point
39 + * @param dst destination connection point
40 + */
41 + public LinkKey(ConnectPoint src, ConnectPoint dst) {
42 + this.src = src;
43 + this.dst = dst;
44 + }
45 +
46 + @Override
47 + public int hashCode() {
48 + return Objects.hash(src(), dst);
49 + }
50 +
51 + @Override
52 + public boolean equals(Object obj) {
53 + if (this == obj) {
54 + return true;
55 + }
56 + if (obj instanceof LinkKey) {
57 + final LinkKey other = (LinkKey) obj;
58 + return Objects.equals(this.src(), other.src()) &&
59 + Objects.equals(this.dst, other.dst);
60 + }
61 + return false;
62 + }
63 +
64 + @Override
65 + public String toString() {
66 + return MoreObjects.toStringHelper(getClass())
67 + .add("src", src())
68 + .add("dst", dst)
69 + .toString();
70 + }
71 +}
...@@ -40,4 +40,9 @@ public class MastershipServiceAdapter implements MastershipService { ...@@ -40,4 +40,9 @@ public class MastershipServiceAdapter implements MastershipService {
40 @Override 40 @Override
41 public void removeListener(MastershipListener listener) { 41 public void removeListener(MastershipListener listener) {
42 } 42 }
43 +
44 + @Override
45 + public MastershipTermService requestTermService() {
46 + return null;
47 + }
43 } 48 }
......
...@@ -12,6 +12,8 @@ import org.onlab.onos.cluster.MastershipEvent; ...@@ -12,6 +12,8 @@ import org.onlab.onos.cluster.MastershipEvent;
12 import org.onlab.onos.cluster.MastershipListener; 12 import org.onlab.onos.cluster.MastershipListener;
13 import org.onlab.onos.cluster.MastershipService; 13 import org.onlab.onos.cluster.MastershipService;
14 import org.onlab.onos.cluster.MastershipStore; 14 import org.onlab.onos.cluster.MastershipStore;
15 +import org.onlab.onos.cluster.MastershipTerm;
16 +import org.onlab.onos.cluster.MastershipTermService;
15 import org.onlab.onos.cluster.NodeId; 17 import org.onlab.onos.cluster.NodeId;
16 import org.onlab.onos.event.AbstractListenerRegistry; 18 import org.onlab.onos.event.AbstractListenerRegistry;
17 import org.onlab.onos.event.EventDeliveryService; 19 import org.onlab.onos.event.EventDeliveryService;
...@@ -103,6 +105,12 @@ public class MastershipManager ...@@ -103,6 +105,12 @@ public class MastershipManager
103 return store.getDevices(nodeId); 105 return store.getDevices(nodeId);
104 } 106 }
105 107
108 +
109 + @Override
110 + public MastershipTermService requestTermService() {
111 + return new InternalMastershipTermService();
112 + }
113 +
106 @Override 114 @Override
107 public void addListener(MastershipListener listener) { 115 public void addListener(MastershipListener listener) {
108 checkNotNull(listener); 116 checkNotNull(listener);
...@@ -124,4 +132,13 @@ public class MastershipManager ...@@ -124,4 +132,13 @@ public class MastershipManager
124 } 132 }
125 } 133 }
126 134
135 + private class InternalMastershipTermService implements MastershipTermService {
136 +
137 + @Override
138 + public MastershipTerm getMastershipTerm(DeviceId deviceId) {
139 + return store.getTermFor(deviceId);
140 + }
141 +
142 + }
143 +
127 } 144 }
......
...@@ -11,6 +11,7 @@ import org.onlab.onos.cluster.ControllerNode; ...@@ -11,6 +11,7 @@ import org.onlab.onos.cluster.ControllerNode;
11 import org.onlab.onos.cluster.ControllerNode.State; 11 import org.onlab.onos.cluster.ControllerNode.State;
12 import org.onlab.onos.cluster.DefaultControllerNode; 12 import org.onlab.onos.cluster.DefaultControllerNode;
13 import org.onlab.onos.cluster.MastershipService; 13 import org.onlab.onos.cluster.MastershipService;
14 +import org.onlab.onos.cluster.MastershipTermService;
14 import org.onlab.onos.cluster.NodeId; 15 import org.onlab.onos.cluster.NodeId;
15 import org.onlab.onos.event.impl.TestEventDispatcher; 16 import org.onlab.onos.event.impl.TestEventDispatcher;
16 import org.onlab.onos.net.DeviceId; 17 import org.onlab.onos.net.DeviceId;
...@@ -100,6 +101,20 @@ public class MastershipManagerTest { ...@@ -100,6 +101,20 @@ public class MastershipManagerTest {
100 assertEquals("should be two devices:", 2, mgr.getDevicesOf(NID_LOCAL).size()); 101 assertEquals("should be two devices:", 2, mgr.getDevicesOf(NID_LOCAL).size());
101 } 102 }
102 103
104 + @Test
105 + public void termService() {
106 + MastershipTermService ts = mgr.requestTermService();
107 +
108 + //term = 0 for both
109 + mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
110 + assertEquals("inconsistent term: ", 0, ts.getMastershipTerm(DEV_MASTER).termNumber());
111 +
112 + //hand devices to NID_LOCAL and back: term = 2
113 + mgr.setRole(NID_OTHER, DEV_MASTER, MASTER);
114 + mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
115 + assertEquals("inconsistent terms: ", 2, ts.getMastershipTerm(DEV_MASTER).termNumber());
116 + }
117 +
103 private final class TestClusterService implements ClusterService { 118 private final class TestClusterService implements ClusterService {
104 119
105 ControllerNode local = new DefaultControllerNode(NID_LOCAL, LOCALHOST); 120 ControllerNode local = new DefaultControllerNode(NID_LOCAL, LOCALHOST);
......
...@@ -15,6 +15,7 @@ import org.onlab.onos.cluster.ClusterService; ...@@ -15,6 +15,7 @@ import org.onlab.onos.cluster.ClusterService;
15 import org.onlab.onos.cluster.MastershipEvent; 15 import org.onlab.onos.cluster.MastershipEvent;
16 import org.onlab.onos.cluster.MastershipStore; 16 import org.onlab.onos.cluster.MastershipStore;
17 import org.onlab.onos.cluster.MastershipStoreDelegate; 17 import org.onlab.onos.cluster.MastershipStoreDelegate;
18 +import org.onlab.onos.cluster.MastershipTerm;
18 import org.onlab.onos.cluster.NodeId; 19 import org.onlab.onos.cluster.NodeId;
19 import org.onlab.onos.net.DeviceId; 20 import org.onlab.onos.net.DeviceId;
20 import org.onlab.onos.net.MastershipRole; 21 import org.onlab.onos.net.MastershipRole;
...@@ -115,4 +116,10 @@ public class DistributedMastershipStore ...@@ -115,4 +116,10 @@ public class DistributedMastershipStore
115 return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY; 116 return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY;
116 } 117 }
117 118
119 + @Override
120 + public MastershipTerm getTermFor(DeviceId deviceId) {
121 + // TODO Auto-generated method stub
122 + return null;
123 + }
124 +
118 } 125 }
......
...@@ -369,7 +369,7 @@ public class DistributedDeviceStore ...@@ -369,7 +369,7 @@ public class DistributedDeviceStore
369 } 369 }
370 370
371 @Override 371 @Override
372 - protected void onUpdate(DeviceId deviceId, DefaultDevice device) { 372 + protected void onUpdate(DeviceId deviceId, DefaultDevice oldDevice, DefaultDevice device) {
373 notifyDelegate(new DeviceEvent(DEVICE_UPDATED, device)); 373 notifyDelegate(new DeviceEvent(DEVICE_UPDATED, device));
374 } 374 }
375 } 375 }
...@@ -390,7 +390,7 @@ public class DistributedDeviceStore ...@@ -390,7 +390,7 @@ public class DistributedDeviceStore
390 } 390 }
391 391
392 @Override 392 @Override
393 - protected void onUpdate(DeviceId deviceId, Map<PortNumber, Port> ports) { 393 + protected void onUpdate(DeviceId deviceId, Map<PortNumber, Port> oldPorts, Map<PortNumber, Port> ports) {
394 // notifyDelegate(new DeviceEvent(PORT_UPDATED, getDevice(deviceId))); 394 // notifyDelegate(new DeviceEvent(PORT_UPDATED, getDevice(deviceId)));
395 } 395 }
396 } 396 }
......
...@@ -101,7 +101,7 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD ...@@ -101,7 +101,7 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
101 V newVal = deserialize(event.getValue()); 101 V newVal = deserialize(event.getValue());
102 Optional<V> newValue = Optional.of(newVal); 102 Optional<V> newValue = Optional.of(newVal);
103 cache.asMap().replace(key, oldValue, newValue); 103 cache.asMap().replace(key, oldValue, newValue);
104 - onUpdate(key, newVal); 104 + onUpdate(key, oldVal, newVal);
105 } 105 }
106 106
107 @Override 107 @Override
...@@ -125,9 +125,10 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD ...@@ -125,9 +125,10 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
125 * Cache entry update hook. 125 * Cache entry update hook.
126 * 126 *
127 * @param key new key 127 * @param key new key
128 + * @param oldValue old value
128 * @param newVal new value 129 * @param newVal new value
129 */ 130 */
130 - protected void onUpdate(K key, V newVal) { 131 + protected void onUpdate(K key, V oldValue, V newVal) {
131 } 132 }
132 133
133 /** 134 /**
......
...@@ -14,19 +14,26 @@ import org.apache.felix.scr.annotations.Service; ...@@ -14,19 +14,26 @@ import org.apache.felix.scr.annotations.Service;
14 import org.onlab.onos.cluster.ControllerNode; 14 import org.onlab.onos.cluster.ControllerNode;
15 import org.onlab.onos.cluster.DefaultControllerNode; 15 import org.onlab.onos.cluster.DefaultControllerNode;
16 import org.onlab.onos.cluster.NodeId; 16 import org.onlab.onos.cluster.NodeId;
17 +import org.onlab.onos.net.ConnectPoint;
17 import org.onlab.onos.net.DefaultDevice; 18 import org.onlab.onos.net.DefaultDevice;
19 +import org.onlab.onos.net.DefaultLink;
18 import org.onlab.onos.net.DefaultPort; 20 import org.onlab.onos.net.DefaultPort;
19 import org.onlab.onos.net.Device; 21 import org.onlab.onos.net.Device;
20 import org.onlab.onos.net.DeviceId; 22 import org.onlab.onos.net.DeviceId;
21 import org.onlab.onos.net.Element; 23 import org.onlab.onos.net.Element;
24 +import org.onlab.onos.net.Link;
25 +import org.onlab.onos.net.LinkKey;
22 import org.onlab.onos.net.MastershipRole; 26 import org.onlab.onos.net.MastershipRole;
23 import org.onlab.onos.net.Port; 27 import org.onlab.onos.net.Port;
24 import org.onlab.onos.net.PortNumber; 28 import org.onlab.onos.net.PortNumber;
25 import org.onlab.onos.net.provider.ProviderId; 29 import org.onlab.onos.net.provider.ProviderId;
26 import org.onlab.onos.store.common.StoreService; 30 import org.onlab.onos.store.common.StoreService;
31 +import org.onlab.onos.store.serializers.ConnectPointSerializer;
32 +import org.onlab.onos.store.serializers.DefaultLinkSerializer;
27 import org.onlab.onos.store.serializers.DefaultPortSerializer; 33 import org.onlab.onos.store.serializers.DefaultPortSerializer;
28 import org.onlab.onos.store.serializers.DeviceIdSerializer; 34 import org.onlab.onos.store.serializers.DeviceIdSerializer;
29 import org.onlab.onos.store.serializers.IpPrefixSerializer; 35 import org.onlab.onos.store.serializers.IpPrefixSerializer;
36 +import org.onlab.onos.store.serializers.LinkKeySerializer;
30 import org.onlab.onos.store.serializers.NodeIdSerializer; 37 import org.onlab.onos.store.serializers.NodeIdSerializer;
31 import org.onlab.onos.store.serializers.OnosTimestampSerializer; 38 import org.onlab.onos.store.serializers.OnosTimestampSerializer;
32 import org.onlab.onos.store.serializers.PortNumberSerializer; 39 import org.onlab.onos.store.serializers.PortNumberSerializer;
...@@ -84,7 +91,9 @@ public class StoreManager implements StoreService { ...@@ -84,7 +91,9 @@ public class StoreManager implements StoreService {
84 DefaultDevice.class, 91 DefaultDevice.class,
85 MastershipRole.class, 92 MastershipRole.class,
86 Port.class, 93 Port.class,
87 - Element.class 94 + Element.class,
95 +
96 + Link.Type.class
88 ) 97 )
89 .register(IpPrefix.class, new IpPrefixSerializer()) 98 .register(IpPrefix.class, new IpPrefixSerializer())
90 .register(URI.class, new URISerializer()) 99 .register(URI.class, new URISerializer())
...@@ -94,6 +103,9 @@ public class StoreManager implements StoreService { ...@@ -94,6 +103,9 @@ public class StoreManager implements StoreService {
94 .register(PortNumber.class, new PortNumberSerializer()) 103 .register(PortNumber.class, new PortNumberSerializer())
95 .register(DefaultPort.class, new DefaultPortSerializer()) 104 .register(DefaultPort.class, new DefaultPortSerializer())
96 .register(OnosTimestamp.class, new OnosTimestampSerializer()) 105 .register(OnosTimestamp.class, new OnosTimestampSerializer())
106 + .register(LinkKey.class, new LinkKeySerializer())
107 + .register(ConnectPoint.class, new ConnectPointSerializer())
108 + .register(DefaultLink.class, new DefaultLinkSerializer())
97 .build() 109 .build()
98 .populate(10); 110 .populate(10);
99 } 111 }
......
1 +package org.onlab.onos.store.link.impl;
2 +
3 +import static com.google.common.cache.CacheBuilder.newBuilder;
4 +import static org.onlab.onos.net.Link.Type.DIRECT;
5 +import static org.onlab.onos.net.Link.Type.INDIRECT;
6 +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
7 +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
8 +import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED;
9 +import static org.slf4j.LoggerFactory.getLogger;
10 +
11 +import java.util.HashSet;
12 +import java.util.Set;
13 +import org.apache.felix.scr.annotations.Activate;
14 +import org.apache.felix.scr.annotations.Component;
15 +import org.apache.felix.scr.annotations.Deactivate;
16 +import org.apache.felix.scr.annotations.Service;
17 +import org.onlab.onos.net.ConnectPoint;
18 +import org.onlab.onos.net.DefaultLink;
19 +import org.onlab.onos.net.DeviceId;
20 +import org.onlab.onos.net.Link;
21 +import org.onlab.onos.net.LinkKey;
22 +import org.onlab.onos.net.link.LinkDescription;
23 +import org.onlab.onos.net.link.LinkEvent;
24 +import org.onlab.onos.net.link.LinkStore;
25 +import org.onlab.onos.net.link.LinkStoreDelegate;
26 +import org.onlab.onos.net.provider.ProviderId;
27 +import org.onlab.onos.store.impl.AbsentInvalidatingLoadingCache;
28 +import org.onlab.onos.store.impl.AbstractDistributedStore;
29 +import org.onlab.onos.store.impl.OptionalCacheLoader;
30 +import org.slf4j.Logger;
31 +
32 +import com.google.common.base.Optional;
33 +import com.google.common.cache.LoadingCache;
34 +import com.google.common.collect.HashMultimap;
35 +import com.google.common.collect.ImmutableSet;
36 +import com.google.common.collect.Multimap;
37 +import com.google.common.collect.ImmutableSet.Builder;
38 +import com.hazelcast.core.IMap;
39 +
40 +/**
41 + * Manages inventory of infrastructure links using Hazelcast-backed map.
42 + */
43 +@Component(immediate = true)
44 +@Service
45 +public class DistributedLinkStore
46 + extends AbstractDistributedStore<LinkEvent, LinkStoreDelegate>
47 + implements LinkStore {
48 +
49 + private final Logger log = getLogger(getClass());
50 +
51 + // Link inventory
52 + private IMap<byte[], byte[]> rawLinks;
53 + private LoadingCache<LinkKey, Optional<DefaultLink>> links;
54 +
55 + // TODO synchronize?
56 + // Egress and ingress link sets
57 + private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
58 + private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
59 +
60 + @Override
61 + @Activate
62 + public void activate() {
63 + super.activate();
64 +
65 + boolean includeValue = true;
66 +
67 + // TODO decide on Map name scheme to avoid collision
68 + rawLinks = theInstance.getMap("links");
69 + final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
70 + = new OptionalCacheLoader<>(storeService, rawLinks);
71 + links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
72 + // refresh/populate cache based on notification from other instance
73 + rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
74 +
75 + loadLinkCache();
76 +
77 + log.info("Started");
78 + }
79 +
80 + @Deactivate
81 + public void deactivate() {
82 + super.activate();
83 + log.info("Stopped");
84 + }
85 +
86 + private void loadLinkCache() {
87 + for (byte[] keyBytes : rawLinks.keySet()) {
88 + final LinkKey id = deserialize(keyBytes);
89 + links.refresh(id);
90 + }
91 + }
92 +
93 + @Override
94 + public int getLinkCount() {
95 + return links.asMap().size();
96 + }
97 +
98 + @Override
99 + public Iterable<Link> getLinks() {
100 + Builder<Link> builder = ImmutableSet.builder();
101 + for (Optional<DefaultLink> e : links.asMap().values()) {
102 + if (e.isPresent()) {
103 + builder.add(e.get());
104 + }
105 + }
106 + return builder.build();
107 + }
108 +
109 + @Override
110 + public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
111 + return ImmutableSet.copyOf(srcLinks.get(deviceId));
112 + }
113 +
114 + @Override
115 + public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
116 + return ImmutableSet.copyOf(dstLinks.get(deviceId));
117 + }
118 +
119 + @Override
120 + public Link getLink(ConnectPoint src, ConnectPoint dst) {
121 + return links.getUnchecked(new LinkKey(src, dst)).orNull();
122 + }
123 +
124 + @Override
125 + public Set<Link> getEgressLinks(ConnectPoint src) {
126 + Set<Link> egress = new HashSet<>();
127 + for (Link link : srcLinks.get(src.deviceId())) {
128 + if (link.src().equals(src)) {
129 + egress.add(link);
130 + }
131 + }
132 + return egress;
133 + }
134 +
135 + @Override
136 + public Set<Link> getIngressLinks(ConnectPoint dst) {
137 + Set<Link> ingress = new HashSet<>();
138 + for (Link link : dstLinks.get(dst.deviceId())) {
139 + if (link.dst().equals(dst)) {
140 + ingress.add(link);
141 + }
142 + }
143 + return ingress;
144 + }
145 +
146 + @Override
147 + public LinkEvent createOrUpdateLink(ProviderId providerId,
148 + LinkDescription linkDescription) {
149 + LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
150 + Optional<DefaultLink> link = links.getUnchecked(key);
151 + if (!link.isPresent()) {
152 + return createLink(providerId, key, linkDescription);
153 + }
154 + return updateLink(providerId, link.get(), key, linkDescription);
155 + }
156 +
157 + // Creates and stores the link and returns the appropriate event.
158 + private LinkEvent createLink(ProviderId providerId, LinkKey key,
159 + LinkDescription linkDescription) {
160 + DefaultLink link = new DefaultLink(providerId, key.src(), key.dst(),
161 + linkDescription.type());
162 + synchronized (this) {
163 + final byte[] keyBytes = serialize(key);
164 + rawLinks.put(keyBytes, serialize(link));
165 + links.asMap().putIfAbsent(key, Optional.of(link));
166 +
167 + addNewLink(link);
168 + }
169 + return new LinkEvent(LINK_ADDED, link);
170 + }
171 +
172 + // update Egress and ingress link sets
173 + private void addNewLink(DefaultLink link) {
174 + synchronized (this) {
175 + srcLinks.put(link.src().deviceId(), link);
176 + dstLinks.put(link.dst().deviceId(), link);
177 + }
178 + }
179 +
180 + // Updates, if necessary the specified link and returns the appropriate event.
181 + private LinkEvent updateLink(ProviderId providerId, DefaultLink link,
182 + LinkKey key, LinkDescription linkDescription) {
183 + // FIXME confirm Link update condition is OK
184 + if (link.type() == INDIRECT && linkDescription.type() == DIRECT) {
185 + synchronized (this) {
186 +
187 + DefaultLink updated =
188 + new DefaultLink(providerId, link.src(), link.dst(),
189 + linkDescription.type());
190 + final byte[] keyBytes = serialize(key);
191 + rawLinks.put(keyBytes, serialize(updated));
192 + links.asMap().replace(key, Optional.of(link), Optional.of(updated));
193 +
194 + replaceLink(link, updated);
195 + return new LinkEvent(LINK_UPDATED, updated);
196 + }
197 + }
198 + return null;
199 + }
200 +
201 + // update Egress and ingress link sets
202 + private void replaceLink(DefaultLink link, DefaultLink updated) {
203 + synchronized (this) {
204 + srcLinks.remove(link.src().deviceId(), link);
205 + dstLinks.remove(link.dst().deviceId(), link);
206 +
207 + srcLinks.put(link.src().deviceId(), updated);
208 + dstLinks.put(link.dst().deviceId(), updated);
209 + }
210 + }
211 +
212 + @Override
213 + public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
214 + synchronized (this) {
215 + LinkKey key = new LinkKey(src, dst);
216 + byte[] keyBytes = serialize(key);
217 + Link link = deserialize(rawLinks.remove(keyBytes));
218 + links.invalidate(key);
219 + if (link != null) {
220 + removeLink(link);
221 + return new LinkEvent(LINK_REMOVED, link);
222 + }
223 + return null;
224 + }
225 + }
226 +
227 + // update Egress and ingress link sets
228 + private void removeLink(Link link) {
229 + synchronized (this) {
230 + srcLinks.remove(link.src().deviceId(), link);
231 + dstLinks.remove(link.dst().deviceId(), link);
232 + }
233 + }
234 +
235 + private class RemoteLinkEventHandler extends RemoteEventHandler<LinkKey, DefaultLink> {
236 + public RemoteLinkEventHandler(LoadingCache<LinkKey, Optional<DefaultLink>> cache) {
237 + super(cache);
238 + }
239 +
240 + @Override
241 + protected void onAdd(LinkKey key, DefaultLink newVal) {
242 + addNewLink(newVal);
243 + notifyDelegate(new LinkEvent(LINK_ADDED, newVal));
244 + }
245 +
246 + @Override
247 + protected void onUpdate(LinkKey key, DefaultLink oldVal, DefaultLink newVal) {
248 + replaceLink(oldVal, newVal);
249 + notifyDelegate(new LinkEvent(LINK_UPDATED, newVal));
250 + }
251 +
252 + @Override
253 + protected void onRemove(LinkKey key, DefaultLink val) {
254 + removeLink(val);
255 + notifyDelegate(new LinkEvent(LINK_REMOVED, val));
256 + }
257 + }
258 +}
1 +/**
2 + * Implementation of link store using Hazelcast distributed structures.
3 + */
4 +package org.onlab.onos.store.link.impl;
1 +package org.onlab.onos.store.serializers;
2 +
3 +import org.onlab.onos.net.ConnectPoint;
4 +import org.onlab.onos.net.ElementId;
5 +import org.onlab.onos.net.PortNumber;
6 +
7 +import com.esotericsoftware.kryo.Kryo;
8 +import com.esotericsoftware.kryo.Serializer;
9 +import com.esotericsoftware.kryo.io.Input;
10 +import com.esotericsoftware.kryo.io.Output;
11 +
12 +/**
13 + * Kryo Serializer for {@link ConnectPointSerializer}.
14 + */
15 +public class ConnectPointSerializer extends Serializer<ConnectPoint> {
16 +
17 + /**
18 + * Default constructor.
19 + */
20 + public ConnectPointSerializer() {
21 + // non-null, immutable
22 + super(false, true);
23 + }
24 +
25 + @Override
26 + public void write(Kryo kryo, Output output, ConnectPoint object) {
27 + kryo.writeClassAndObject(output, object.elementId());
28 + kryo.writeClassAndObject(output, object.port());
29 + }
30 +
31 + @Override
32 + public ConnectPoint read(Kryo kryo, Input input, Class<ConnectPoint> type) {
33 + ElementId elementId = (ElementId) kryo.readClassAndObject(input);
34 + PortNumber portNumber = (PortNumber) kryo.readClassAndObject(input);
35 + return new ConnectPoint(elementId, portNumber);
36 + }
37 +}
1 +package org.onlab.onos.store.serializers;
2 +
3 +import org.onlab.onos.net.ConnectPoint;
4 +import org.onlab.onos.net.DefaultLink;
5 +import org.onlab.onos.net.Link.Type;
6 +import org.onlab.onos.net.provider.ProviderId;
7 +
8 +import com.esotericsoftware.kryo.Kryo;
9 +import com.esotericsoftware.kryo.Serializer;
10 +import com.esotericsoftware.kryo.io.Input;
11 +import com.esotericsoftware.kryo.io.Output;
12 +
13 +/**
14 + * Kryo Serializer for {@link DefaultLink}.
15 + */
16 +public class DefaultLinkSerializer extends Serializer<DefaultLink> {
17 +
18 + /**
19 + * Default constructor.
20 + */
21 + public DefaultLinkSerializer() {
22 + // non-null, immutable
23 + super(false, true);
24 + }
25 +
26 + @Override
27 + public void write(Kryo kryo, Output output, DefaultLink object) {
28 + kryo.writeClassAndObject(output, object.providerId());
29 + kryo.writeClassAndObject(output, object.src());
30 + kryo.writeClassAndObject(output, object.dst());
31 + kryo.writeClassAndObject(output, object.type());
32 + }
33 +
34 + @Override
35 + public DefaultLink read(Kryo kryo, Input input, Class<DefaultLink> type) {
36 + ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
37 + ConnectPoint src = (ConnectPoint) kryo.readClassAndObject(input);
38 + ConnectPoint dst = (ConnectPoint) kryo.readClassAndObject(input);
39 + Type linkType = (Type) kryo.readClassAndObject(input);
40 + return new DefaultLink(providerId, src, dst, linkType);
41 + }
42 +}
1 +package org.onlab.onos.store.serializers;
2 +
3 +import org.onlab.onos.net.ConnectPoint;
4 +import org.onlab.onos.net.LinkKey;
5 +import com.esotericsoftware.kryo.Kryo;
6 +import com.esotericsoftware.kryo.Serializer;
7 +import com.esotericsoftware.kryo.io.Input;
8 +import com.esotericsoftware.kryo.io.Output;
9 +
10 +/**
11 + * Kryo Serializer for {@link LinkKey}.
12 + */
13 +public class LinkKeySerializer extends Serializer<LinkKey> {
14 +
15 + /**
16 + * Default constructor.
17 + */
18 + public LinkKeySerializer() {
19 + // non-null, immutable
20 + super(false, true);
21 + }
22 +
23 + @Override
24 + public void write(Kryo kryo, Output output, LinkKey object) {
25 + kryo.writeClassAndObject(output, object.src());
26 + kryo.writeClassAndObject(output, object.dst());
27 + }
28 +
29 + @Override
30 + public LinkKey read(Kryo kryo, Input input, Class<LinkKey> type) {
31 + ConnectPoint src = (ConnectPoint) kryo.readClassAndObject(input);
32 + ConnectPoint dst = (ConnectPoint) kryo.readClassAndObject(input);
33 + return new LinkKey(src, dst);
34 + }
35 +}
...@@ -3,6 +3,7 @@ package org.onlab.onos.net.trivial.impl; ...@@ -3,6 +3,7 @@ package org.onlab.onos.net.trivial.impl;
3 import com.google.common.collect.HashMultimap; 3 import com.google.common.collect.HashMultimap;
4 import com.google.common.collect.ImmutableSet; 4 import com.google.common.collect.ImmutableSet;
5 import com.google.common.collect.Multimap; 5 import com.google.common.collect.Multimap;
6 +
6 import org.apache.felix.scr.annotations.Activate; 7 import org.apache.felix.scr.annotations.Activate;
7 import org.apache.felix.scr.annotations.Component; 8 import org.apache.felix.scr.annotations.Component;
8 import org.apache.felix.scr.annotations.Deactivate; 9 import org.apache.felix.scr.annotations.Deactivate;
...@@ -11,6 +12,7 @@ import org.onlab.onos.net.ConnectPoint; ...@@ -11,6 +12,7 @@ import org.onlab.onos.net.ConnectPoint;
11 import org.onlab.onos.net.DefaultLink; 12 import org.onlab.onos.net.DefaultLink;
12 import org.onlab.onos.net.DeviceId; 13 import org.onlab.onos.net.DeviceId;
13 import org.onlab.onos.net.Link; 14 import org.onlab.onos.net.Link;
15 +import org.onlab.onos.net.LinkKey;
14 import org.onlab.onos.net.link.LinkDescription; 16 import org.onlab.onos.net.link.LinkDescription;
15 import org.onlab.onos.net.link.LinkEvent; 17 import org.onlab.onos.net.link.LinkEvent;
16 import org.onlab.onos.net.link.LinkStore; 18 import org.onlab.onos.net.link.LinkStore;
...@@ -22,7 +24,6 @@ import org.slf4j.Logger; ...@@ -22,7 +24,6 @@ import org.slf4j.Logger;
22 import java.util.Collections; 24 import java.util.Collections;
23 import java.util.HashSet; 25 import java.util.HashSet;
24 import java.util.Map; 26 import java.util.Map;
25 -import java.util.Objects;
26 import java.util.Set; 27 import java.util.Set;
27 import java.util.concurrent.ConcurrentHashMap; 28 import java.util.concurrent.ConcurrentHashMap;
28 29
...@@ -123,7 +124,7 @@ public class SimpleLinkStore ...@@ -123,7 +124,7 @@ public class SimpleLinkStore
123 // Creates and stores the link and returns the appropriate event. 124 // Creates and stores the link and returns the appropriate event.
124 private LinkEvent createLink(ProviderId providerId, LinkKey key, 125 private LinkEvent createLink(ProviderId providerId, LinkKey key,
125 LinkDescription linkDescription) { 126 LinkDescription linkDescription) {
126 - DefaultLink link = new DefaultLink(providerId, key.src, key.dst, 127 + DefaultLink link = new DefaultLink(providerId, key.src(), key.dst(),
127 linkDescription.type()); 128 linkDescription.type());
128 synchronized (this) { 129 synchronized (this) {
129 links.put(key, link); 130 links.put(key, link);
...@@ -165,33 +166,4 @@ public class SimpleLinkStore ...@@ -165,33 +166,4 @@ public class SimpleLinkStore
165 return null; 166 return null;
166 } 167 }
167 } 168 }
168 -
169 - // Auxiliary key to track links.
170 - private class LinkKey {
171 - final ConnectPoint src;
172 - final ConnectPoint dst;
173 -
174 - LinkKey(ConnectPoint src, ConnectPoint dst) {
175 - this.src = src;
176 - this.dst = dst;
177 - }
178 -
179 - @Override
180 - public int hashCode() {
181 - return Objects.hash(src, dst);
182 - }
183 -
184 - @Override
185 - public boolean equals(Object obj) {
186 - if (this == obj) {
187 - return true;
188 - }
189 - if (obj instanceof LinkKey) {
190 - final LinkKey other = (LinkKey) obj;
191 - return Objects.equals(this.src, other.src) &&
192 - Objects.equals(this.dst, other.dst);
193 - }
194 - return false;
195 - }
196 - }
197 } 169 }
......
...@@ -3,11 +3,13 @@ package org.onlab.onos.net.trivial.impl; ...@@ -3,11 +3,13 @@ package org.onlab.onos.net.trivial.impl;
3 import static org.slf4j.LoggerFactory.getLogger; 3 import static org.slf4j.LoggerFactory.getLogger;
4 4
5 import java.util.Collections; 5 import java.util.Collections;
6 +import java.util.HashMap;
6 import java.util.HashSet; 7 import java.util.HashSet;
7 import java.util.Map; 8 import java.util.Map;
8 import java.util.Set; 9 import java.util.Set;
9 import java.util.concurrent.ConcurrentHashMap; 10 import java.util.concurrent.ConcurrentHashMap;
10 import java.util.concurrent.ConcurrentMap; 11 import java.util.concurrent.ConcurrentMap;
12 +import java.util.concurrent.atomic.AtomicInteger;
11 13
12 import org.apache.felix.scr.annotations.Activate; 14 import org.apache.felix.scr.annotations.Activate;
13 import org.apache.felix.scr.annotations.Component; 15 import org.apache.felix.scr.annotations.Component;
...@@ -18,6 +20,7 @@ import org.onlab.onos.cluster.DefaultControllerNode; ...@@ -18,6 +20,7 @@ import org.onlab.onos.cluster.DefaultControllerNode;
18 import org.onlab.onos.cluster.MastershipEvent; 20 import org.onlab.onos.cluster.MastershipEvent;
19 import org.onlab.onos.cluster.MastershipStore; 21 import org.onlab.onos.cluster.MastershipStore;
20 import org.onlab.onos.cluster.MastershipStoreDelegate; 22 import org.onlab.onos.cluster.MastershipStoreDelegate;
23 +import org.onlab.onos.cluster.MastershipTerm;
21 import org.onlab.onos.cluster.NodeId; 24 import org.onlab.onos.cluster.NodeId;
22 import org.onlab.onos.net.DeviceId; 25 import org.onlab.onos.net.DeviceId;
23 import org.onlab.onos.net.MastershipRole; 26 import org.onlab.onos.net.MastershipRole;
...@@ -47,6 +50,7 @@ public class SimpleMastershipStore ...@@ -47,6 +50,7 @@ public class SimpleMastershipStore
47 //devices mapped to their masters, to emulate multiple nodes 50 //devices mapped to their masters, to emulate multiple nodes
48 protected final ConcurrentMap<DeviceId, NodeId> masterMap = 51 protected final ConcurrentMap<DeviceId, NodeId> masterMap =
49 new ConcurrentHashMap<>(); 52 new ConcurrentHashMap<>();
53 + protected final Map<DeviceId, AtomicInteger> termMap = new HashMap<>();
50 54
51 @Activate 55 @Activate
52 public void activate() { 56 public void activate() {
...@@ -63,17 +67,23 @@ public class SimpleMastershipStore ...@@ -63,17 +67,23 @@ public class SimpleMastershipStore
63 67
64 NodeId node = masterMap.get(deviceId); 68 NodeId node = masterMap.get(deviceId);
65 if (node == null) { 69 if (node == null) {
70 + synchronized (this) {
66 masterMap.put(deviceId, nodeId); 71 masterMap.put(deviceId, nodeId);
72 + termMap.put(deviceId, new AtomicInteger());
73 + }
67 return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId); 74 return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
68 } 75 }
69 76
70 if (node.equals(nodeId)) { 77 if (node.equals(nodeId)) {
71 return null; 78 return null;
72 } else { 79 } else {
80 + synchronized (this) {
73 masterMap.put(deviceId, nodeId); 81 masterMap.put(deviceId, nodeId);
82 + termMap.get(deviceId).incrementAndGet();
74 return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId); 83 return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
75 } 84 }
76 } 85 }
86 + }
77 87
78 @Override 88 @Override
79 public NodeId getMaster(DeviceId deviceId) { 89 public NodeId getMaster(DeviceId deviceId) {
...@@ -114,4 +124,13 @@ public class SimpleMastershipStore ...@@ -114,4 +124,13 @@ public class SimpleMastershipStore
114 return role; 124 return role;
115 } 125 }
116 126
127 + @Override
128 + public MastershipTerm getTermFor(DeviceId deviceId) {
129 + if (masterMap.get(deviceId) == null) {
130 + return null;
131 + }
132 + return MastershipTerm.of(
133 + masterMap.get(deviceId), termMap.get(deviceId).get());
134 + }
135 +
117 } 136 }
......