Jonathan Hart
Committed by Gerrit Code Review

Add basic multicast RIB synchronization over REST

Change-Id: I75f22956b6b73427ca657f5ab58330b1417fdf43
...@@ -63,5 +63,10 @@ ...@@ -63,5 +63,10 @@
63 <artifactId>org.osgi.compendium</artifactId> 63 <artifactId>org.osgi.compendium</artifactId>
64 <version>5.0.0</version> 64 <version>5.0.0</version>
65 </dependency> 65 </dependency>
66 + <dependency>
67 + <groupId>com.sun.jersey</groupId>
68 + <artifactId>jersey-client</artifactId>
69 + <version>1.19</version>
70 + </dependency>
66 </dependencies> 71 </dependencies>
67 </project> 72 </project>
......
...@@ -15,16 +15,25 @@ ...@@ -15,16 +15,25 @@
15 */ 15 */
16 package org.onosproject.cordmcast; 16 package org.onosproject.cordmcast;
17 17
18 +import com.fasterxml.jackson.databind.node.ObjectNode;
18 import com.google.common.collect.Maps; 19 import com.google.common.collect.Maps;
20 +import com.sun.jersey.api.client.Client;
21 +import com.sun.jersey.api.client.WebResource;
22 +import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
19 import org.apache.felix.scr.annotations.Activate; 23 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component; 24 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate; 25 import org.apache.felix.scr.annotations.Deactivate;
26 +import org.apache.felix.scr.annotations.Modified;
27 +import org.apache.felix.scr.annotations.Property;
22 import org.apache.felix.scr.annotations.Reference; 28 import org.apache.felix.scr.annotations.Reference;
23 import org.apache.felix.scr.annotations.ReferenceCardinality; 29 import org.apache.felix.scr.annotations.ReferenceCardinality;
24 import org.onlab.packet.Ethernet; 30 import org.onlab.packet.Ethernet;
25 import org.onlab.packet.IPv4; 31 import org.onlab.packet.IPv4;
26 import org.onlab.packet.IpAddress; 32 import org.onlab.packet.IpAddress;
27 import org.onlab.packet.VlanId; 33 import org.onlab.packet.VlanId;
34 +import org.onlab.util.Tools;
35 +import org.onosproject.cfg.ComponentConfigService;
36 +import org.onosproject.codec.CodecService;
28 import org.onosproject.core.ApplicationId; 37 import org.onosproject.core.ApplicationId;
29 import org.onosproject.core.CoreService; 38 import org.onosproject.core.CoreService;
30 import org.onosproject.net.ConnectPoint; 39 import org.onosproject.net.ConnectPoint;
...@@ -42,18 +51,24 @@ import org.onosproject.net.flowobjective.ObjectiveError; ...@@ -42,18 +51,24 @@ import org.onosproject.net.flowobjective.ObjectiveError;
42 import org.onosproject.net.group.GroupService; 51 import org.onosproject.net.group.GroupService;
43 import org.onosproject.net.mcast.McastEvent; 52 import org.onosproject.net.mcast.McastEvent;
44 import org.onosproject.net.mcast.McastListener; 53 import org.onosproject.net.mcast.McastListener;
54 +import org.onosproject.net.mcast.McastRoute;
45 import org.onosproject.net.mcast.McastRouteInfo; 55 import org.onosproject.net.mcast.McastRouteInfo;
46 import org.onosproject.net.mcast.MulticastRouteService; 56 import org.onosproject.net.mcast.MulticastRouteService;
57 +import org.onosproject.rest.AbstractWebResource;
58 +import org.osgi.service.component.ComponentContext;
47 import org.slf4j.Logger; 59 import org.slf4j.Logger;
48 60
61 +import java.util.Dictionary;
49 import java.util.Map; 62 import java.util.Map;
63 +import java.util.concurrent.atomic.AtomicBoolean;
50 import java.util.concurrent.atomic.AtomicInteger; 64 import java.util.concurrent.atomic.AtomicInteger;
51 65
66 +import static com.google.common.net.MediaType.JSON_UTF_8;
52 import static org.slf4j.LoggerFactory.getLogger; 67 import static org.slf4j.LoggerFactory.getLogger;
53 68
54 /** 69 /**
55 * CORD multicast provisoning application. Operates by listening to 70 * CORD multicast provisoning application. Operates by listening to
56 - * events on the multicast rib and provsioning groups to program multicast 71 + * events on the multicast rib and provisioning groups to program multicast
57 * flows on the dataplane. 72 * flows on the dataplane.
58 */ 73 */
59 @Component(immediate = true) 74 @Component(immediate = true)
...@@ -75,6 +90,13 @@ public class CordMcast { ...@@ -75,6 +90,13 @@ public class CordMcast {
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 90 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected CoreService coreService; 91 protected CoreService coreService;
77 92
93 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 + protected CodecService codecService;
95 +
96 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 + protected ComponentConfigService componentConfigService;
98 +
99 +
78 protected McastListener listener = new InternalMulticastListener(); 100 protected McastListener listener = new InternalMulticastListener();
79 101
80 102
...@@ -93,20 +115,50 @@ public class CordMcast { ...@@ -93,20 +115,50 @@ public class CordMcast {
93 // TODO component config this 115 // TODO component config this
94 private int priority = DEFAULT_PRIORITY; 116 private int priority = DEFAULT_PRIORITY;
95 117
118 + private static final String DEFAULT_USER = "karaf";
119 + private static final String DEFAULT_PASSWORD = "karaf";
120 +
121 + @Property(name = "syncHost", value = "",
122 + label = "host:port to synchronize routes to")
123 + private String syncHost = "10.90.0.8:8181";
124 +
125 + @Property(name = "username", value = DEFAULT_USER,
126 + label = "Username for REST password authentication")
127 + private String user = DEFAULT_USER;
128 +
129 + @Property(name = "password", value = DEFAULT_PASSWORD,
130 + label = "Password for REST authentication")
131 + private String password = DEFAULT_PASSWORD;
132 +
133 + private String fabricOnosUrl;
134 +
96 @Activate 135 @Activate
97 public void activate() { 136 public void activate() {
98 appId = coreService.registerApplication("org.onosproject.cordmcast"); 137 appId = coreService.registerApplication("org.onosproject.cordmcast");
138 + componentConfigService.registerProperties(getClass());
99 mcastService.addListener(listener); 139 mcastService.addListener(listener);
140 +
141 + fabricOnosUrl = "http://" + syncHost + "/onos/v1/mcast";
142 +
100 //TODO: obtain all existing mcast routes 143 //TODO: obtain all existing mcast routes
101 log.info("Started"); 144 log.info("Started");
102 } 145 }
103 146
104 @Deactivate 147 @Deactivate
105 public void deactivate() { 148 public void deactivate() {
149 + componentConfigService.unregisterProperties(getClass(), true);
106 mcastService.removeListener(listener); 150 mcastService.removeListener(listener);
107 log.info("Stopped"); 151 log.info("Stopped");
108 } 152 }
109 153
154 + @Modified
155 + public void modified(ComponentContext context) {
156 + Dictionary<?, ?> properties = context.getProperties();
157 + user = Tools.get(properties, "username");
158 + password = Tools.get(properties, "password");
159 + syncHost = Tools.get(properties, "syncHost");
160 + }
161 +
110 private class InternalMulticastListener implements McastListener { 162 private class InternalMulticastListener implements McastListener {
111 @Override 163 @Override
112 public void event(McastEvent event) { 164 public void event(McastEvent event) {
...@@ -135,9 +187,10 @@ public class CordMcast { ...@@ -135,9 +187,10 @@ public class CordMcast {
135 } 187 }
136 ConnectPoint loc = info.sink().get(); 188 ConnectPoint loc = info.sink().get();
137 189
190 + final AtomicBoolean sync = new AtomicBoolean(false);
138 191
139 Integer nextId = groups.computeIfAbsent(info.route().group(), (g) -> { 192 Integer nextId = groups.computeIfAbsent(info.route().group(), (g) -> {
140 - Integer id = allocateId(g); 193 + Integer id = allocateId();
141 194
142 TrafficSelector mcast = DefaultTrafficSelector.builder() 195 TrafficSelector mcast = DefaultTrafficSelector.builder()
143 .matchVlanId(VlanId.vlanId(mcastVlan)) 196 .matchVlanId(VlanId.vlanId(mcastVlan))
...@@ -170,6 +223,8 @@ public class CordMcast { ...@@ -170,6 +223,8 @@ public class CordMcast {
170 223
171 flowObjectiveService.forward(loc.deviceId(), fwd); 224 flowObjectiveService.forward(loc.deviceId(), fwd);
172 225
226 + sync.set(true);
227 +
173 return id; 228 return id;
174 }); 229 });
175 230
...@@ -195,10 +250,37 @@ public class CordMcast { ...@@ -195,10 +250,37 @@ public class CordMcast {
195 }); 250 });
196 251
197 flowObjectiveService.next(loc.deviceId(), next); 252 flowObjectiveService.next(loc.deviceId(), next);
253 +
254 + if (sync.get()) {
255 + syncRoute(info);
256 + }
198 } 257 }
199 258
200 - private Integer allocateId(IpAddress group) { 259 + private void syncRoute(McastRouteInfo info) {
201 - Integer channel = groups.putIfAbsent(group, channels.getAndIncrement()); 260 + if (syncHost == null) {
202 - return channel == null ? groups.get(group) : channel; 261 + log.warn("No host configured for synchronization; route will be dropped");
262 + return;
203 } 263 }
264 +
265 + log.debug("Sending route to other ONOS: {}", info.route());
266 +
267 + WebResource.Builder builder = getClientBuilder(fabricOnosUrl);
268 +
269 + ObjectNode json = codecService.getCodec(McastRoute.class)
270 + .encode(info.route(), new AbstractWebResource());
271 + builder.post(json.toString());
272 + }
273 +
274 + private Integer allocateId() {
275 + return channels.getAndIncrement();
276 + }
277 +
278 + private WebResource.Builder getClientBuilder(String uri) {
279 + Client client = Client.create();
280 + client.addFilter(new HTTPBasicAuthFilter(user, password));
281 + WebResource resource = client.resource(uri);
282 + return resource.accept(JSON_UTF_8.toString())
283 + .type(JSON_UTF_8.toString());
284 + }
285 +
204 } 286 }
......
...@@ -58,6 +58,8 @@ ...@@ -58,6 +58,8 @@
58 <bundle>mvn:com.typesafe/config/1.2.1</bundle> 58 <bundle>mvn:com.typesafe/config/1.2.1</bundle>
59 <bundle>mvn:org.onosproject/onlab-thirdparty/@ONOS-VERSION</bundle> 59 <bundle>mvn:org.onosproject/onlab-thirdparty/@ONOS-VERSION</bundle>
60 60
61 + <bundle>mvn:com.sun.jersey/jersey-client/1.19</bundle>
62 +
61 <bundle>mvn:org.mapdb/mapdb/1.0.7</bundle> 63 <bundle>mvn:org.mapdb/mapdb/1.0.7</bundle>
62 </feature> 64 </feature>
63 65
......