Jonathan Hart
Committed by Gerrit Code Review

CordMcast clears remote routes on startup to prevent being out of sync.

It also reads existing routes on startup.

Change-Id: I13b8ffae2b57d1e82181a8a745bda185d56f368d
...@@ -15,7 +15,10 @@ ...@@ -15,7 +15,10 @@
15 */ 15 */
16 package org.onosproject.cordmcast; 16 package org.onosproject.cordmcast;
17 17
18 +import com.fasterxml.jackson.databind.ObjectMapper;
19 +import com.fasterxml.jackson.databind.node.ArrayNode;
18 import com.fasterxml.jackson.databind.node.ObjectNode; 20 import com.fasterxml.jackson.databind.node.ObjectNode;
21 +import com.google.common.collect.Lists;
19 import com.google.common.collect.Maps; 22 import com.google.common.collect.Maps;
20 import com.sun.jersey.api.client.Client; 23 import com.sun.jersey.api.client.Client;
21 import com.sun.jersey.api.client.WebResource; 24 import com.sun.jersey.api.client.WebResource;
...@@ -32,6 +35,7 @@ import org.onlab.packet.IpAddress; ...@@ -32,6 +35,7 @@ import org.onlab.packet.IpAddress;
32 import org.onlab.packet.VlanId; 35 import org.onlab.packet.VlanId;
33 import org.onosproject.cfg.ComponentConfigService; 36 import org.onosproject.cfg.ComponentConfigService;
34 import org.onosproject.codec.CodecService; 37 import org.onosproject.codec.CodecService;
38 +import org.onosproject.codec.JsonCodec;
35 import org.onosproject.core.ApplicationId; 39 import org.onosproject.core.ApplicationId;
36 import org.onosproject.core.CoreService; 40 import org.onosproject.core.CoreService;
37 import org.onosproject.net.ConnectPoint; 41 import org.onosproject.net.ConnectPoint;
...@@ -46,7 +50,6 @@ import org.onosproject.net.flowobjective.NextObjective; ...@@ -46,7 +50,6 @@ import org.onosproject.net.flowobjective.NextObjective;
46 import org.onosproject.net.flowobjective.Objective; 50 import org.onosproject.net.flowobjective.Objective;
47 import org.onosproject.net.flowobjective.ObjectiveContext; 51 import org.onosproject.net.flowobjective.ObjectiveContext;
48 import org.onosproject.net.flowobjective.ObjectiveError; 52 import org.onosproject.net.flowobjective.ObjectiveError;
49 -import org.onosproject.net.group.GroupService;
50 import org.onosproject.net.mcast.McastEvent; 53 import org.onosproject.net.mcast.McastEvent;
51 import org.onosproject.net.mcast.McastListener; 54 import org.onosproject.net.mcast.McastListener;
52 import org.onosproject.net.mcast.McastRoute; 55 import org.onosproject.net.mcast.McastRoute;
...@@ -56,19 +59,24 @@ import org.onosproject.rest.AbstractWebResource; ...@@ -56,19 +59,24 @@ import org.onosproject.rest.AbstractWebResource;
56 import org.osgi.service.component.ComponentContext; 59 import org.osgi.service.component.ComponentContext;
57 import org.slf4j.Logger; 60 import org.slf4j.Logger;
58 61
62 +import javax.ws.rs.core.MediaType;
63 +import java.io.IOException;
59 import java.util.Dictionary; 64 import java.util.Dictionary;
65 +import java.util.List;
60 import java.util.Map; 66 import java.util.Map;
61 import java.util.Properties; 67 import java.util.Properties;
68 +import java.util.Set;
62 import java.util.concurrent.atomic.AtomicBoolean; 69 import java.util.concurrent.atomic.AtomicBoolean;
63 import java.util.concurrent.atomic.AtomicInteger; 70 import java.util.concurrent.atomic.AtomicInteger;
64 71
72 +import static com.google.common.base.Preconditions.checkNotNull;
65 import static com.google.common.base.Strings.isNullOrEmpty; 73 import static com.google.common.base.Strings.isNullOrEmpty;
66 import static com.google.common.net.MediaType.JSON_UTF_8; 74 import static com.google.common.net.MediaType.JSON_UTF_8;
67 import static org.onlab.util.Tools.get; 75 import static org.onlab.util.Tools.get;
68 import static org.slf4j.LoggerFactory.getLogger; 76 import static org.slf4j.LoggerFactory.getLogger;
69 77
70 /** 78 /**
71 - * CORD multicast provisoning application. Operates by listening to 79 + * CORD multicast provisioning application. Operates by listening to
72 * events on the multicast rib and provisioning groups to program multicast 80 * events on the multicast rib and provisioning groups to program multicast
73 * flows on the dataplane. 81 * flows on the dataplane.
74 */ 82 */
...@@ -87,9 +95,6 @@ public class CordMcast { ...@@ -87,9 +95,6 @@ public class CordMcast {
87 protected MulticastRouteService mcastService; 95 protected MulticastRouteService mcastService;
88 96
89 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 97 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 - protected GroupService groupService;
91 -
92 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected FlowObjectiveService flowObjectiveService; 98 protected FlowObjectiveService flowObjectiveService;
94 99
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
...@@ -139,21 +144,30 @@ public class CordMcast { ...@@ -139,21 +144,30 @@ public class CordMcast {
139 144
140 @Activate 145 @Activate
141 public void activate(ComponentContext context) { 146 public void activate(ComponentContext context) {
147 + componentConfigService.registerProperties(getClass());
142 modified(context); 148 modified(context);
143 149
144 appId = coreService.registerApplication("org.onosproject.cordmcast"); 150 appId = coreService.registerApplication("org.onosproject.cordmcast");
145 - componentConfigService.registerProperties(getClass());
146 - mcastService.addListener(listener);
147 151
148 fabricOnosUrl = "http://" + syncHost + "/onos/v1/mcast"; 152 fabricOnosUrl = "http://" + syncHost + "/onos/v1/mcast";
149 153
150 - //TODO: obtain all existing mcast routes 154 + clearRemoteRoutes();
155 +
156 + mcastService.addListener(listener);
157 +
158 + for (McastRoute route : mcastService.getRoutes()) {
159 + Set<ConnectPoint> sinks = mcastService.fetchSinks(route);
160 + if (!sinks.isEmpty()) {
161 + sinks.forEach(s -> provisionGroup(route, s));
162 + }
163 + }
164 +
151 log.info("Started"); 165 log.info("Started");
152 } 166 }
153 167
154 @Deactivate 168 @Deactivate
155 public void deactivate() { 169 public void deactivate() {
156 - componentConfigService.unregisterProperties(getClass(), true); 170 + componentConfigService.unregisterProperties(getClass(), false);
157 mcastService.removeListener(listener); 171 mcastService.removeListener(listener);
158 log.info("Stopped"); 172 log.info("Stopped");
159 } 173 }
...@@ -162,7 +176,6 @@ public class CordMcast { ...@@ -162,7 +176,6 @@ public class CordMcast {
162 public void modified(ComponentContext context) { 176 public void modified(ComponentContext context) {
163 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties(); 177 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
164 178
165 -
166 try { 179 try {
167 String s = get(properties, "username"); 180 String s = get(properties, "username");
168 user = isNullOrEmpty(s) ? DEFAULT_USER : s.trim(); 181 user = isNullOrEmpty(s) ? DEFAULT_USER : s.trim();
...@@ -189,13 +202,12 @@ public class CordMcast { ...@@ -189,13 +202,12 @@ public class CordMcast {
189 vlanEnabled = false; 202 vlanEnabled = false;
190 priority = DEFAULT_PRIORITY; 203 priority = DEFAULT_PRIORITY;
191 } 204 }
192 -
193 -
194 } 205 }
195 206
196 private class InternalMulticastListener implements McastListener { 207 private class InternalMulticastListener implements McastListener {
197 @Override 208 @Override
198 public void event(McastEvent event) { 209 public void event(McastEvent event) {
210 + McastRouteInfo info = event.subject();
199 switch (event.type()) { 211 switch (event.type()) {
200 case ROUTE_ADDED: 212 case ROUTE_ADDED:
201 break; 213 break;
...@@ -204,7 +216,11 @@ public class CordMcast { ...@@ -204,7 +216,11 @@ public class CordMcast {
204 case SOURCE_ADDED: 216 case SOURCE_ADDED:
205 break; 217 break;
206 case SINK_ADDED: 218 case SINK_ADDED:
207 - provisionGroup(event.subject()); 219 + if (!info.sink().isPresent()) {
220 + log.warn("No sink given after sink added event: {}", info);
221 + return;
222 + }
223 + provisionGroup(info.route(), info.sink().get());
208 break; 224 break;
209 case SINK_REMOVED: 225 case SINK_REMOVED:
210 unprovisionGroup(event.subject()); 226 unprovisionGroup(event.subject());
...@@ -217,7 +233,7 @@ public class CordMcast { ...@@ -217,7 +233,7 @@ public class CordMcast {
217 233
218 private void unprovisionGroup(McastRouteInfo info) { 234 private void unprovisionGroup(McastRouteInfo info) {
219 if (info.sinks().isEmpty()) { 235 if (info.sinks().isEmpty()) {
220 - removeSyncedRoute(info); 236 + removeRemoteRoute(info.route());
221 } 237 }
222 238
223 if (!info.sink().isPresent()) { 239 if (!info.sink().isPresent()) {
...@@ -250,21 +266,18 @@ public class CordMcast { ...@@ -250,21 +266,18 @@ public class CordMcast {
250 flowObjectiveService.next(loc.deviceId(), next); 266 flowObjectiveService.next(loc.deviceId(), next);
251 } 267 }
252 268
253 - private void provisionGroup(McastRouteInfo info) { 269 + private void provisionGroup(McastRoute route, ConnectPoint sink) {
254 - if (!info.sink().isPresent()) { 270 + checkNotNull(route, "Route cannot be null");
255 - log.warn("No sink given after sink added event: {}", info); 271 + checkNotNull(sink, "Sink cannot be null");
256 - return;
257 - }
258 - ConnectPoint loc = info.sink().get();
259 272
260 final AtomicBoolean sync = new AtomicBoolean(false); 273 final AtomicBoolean sync = new AtomicBoolean(false);
261 274
262 - Integer nextId = groups.computeIfAbsent(info.route().group(), (g) -> { 275 + Integer nextId = groups.computeIfAbsent(route.group(), (g) -> {
263 Integer id = allocateId(); 276 Integer id = allocateId();
264 277
265 NextObjective next = DefaultNextObjective.builder() 278 NextObjective next = DefaultNextObjective.builder()
266 .fromApp(appId) 279 .fromApp(appId)
267 - .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build()) 280 + .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
268 .withType(NextObjective.Type.BROADCAST) 281 .withType(NextObjective.Type.BROADCAST)
269 .withId(id) 282 .withId(id)
270 .add(new ObjectiveContext() { 283 .add(new ObjectiveContext() {
...@@ -283,18 +296,16 @@ public class CordMcast { ...@@ -283,18 +296,16 @@ public class CordMcast {
283 } 296 }
284 }); 297 });
285 298
286 - flowObjectiveService.next(loc.deviceId(), next); 299 + flowObjectiveService.next(sink.deviceId(), next);
287 300
288 TrafficSelector.Builder mcast = DefaultTrafficSelector.builder() 301 TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
289 .matchEthType(Ethernet.TYPE_IPV4) 302 .matchEthType(Ethernet.TYPE_IPV4)
290 .matchIPDst(g.toIpPrefix()); 303 .matchIPDst(g.toIpPrefix());
291 304
292 -
293 if (vlanEnabled) { 305 if (vlanEnabled) {
294 mcast.matchVlanId(VlanId.vlanId((short) mcastVlan)); 306 mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
295 } 307 }
296 308
297 -
298 ForwardingObjective fwd = DefaultForwardingObjective.builder() 309 ForwardingObjective fwd = DefaultForwardingObjective.builder()
299 .fromApp(appId) 310 .fromApp(appId)
300 .nextStep(id) 311 .nextStep(id)
...@@ -316,7 +327,7 @@ public class CordMcast { ...@@ -316,7 +327,7 @@ public class CordMcast {
316 } 327 }
317 }); 328 });
318 329
319 - flowObjectiveService.forward(loc.deviceId(), fwd); 330 + flowObjectiveService.forward(sink.deviceId(), fwd);
320 331
321 sync.set(true); 332 sync.set(true);
322 333
...@@ -326,7 +337,7 @@ public class CordMcast { ...@@ -326,7 +337,7 @@ public class CordMcast {
326 if (!sync.get()) { 337 if (!sync.get()) {
327 NextObjective next = DefaultNextObjective.builder() 338 NextObjective next = DefaultNextObjective.builder()
328 .fromApp(appId) 339 .fromApp(appId)
329 - .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build()) 340 + .addTreatment(DefaultTrafficTreatment.builder().setOutput(sink.port()).build())
330 .withType(NextObjective.Type.BROADCAST) 341 .withType(NextObjective.Type.BROADCAST)
331 .withId(nextId) 342 .withId(nextId)
332 .addToExisting(new ObjectiveContext() { 343 .addToExisting(new ObjectiveContext() {
...@@ -345,44 +356,73 @@ public class CordMcast { ...@@ -345,44 +356,73 @@ public class CordMcast {
345 } 356 }
346 }); 357 });
347 358
348 - flowObjectiveService.next(loc.deviceId(), next); 359 + flowObjectiveService.next(sink.deviceId(), next);
349 } 360 }
350 361
351 - if (sync.get()) { 362 + addRemoteRoute(route);
352 - syncRoute(info);
353 - }
354 } 363 }
355 364
356 - private void syncRoute(McastRouteInfo info) { 365 + private void addRemoteRoute(McastRoute route) {
366 + checkNotNull(route);
357 if (syncHost == null) { 367 if (syncHost == null) {
358 log.warn("No host configured for synchronization; route will be dropped"); 368 log.warn("No host configured for synchronization; route will be dropped");
359 return; 369 return;
360 } 370 }
361 371
362 - log.debug("Sending route to other ONOS: {}", info.route()); 372 + log.debug("Sending route to other ONOS: {}", route);
363 373
364 WebResource.Builder builder = getClientBuilder(fabricOnosUrl); 374 WebResource.Builder builder = getClientBuilder(fabricOnosUrl);
365 375
366 ObjectNode json = codecService.getCodec(McastRoute.class) 376 ObjectNode json = codecService.getCodec(McastRoute.class)
367 - .encode(info.route(), new AbstractWebResource()); 377 + .encode(route, new AbstractWebResource());
368 builder.post(json.toString()); 378 builder.post(json.toString());
369 } 379 }
370 380
371 - private void removeSyncedRoute(McastRouteInfo info) { 381 + private void removeRemoteRoute(McastRoute route) {
372 if (syncHost == null) { 382 if (syncHost == null) {
373 log.warn("No host configured for synchronization; route will be dropped"); 383 log.warn("No host configured for synchronization; route will be dropped");
374 return; 384 return;
375 } 385 }
376 386
377 - log.debug("Removing route from other ONOS: {}", info.route()); 387 + log.debug("Removing route from other ONOS: {}", route);
378 388
379 WebResource.Builder builder = getClientBuilder(fabricOnosUrl); 389 WebResource.Builder builder = getClientBuilder(fabricOnosUrl);
380 390
381 ObjectNode json = codecService.getCodec(McastRoute.class) 391 ObjectNode json = codecService.getCodec(McastRoute.class)
382 - .encode(info.route(), new AbstractWebResource()); 392 + .encode(route, new AbstractWebResource());
383 builder.delete(json.toString()); 393 builder.delete(json.toString());
384 } 394 }
385 395
396 + private void clearRemoteRoutes() {
397 + if (syncHost == null) {
398 + log.warn("No host configured for synchronization");
399 + return;
400 + }
401 +
402 + log.debug("Clearing remote multicast routes");
403 +
404 + WebResource.Builder builder = getClientBuilder(fabricOnosUrl);
405 +
406 + String response = builder
407 + .accept(MediaType.APPLICATION_JSON_TYPE)
408 + .get(String.class);
409 +
410 + JsonCodec<McastRoute> routeCodec = codecService.getCodec(McastRoute.class);
411 + ObjectMapper mapper = new ObjectMapper();
412 + List<McastRoute> mcastRoutes = Lists.newArrayList();
413 + try {
414 + ObjectNode node = (ObjectNode) mapper.readTree(response);
415 + ArrayNode list = (ArrayNode) node.path("routes");
416 +
417 + list.forEach(n -> mcastRoutes.add(
418 + routeCodec.decode((ObjectNode) n, new AbstractWebResource())));
419 + } catch (IOException e) {
420 + log.warn("Error clearing remote routes", e);
421 + }
422 +
423 + mcastRoutes.forEach(this::removeRemoteRoute);
424 + }
425 +
386 private Integer allocateId() { 426 private Integer allocateId() {
387 return channels.getAndIncrement(); 427 return channels.getAndIncrement();
388 } 428 }
......
...@@ -179,12 +179,11 @@ public class IgmpSnoop { ...@@ -179,12 +179,11 @@ public class IgmpSnoop {
179 179
180 @Activate 180 @Activate
181 public void activate(ComponentContext context) { 181 public void activate(ComponentContext context) {
182 + componentConfigService.registerProperties(getClass());
182 modified(context); 183 modified(context);
183 184
184 appId = coreService.registerApplication("org.onosproject.igmp"); 185 appId = coreService.registerApplication("org.onosproject.igmp");
185 186
186 - componentConfigService.registerProperties(getClass());
187 -
188 packetService.addProcessor(processor, PacketProcessor.director(1)); 187 packetService.addProcessor(processor, PacketProcessor.director(1));
189 188
190 networkConfig.registerConfigFactory(configFactory); 189 networkConfig.registerConfigFactory(configFactory);
......
...@@ -142,10 +142,10 @@ public class SingleSwitchFibInstaller { ...@@ -142,10 +142,10 @@ public class SingleSwitchFibInstaller {
142 142
143 @Activate 143 @Activate
144 protected void activate(ComponentContext context) { 144 protected void activate(ComponentContext context) {
145 + componentConfigService.registerProperties(getClass());
145 modified(context); 146 modified(context);
146 - routerAppId = coreService.registerApplication(RoutingService.ROUTER_APP_ID);
147 147
148 - componentConfigService.registerProperties(getClass()); 148 + routerAppId = coreService.registerApplication(RoutingService.ROUTER_APP_ID);
149 149
150 deviceListener = new InternalDeviceListener(); 150 deviceListener = new InternalDeviceListener();
151 deviceService.addListener(deviceListener); 151 deviceService.addListener(deviceListener);
......
...@@ -36,7 +36,7 @@ public class McastDeleteCommand extends AbstractShellCommand { ...@@ -36,7 +36,7 @@ public class McastDeleteCommand extends AbstractShellCommand {
36 String sAddr = null; 36 String sAddr = null;
37 37
38 @Argument(index = 1, name = "gAddr", 38 @Argument(index = 1, name = "gAddr",
39 - description = "IP Address of the multicast group", 39 + description = "IP Address of the multicast group. '*' can be used to denote all groups",
40 required = true, multiValued = false) 40 required = true, multiValued = false)
41 String gAddr = null; 41 String gAddr = null;
42 42
...@@ -50,6 +50,12 @@ public class McastDeleteCommand extends AbstractShellCommand { ...@@ -50,6 +50,12 @@ public class McastDeleteCommand extends AbstractShellCommand {
50 protected void execute() { 50 protected void execute() {
51 MulticastRouteService mcastRouteManager = get(MulticastRouteService.class); 51 MulticastRouteService mcastRouteManager = get(MulticastRouteService.class);
52 52
53 + if (sAddr.equals("*") && gAddr.equals("*")) {
54 + // Clear all routes
55 + mcastRouteManager.getRoutes().forEach(mcastRouteManager::remove);
56 + return;
57 + }
58 +
53 McastRoute mRoute = new McastRoute(IpAddress.valueOf(sAddr), 59 McastRoute mRoute = new McastRoute(IpAddress.valueOf(sAddr),
54 IpAddress.valueOf(gAddr), McastRoute.Type.STATIC); 60 IpAddress.valueOf(gAddr), McastRoute.Type.STATIC);
55 61
......