alshabib
Committed by Gerrit Code Review

fix removal of rules in a distributed setting

Change-Id: I44cb49990b8051f5f1542c11cbda6846049906e3
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
16 package org.onosproject.olt.impl; 16 package org.onosproject.olt.impl;
17 17
18 import com.google.common.collect.Maps; 18 import com.google.common.collect.Maps;
19 -import com.google.common.collect.Sets;
20 import org.apache.felix.scr.annotations.Activate; 19 import org.apache.felix.scr.annotations.Activate;
21 import org.apache.felix.scr.annotations.Component; 20 import org.apache.felix.scr.annotations.Component;
22 import org.apache.felix.scr.annotations.Deactivate; 21 import org.apache.felix.scr.annotations.Deactivate;
...@@ -61,6 +60,9 @@ import org.onosproject.olt.AccessDeviceData; ...@@ -61,6 +60,9 @@ import org.onosproject.olt.AccessDeviceData;
61 import org.onosproject.olt.AccessDeviceEvent; 60 import org.onosproject.olt.AccessDeviceEvent;
62 import org.onosproject.olt.AccessDeviceListener; 61 import org.onosproject.olt.AccessDeviceListener;
63 import org.onosproject.olt.AccessDeviceService; 62 import org.onosproject.olt.AccessDeviceService;
63 +import org.onosproject.store.serializers.KryoNamespaces;
64 +import org.onosproject.store.service.Serializer;
65 +import org.onosproject.store.service.StorageService;
64 import org.osgi.service.component.ComponentContext; 66 import org.osgi.service.component.ComponentContext;
65 import org.slf4j.Logger; 67 import org.slf4j.Logger;
66 68
...@@ -69,7 +71,6 @@ import java.util.List; ...@@ -69,7 +71,6 @@ import java.util.List;
69 import java.util.Map; 71 import java.util.Map;
70 import java.util.Optional; 72 import java.util.Optional;
71 import java.util.Properties; 73 import java.util.Properties;
72 -import java.util.Set;
73 import java.util.concurrent.CompletableFuture; 74 import java.util.concurrent.CompletableFuture;
74 import java.util.concurrent.ConcurrentHashMap; 75 import java.util.concurrent.ConcurrentHashMap;
75 import java.util.concurrent.ExecutorService; 76 import java.util.concurrent.ExecutorService;
...@@ -90,6 +91,7 @@ public class Olt ...@@ -90,6 +91,7 @@ public class Olt
90 implements AccessDeviceService { 91 implements AccessDeviceService {
91 92
92 private static final short DEFAULT_VLAN = 0; 93 private static final short DEFAULT_VLAN = 0;
94 + private static final String SUBSCRIBERS = "existing-subscribers";
93 95
94 private final Logger log = getLogger(getClass()); 96 private final Logger log = getLogger(getClass());
95 97
...@@ -108,6 +110,8 @@ public class Olt ...@@ -108,6 +110,8 @@ public class Olt
108 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 110 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109 protected ComponentConfigService componentConfigService; 111 protected ComponentConfigService componentConfigService;
110 112
113 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 + protected StorageService storageService;
111 115
112 @Property(name = "defaultVlan", intValue = DEFAULT_VLAN, 116 @Property(name = "defaultVlan", intValue = DEFAULT_VLAN,
113 label = "Default VLAN RG<->ONU traffic") 117 label = "Default VLAN RG<->ONU traffic")
...@@ -123,10 +127,7 @@ public class Olt ...@@ -123,10 +127,7 @@ public class Olt
123 127
124 private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>(); 128 private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
125 129
126 - private Map<ConnectPoint, Set<ForwardingObjective.Builder>> objectives = 130 + private Map<ConnectPoint, VlanId> subscribers;
127 - Maps.newConcurrentMap();
128 -
129 - private Map<ConnectPoint, VlanId> subscribers = Maps.newConcurrentMap();
130 131
131 private InternalNetworkConfigListener configListener = 132 private InternalNetworkConfigListener configListener =
132 new InternalNetworkConfigListener(); 133 new InternalNetworkConfigListener();
...@@ -172,6 +173,11 @@ public class Olt ...@@ -172,6 +173,11 @@ public class Olt
172 .forEach(p -> processFilteringObjectives((DeviceId) p.element().id(), 173 .forEach(p -> processFilteringObjectives((DeviceId) p.element().id(),
173 p.number(), true)); 174 p.number(), true));
174 175
176 + subscribers = storageService.<ConnectPoint, VlanId>consistentMapBuilder()
177 + .withName(SUBSCRIBERS)
178 + .withSerializer(Serializer.using(KryoNamespaces.API))
179 + .build().asJavaMap();
180 +
175 deviceService.addListener(deviceListener); 181 deviceService.addListener(deviceListener);
176 182
177 log.info("Started with Application ID {}", appId.id()); 183 log.info("Started with Application ID {}", appId.id());
...@@ -220,7 +226,15 @@ public class Olt ...@@ -220,7 +226,15 @@ public class Olt
220 return; 226 return;
221 } 227 }
222 228
223 - unprovisionSubscriber(olt.deviceId(), olt.uplink(), port.port(), olt.vlan()); 229 + VlanId subscriberVlan = subscribers.remove(port);
230 +
231 + if (subscriberVlan == null) {
232 + log.warn("Unknown subscriber at location {}", port);
233 + return;
234 + }
235 +
236 + unprovisionSubscriber(olt.deviceId(), olt.uplink(), port.port(), subscriberVlan,
237 + olt.vlan(), olt.defaultVlan());
224 238
225 } 239 }
226 240
...@@ -230,39 +244,43 @@ public class Olt ...@@ -230,39 +244,43 @@ public class Olt
230 } 244 }
231 245
232 private void unprovisionSubscriber(DeviceId deviceId, PortNumber uplink, 246 private void unprovisionSubscriber(DeviceId deviceId, PortNumber uplink,
233 - PortNumber subscriberPort, VlanId deviceVlan) { 247 + PortNumber subscriberPort, VlanId subscriberVlan,
234 - 248 + VlanId deviceVlan, Optional<VlanId> defaultVlan) {
235 - //FIXME: This method is slightly ugly but it'll do until we have a better
236 - // way to remove flows from the flow store.
237 249
238 CompletableFuture<ObjectiveError> downFuture = new CompletableFuture(); 250 CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
239 CompletableFuture<ObjectiveError> upFuture = new CompletableFuture(); 251 CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
240 252
241 - ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort); 253 + ForwardingObjective.Builder upFwd = upBuilder(uplink, subscriberPort,
242 - 254 + subscriberVlan, deviceVlan,
243 - VlanId subscriberVlan = subscribers.remove(cp); 255 + defaultVlan);
256 + ForwardingObjective.Builder downFwd = downBuilder(uplink, subscriberPort,
257 + subscriberVlan, deviceVlan,
258 + defaultVlan);
244 259
245 - Set<ForwardingObjective.Builder> fwds = objectives.remove(cp);
246 260
247 - if (fwds == null || fwds.size() != 2) { 261 + flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
248 - log.warn("Unknown or incomplete subscriber at {}", cp); 262 + @Override
249 - return; 263 + public void onSuccess(Objective objective) {
250 - } 264 + upFuture.complete(null);
265 + }
251 266
267 + @Override
268 + public void onError(Objective objective, ObjectiveError error) {
269 + upFuture.complete(error);
270 + }
271 + }));
252 272
253 - fwds.stream().forEach( 273 + flowObjectiveService.forward(deviceId, downFwd.remove(new ObjectiveContext() {
254 - fwd -> flowObjectiveService.forward(deviceId, 274 + @Override
255 - fwd.remove(new ObjectiveContext() { 275 + public void onSuccess(Objective objective) {
256 - @Override 276 + downFuture.complete(null);
257 - public void onSuccess(Objective objective) { 277 + }
258 - upFuture.complete(null);
259 - }
260 278
261 - @Override 279 + @Override
262 - public void onError(Objective objective, ObjectiveError error) { 280 + public void onError(Objective objective, ObjectiveError error) {
263 - upFuture.complete(error); 281 + downFuture.complete(error);
264 - } 282 + }
265 - }))); 283 + }));
266 284
267 upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> { 285 upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
268 if (upStatus == null && downStatus == null) { 286 if (upStatus == null && downStatus == null) {
...@@ -291,53 +309,17 @@ public class Olt ...@@ -291,53 +309,17 @@ public class Olt
291 CompletableFuture<ObjectiveError> downFuture = new CompletableFuture(); 309 CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
292 CompletableFuture<ObjectiveError> upFuture = new CompletableFuture(); 310 CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
293 311
294 - TrafficSelector upstream = DefaultTrafficSelector.builder() 312 + ForwardingObjective.Builder upFwd = upBuilder(uplinkPort, subscriberPort,
295 - .matchVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan))) 313 + subscriberVlan, deviceVlan,
296 - .matchInPort(subscriberPort) 314 + defaultVlan);
297 - .build();
298 -
299 - TrafficSelector downstream = DefaultTrafficSelector.builder()
300 - .matchVlanId(deviceVlan)
301 - .matchInPort(uplinkPort)
302 - .matchInnerVlanId(subscriberVlan)
303 - .build();
304 -
305 - TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder()
306 - .pushVlan()
307 - .setVlanId(subscriberVlan)
308 - .pushVlan()
309 - .setVlanId(deviceVlan)
310 - .setOutput(uplinkPort)
311 - .build();
312 315
313 - TrafficTreatment downstreamTreatment = DefaultTrafficTreatment.builder()
314 - .popVlan()
315 - .setVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
316 - .setOutput(subscriberPort)
317 - .build();
318 316
319 - 317 + ForwardingObjective.Builder downFwd = downBuilder(uplinkPort, subscriberPort,
320 - ForwardingObjective.Builder upFwd = DefaultForwardingObjective.builder() 318 + subscriberVlan, deviceVlan,
321 - .withFlag(ForwardingObjective.Flag.VERSATILE) 319 + defaultVlan);
322 - .withPriority(1000)
323 - .makePermanent()
324 - .withSelector(upstream)
325 - .fromApp(appId)
326 - .withTreatment(upstreamTreatment);
327 -
328 -
329 - ForwardingObjective.Builder downFwd = DefaultForwardingObjective.builder()
330 - .withFlag(ForwardingObjective.Flag.VERSATILE)
331 - .withPriority(1000)
332 - .makePermanent()
333 - .withSelector(downstream)
334 - .fromApp(appId)
335 - .withTreatment(downstreamTreatment);
336 320
337 ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort); 321 ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
338 -
339 subscribers.put(cp, subscriberVlan); 322 subscribers.put(cp, subscriberVlan);
340 - objectives.put(cp, Sets.newHashSet(upFwd, downFwd));
341 323
342 flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() { 324 flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
343 @Override 325 @Override
...@@ -383,6 +365,60 @@ public class Olt ...@@ -383,6 +365,60 @@ public class Olt
383 365
384 } 366 }
385 367
368 + private ForwardingObjective.Builder downBuilder(PortNumber uplinkPort,
369 + PortNumber subscriberPort,
370 + VlanId subscriberVlan,
371 + VlanId deviceVlan,
372 + Optional<VlanId> defaultVlan) {
373 + TrafficSelector downstream = DefaultTrafficSelector.builder()
374 + .matchVlanId(deviceVlan)
375 + .matchInPort(uplinkPort)
376 + .matchInnerVlanId(subscriberVlan)
377 + .build();
378 +
379 + TrafficTreatment downstreamTreatment = DefaultTrafficTreatment.builder()
380 + .popVlan()
381 + .setVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
382 + .setOutput(subscriberPort)
383 + .build();
384 +
385 + return DefaultForwardingObjective.builder()
386 + .withFlag(ForwardingObjective.Flag.VERSATILE)
387 + .withPriority(1000)
388 + .makePermanent()
389 + .withSelector(downstream)
390 + .fromApp(appId)
391 + .withTreatment(downstreamTreatment);
392 + }
393 +
394 + private ForwardingObjective.Builder upBuilder(PortNumber uplinkPort,
395 + PortNumber subscriberPort,
396 + VlanId subscriberVlan,
397 + VlanId deviceVlan,
398 + Optional<VlanId> defaultVlan) {
399 + TrafficSelector upstream = DefaultTrafficSelector.builder()
400 + .matchVlanId(defaultVlan.orElse(VlanId.vlanId((short) this.defaultVlan)))
401 + .matchInPort(subscriberPort)
402 + .build();
403 +
404 +
405 + TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder()
406 + .pushVlan()
407 + .setVlanId(subscriberVlan)
408 + .pushVlan()
409 + .setVlanId(deviceVlan)
410 + .setOutput(uplinkPort)
411 + .build();
412 +
413 + return DefaultForwardingObjective.builder()
414 + .withFlag(ForwardingObjective.Flag.VERSATILE)
415 + .withPriority(1000)
416 + .makePermanent()
417 + .withSelector(upstream)
418 + .fromApp(appId)
419 + .withTreatment(upstreamTreatment);
420 + }
421 +
386 private void processFilteringObjectives(DeviceId devId, PortNumber port, boolean install) { 422 private void processFilteringObjectives(DeviceId devId, PortNumber port, boolean install) {
387 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder(); 423 DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
388 424
...@@ -430,9 +466,11 @@ public class Olt ...@@ -430,9 +466,11 @@ public class Olt
430 break; 466 break;
431 case PORT_REMOVED: 467 case PORT_REMOVED:
432 AccessDeviceData olt = oltData.get(devId); 468 AccessDeviceData olt = oltData.get(devId);
469 + VlanId vlan = subscribers.get(new ConnectPoint(devId,
470 + event.port().number()));
433 unprovisionSubscriber(devId, olt.uplink(), 471 unprovisionSubscriber(devId, olt.uplink(),
434 event.port().number(), 472 event.port().number(),
435 - olt.vlan()); 473 + vlan, olt.vlan(), olt.defaultVlan());
436 if (!oltData.get(devId).uplink().equals(event.port().number()) && 474 if (!oltData.get(devId).uplink().equals(event.port().number()) &&
437 event.port().isEnabled()) { 475 event.port().isEnabled()) {
438 processFilteringObjectives(devId, event.port().number(), false); 476 processFilteringObjectives(devId, event.port().number(), false);
......
...@@ -104,6 +104,9 @@ public interface DatabaseState<K, V> { ...@@ -104,6 +104,9 @@ public interface DatabaseState<K, V> {
104 Long counterGet(String counterName); 104 Long counterGet(String counterName);
105 105
106 @Command 106 @Command
107 + void counterSet(String counterName, long value);
108 +
109 + @Command
107 CommitResponse prepareAndCommit(Transaction transaction); 110 CommitResponse prepareAndCommit(Transaction transaction);
108 111
109 @Command 112 @Command
......
...@@ -207,6 +207,11 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -207,6 +207,11 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
207 } 207 }
208 208
209 @Override 209 @Override
210 + public void counterSet(String counterName, long value) {
211 + getCounter(counterName).set(value);
212 + }
213 +
214 + @Override
210 public Long queueSize(String queueName) { 215 public Long queueSize(String queueName) {
211 return Long.valueOf(getQueue(queueName).size()); 216 return Long.valueOf(getQueue(queueName).size());
212 } 217 }
......
...@@ -38,6 +38,7 @@ import org.onosproject.net.driver.AbstractHandlerBehaviour; ...@@ -38,6 +38,7 @@ import org.onosproject.net.driver.AbstractHandlerBehaviour;
38 import org.onosproject.net.flow.DefaultFlowRule; 38 import org.onosproject.net.flow.DefaultFlowRule;
39 import org.onosproject.net.flow.DefaultTrafficSelector; 39 import org.onosproject.net.flow.DefaultTrafficSelector;
40 import org.onosproject.net.flow.DefaultTrafficTreatment; 40 import org.onosproject.net.flow.DefaultTrafficTreatment;
41 +import org.onosproject.net.flow.FlowEntry;
41 import org.onosproject.net.flow.FlowRule; 42 import org.onosproject.net.flow.FlowRule;
42 import org.onosproject.net.flow.FlowRuleOperations; 43 import org.onosproject.net.flow.FlowRuleOperations;
43 import org.onosproject.net.flow.FlowRuleOperationsContext; 44 import org.onosproject.net.flow.FlowRuleOperationsContext;
...@@ -592,7 +593,12 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner { ...@@ -592,7 +593,12 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
592 builder.add(inner.build()).add(outer.build()); 593 builder.add(inner.build()).add(outer.build());
593 break; 594 break;
594 case REMOVE: 595 case REMOVE:
595 - builder.remove(inner.build()).remove(outer.build()); 596 + Iterable<FlowEntry> flows = flowRuleService.getFlowEntries(deviceId);
597 + for (FlowEntry fe : flows) {
598 + if (fe.equals(inner.build()) || fe.equals(outer.build())) {
599 + builder.remove(fe);
600 + }
601 + }
596 break; 602 break;
597 case ADD_TO_EXISTING: 603 case ADD_TO_EXISTING:
598 break; 604 break;
......