Madan Jampani

Added support to device store for broadcasting device/port update events to peers

...@@ -4,6 +4,7 @@ import com.google.common.collect.FluentIterable; ...@@ -4,6 +4,7 @@ import com.google.common.collect.FluentIterable;
4 import com.google.common.collect.ImmutableList; 4 import com.google.common.collect.ImmutableList;
5 import com.google.common.collect.Maps; 5 import com.google.common.collect.Maps;
6 import com.google.common.collect.Sets; 6 import com.google.common.collect.Sets;
7 +
7 import org.apache.commons.lang3.concurrent.ConcurrentException; 8 import org.apache.commons.lang3.concurrent.ConcurrentException;
8 import org.apache.commons.lang3.concurrent.ConcurrentInitializer; 9 import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
9 import org.apache.felix.scr.annotations.Activate; 10 import org.apache.felix.scr.annotations.Activate;
...@@ -33,10 +34,15 @@ import org.onlab.onos.net.provider.ProviderId; ...@@ -33,10 +34,15 @@ import org.onlab.onos.net.provider.ProviderId;
33 import org.onlab.onos.store.AbstractStore; 34 import org.onlab.onos.store.AbstractStore;
34 import org.onlab.onos.store.ClockService; 35 import org.onlab.onos.store.ClockService;
35 import org.onlab.onos.store.Timestamp; 36 import org.onlab.onos.store.Timestamp;
37 +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
38 +import org.onlab.onos.store.cluster.messaging.ClusterMessage;
39 +import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
40 +import org.onlab.onos.store.cluster.messaging.MessageSubject;
36 import org.onlab.onos.store.common.impl.Timestamped; 41 import org.onlab.onos.store.common.impl.Timestamped;
37 import org.onlab.util.NewConcurrentHashMap; 42 import org.onlab.util.NewConcurrentHashMap;
38 import org.slf4j.Logger; 43 import org.slf4j.Logger;
39 44
45 +import java.io.IOException;
40 import java.util.ArrayList; 46 import java.util.ArrayList;
41 import java.util.Collections; 47 import java.util.Collections;
42 import java.util.HashSet; 48 import java.util.HashSet;
...@@ -96,6 +102,9 @@ public class GossipDeviceStore ...@@ -96,6 +102,9 @@ public class GossipDeviceStore
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected ClockService clockService; 103 protected ClockService clockService;
98 104
105 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
106 + protected ClusterCommunicationService clusterCommunicator;
107 +
99 @Activate 108 @Activate
100 public void activate() { 109 public void activate() {
101 log.info("Started"); 110 log.info("Started");
...@@ -133,8 +142,14 @@ public class GossipDeviceStore ...@@ -133,8 +142,14 @@ public class GossipDeviceStore
133 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp); 142 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
134 DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc); 143 DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
135 if (event != null) { 144 if (event != null) {
136 - // FIXME: broadcast deltaDesc, UP 145 + log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
137 - log.debug("broadcast deltaDesc"); 146 + providerId, deviceId);
147 + try {
148 + notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
149 + } catch (IOException e) {
150 + log.error("Failed to notify peers of a device update topology event or providerId: "
151 + + providerId + " and deviceId: " + deviceId, e);
152 + }
138 } 153 }
139 return event; 154 return event;
140 } 155 }
...@@ -298,19 +313,21 @@ public class GossipDeviceStore ...@@ -298,19 +313,21 @@ public class GossipDeviceStore
298 List<PortDescription> portDescriptions) { 313 List<PortDescription> portDescriptions) {
299 Timestamp newTimestamp = clockService.getTimestamp(deviceId); 314 Timestamp newTimestamp = clockService.getTimestamp(deviceId);
300 315
301 - List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size()); 316 + Timestamped<List<PortDescription>> timestampedPortDescriptions =
302 - for (PortDescription e : portDescriptions) { 317 + new Timestamped<>(portDescriptions, newTimestamp);
303 - deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
304 - }
305 318
306 - List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, 319 + List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
307 - new Timestamped<>(portDescriptions, newTimestamp));
308 if (!events.isEmpty()) { 320 if (!events.isEmpty()) {
309 - // FIXME: broadcast deltaDesc, UP 321 + log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
310 - log.debug("broadcast deltaDesc"); 322 + providerId, deviceId);
323 + try {
324 + notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
325 + } catch (IOException e) {
326 + log.error("Failed to notify peers of a port update topology event or providerId: "
327 + + providerId + " and deviceId: " + deviceId, e);
328 + }
311 } 329 }
312 return events; 330 return events;
313 -
314 } 331 }
315 332
316 private List<DeviceEvent> updatePortsInternal(ProviderId providerId, 333 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
...@@ -437,8 +454,14 @@ public class GossipDeviceStore ...@@ -437,8 +454,14 @@ public class GossipDeviceStore
437 final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp); 454 final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
438 DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc); 455 DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
439 if (event != null) { 456 if (event != null) {
440 - // FIXME: broadcast deltaDesc 457 + log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
441 - log.debug("broadcast deltaDesc"); 458 + providerId, deviceId);
459 + try {
460 + notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
461 + } catch (IOException e) {
462 + log.error("Failed to notify peers of a port status update topology event or providerId: "
463 + + providerId + " and deviceId: " + deviceId, e);
464 + }
442 } 465 }
443 return event; 466 return event;
444 } 467 }
...@@ -749,4 +772,61 @@ public class GossipDeviceStore ...@@ -749,4 +772,61 @@ public class GossipDeviceStore
749 return portDescs.put(newOne.value().portNumber(), newOne); 772 return portDescs.put(newOne.value().portNumber(), newOne);
750 } 773 }
751 } 774 }
775 +
776 + private void notifyPeers(InternalDeviceEvent event) throws IOException {
777 + ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-device-updates"), event);
778 + clusterCommunicator.broadcast(message);
779 + }
780 +
781 + private void notifyPeers(InternalPortEvent event) throws IOException {
782 + ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-port-updates"), event);
783 + clusterCommunicator.broadcast(message);
784 + }
785 +
786 + private void notifyPeers(InternalPortStatusEvent event) throws IOException {
787 + ClusterMessage message = new ClusterMessage(null, new MessageSubject("peer-port-status-updates"), event);
788 + clusterCommunicator.broadcast(message);
789 + }
790 +
791 + private class InternalDeviceEventListener implements ClusterMessageHandler {
792 + @Override
793 + public void handle(ClusterMessage message) {
794 + log.info("Received device update event from peer: {}", message.sender());
795 + InternalDeviceEvent event = (InternalDeviceEvent) message.payload();
796 + ProviderId providerId = event.providerId();
797 + DeviceId deviceId = event.deviceId();
798 + Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
799 + createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
800 + }
801 + }
802 +
803 + private class InternalPortEventListener implements ClusterMessageHandler {
804 + @Override
805 + public void handle(ClusterMessage message) {
806 +
807 + log.info("Received port update event from peer: {}", message.sender());
808 + InternalPortEvent event = (InternalPortEvent) message.payload();
809 +
810 + ProviderId providerId = event.providerId();
811 + DeviceId deviceId = event.deviceId();
812 + Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
813 +
814 + updatePortsInternal(providerId, deviceId, portDescriptions);
815 + }
816 + }
817 +
818 + private class InternalPortStatusEventListener implements ClusterMessageHandler {
819 + @Override
820 + public void handle(ClusterMessage message) {
821 +
822 + log.info("Received port status update event from peer: {}", message.sender());
823 + InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload();
824 +
825 + ProviderId providerId = event.providerId();
826 + DeviceId deviceId = event.deviceId();
827 + Timestamped<PortDescription> portDescription = event.portDescription();
828 +
829 + updatePortStatusInternal(providerId, deviceId, portDescription);
830 + }
831 + }
752 } 832 }
......
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import org.onlab.onos.net.DeviceId;
4 +import org.onlab.onos.net.device.DeviceDescription;
5 +import org.onlab.onos.net.provider.ProviderId;
6 +import org.onlab.onos.store.common.impl.Timestamped;
7 +
8 +public class InternalDeviceEvent {
9 +
10 + private final ProviderId providerId;
11 + private final DeviceId deviceId;
12 + private final Timestamped<DeviceDescription> deviceDescription;
13 +
14 + protected InternalDeviceEvent(
15 + ProviderId providerId,
16 + DeviceId deviceId,
17 + Timestamped<DeviceDescription> deviceDescription) {
18 + this.providerId = providerId;
19 + this.deviceId = deviceId;
20 + this.deviceDescription = deviceDescription;
21 + }
22 +
23 + public DeviceId deviceId() {
24 + return deviceId;
25 + }
26 +
27 + public ProviderId providerId() {
28 + return providerId;
29 + }
30 +
31 + public Timestamped<DeviceDescription> deviceDescription() {
32 + return deviceDescription;
33 + }
34 +}
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import java.util.List;
4 +
5 +import org.onlab.onos.net.DeviceId;
6 +import org.onlab.onos.net.device.PortDescription;
7 +import org.onlab.onos.net.provider.ProviderId;
8 +import org.onlab.onos.store.common.impl.Timestamped;
9 +
10 +public class InternalPortEvent {
11 +
12 + private final ProviderId providerId;
13 + private final DeviceId deviceId;
14 + private final Timestamped<List<PortDescription>> portDescriptions;
15 +
16 + protected InternalPortEvent(
17 + ProviderId providerId,
18 + DeviceId deviceId,
19 + Timestamped<List<PortDescription>> portDescriptions) {
20 + this.providerId = providerId;
21 + this.deviceId = deviceId;
22 + this.portDescriptions = portDescriptions;
23 + }
24 +
25 + public DeviceId deviceId() {
26 + return deviceId;
27 + }
28 +
29 + public ProviderId providerId() {
30 + return providerId;
31 + }
32 +
33 + public Timestamped<List<PortDescription>> portDescriptions() {
34 + return portDescriptions;
35 + }
36 +}
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import org.onlab.onos.net.DeviceId;
4 +import org.onlab.onos.net.device.PortDescription;
5 +import org.onlab.onos.net.provider.ProviderId;
6 +import org.onlab.onos.store.common.impl.Timestamped;
7 +
8 +public class InternalPortStatusEvent {
9 +
10 + private final ProviderId providerId;
11 + private final DeviceId deviceId;
12 + private final Timestamped<PortDescription> portDescription;
13 +
14 + protected InternalPortStatusEvent(
15 + ProviderId providerId,
16 + DeviceId deviceId,
17 + Timestamped<PortDescription> portDescription) {
18 + this.providerId = providerId;
19 + this.deviceId = deviceId;
20 + this.portDescription = portDescription;
21 + }
22 +
23 + public DeviceId deviceId() {
24 + return deviceId;
25 + }
26 +
27 + public ProviderId providerId() {
28 + return providerId;
29 + }
30 +
31 + public Timestamped<PortDescription> portDescription() {
32 + return portDescription;
33 + }
34 +}
...@@ -5,6 +5,7 @@ import static org.onlab.onos.net.Device.Type.SWITCH; ...@@ -5,6 +5,7 @@ import static org.onlab.onos.net.Device.Type.SWITCH;
5 import static org.onlab.onos.net.DeviceId.deviceId; 5 import static org.onlab.onos.net.DeviceId.deviceId;
6 import static org.onlab.onos.net.device.DeviceEvent.Type.*; 6 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
7 7
8 +import java.io.IOException;
8 import java.util.Arrays; 9 import java.util.Arrays;
9 import java.util.HashMap; 10 import java.util.HashMap;
10 import java.util.List; 11 import java.util.List;
...@@ -37,6 +38,10 @@ import org.onlab.onos.net.device.DeviceStoreDelegate; ...@@ -37,6 +38,10 @@ import org.onlab.onos.net.device.DeviceStoreDelegate;
37 import org.onlab.onos.net.device.PortDescription; 38 import org.onlab.onos.net.device.PortDescription;
38 import org.onlab.onos.net.provider.ProviderId; 39 import org.onlab.onos.net.provider.ProviderId;
39 import org.onlab.onos.store.ClockService; 40 import org.onlab.onos.store.ClockService;
41 +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
42 +import org.onlab.onos.store.cluster.messaging.ClusterMessage;
43 +import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
44 +import org.onlab.onos.store.cluster.messaging.MessageSubject;
40 45
41 import com.google.common.collect.Iterables; 46 import com.google.common.collect.Iterables;
42 import com.google.common.collect.Sets; 47 import com.google.common.collect.Sets;
...@@ -105,7 +110,9 @@ public class GossipDeviceStoreTest { ...@@ -105,7 +110,9 @@ public class GossipDeviceStoreTest {
105 deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1)); 110 deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
106 deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2)); 111 deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
107 112
108 - gossipDeviceStore = new TestGossipDeviceStore(clockService); 113 + ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
114 +
115 + gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterCommunicator);
109 gossipDeviceStore.activate(); 116 gossipDeviceStore.activate();
110 deviceStore = gossipDeviceStore; 117 deviceStore = gossipDeviceStore;
111 } 118 }
...@@ -541,8 +548,20 @@ public class GossipDeviceStoreTest { ...@@ -541,8 +548,20 @@ public class GossipDeviceStoreTest {
541 548
542 private static final class TestGossipDeviceStore extends GossipDeviceStore { 549 private static final class TestGossipDeviceStore extends GossipDeviceStore {
543 550
544 - public TestGossipDeviceStore(ClockService clockService) { 551 + public TestGossipDeviceStore(ClockService clockService, ClusterCommunicationService clusterCommunicator) {
545 this.clockService = clockService; 552 this.clockService = clockService;
553 + this.clusterCommunicator = clusterCommunicator;
554 + }
546 } 555 }
556 +
557 + private static final class TestClusterCommunicationService implements ClusterCommunicationService {
558 + @Override
559 + public boolean broadcast(ClusterMessage message) throws IOException { return true; }
560 + @Override
561 + public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
562 + @Override
563 + public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
564 + @Override
565 + public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
547 } 566 }
548 } 567 }
......