alshabib
Committed by Gerrit Code Review

olt sends an event when registering a subscriber

Change-Id: I03ab17513fa15a1a4101e7f4f8622d004d49582a
...@@ -15,17 +15,13 @@ ...@@ -15,17 +15,13 @@
15 */ 15 */
16 package org.onosproject.olt; 16 package org.onosproject.olt;
17 17
18 -import com.google.common.base.Strings;
19 import org.apache.felix.scr.annotations.Activate; 18 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component; 19 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate; 20 import org.apache.felix.scr.annotations.Deactivate;
22 -import org.apache.felix.scr.annotations.Modified;
23 -import org.apache.felix.scr.annotations.Property;
24 import org.apache.felix.scr.annotations.Reference; 21 import org.apache.felix.scr.annotations.Reference;
25 import org.apache.felix.scr.annotations.ReferenceCardinality; 22 import org.apache.felix.scr.annotations.ReferenceCardinality;
26 import org.apache.felix.scr.annotations.Service; 23 import org.apache.felix.scr.annotations.Service;
27 import org.onlab.packet.VlanId; 24 import org.onlab.packet.VlanId;
28 -import org.onlab.util.Tools;
29 import org.onosproject.core.ApplicationId; 25 import org.onosproject.core.ApplicationId;
30 import org.onosproject.core.CoreService; 26 import org.onosproject.core.CoreService;
31 import org.onosproject.event.AbstractListenerManager; 27 import org.onosproject.event.AbstractListenerManager;
...@@ -47,16 +43,21 @@ import org.onosproject.net.flow.TrafficTreatment; ...@@ -47,16 +43,21 @@ import org.onosproject.net.flow.TrafficTreatment;
47 import org.onosproject.net.flowobjective.DefaultForwardingObjective; 43 import org.onosproject.net.flowobjective.DefaultForwardingObjective;
48 import org.onosproject.net.flowobjective.FlowObjectiveService; 44 import org.onosproject.net.flowobjective.FlowObjectiveService;
49 import org.onosproject.net.flowobjective.ForwardingObjective; 45 import org.onosproject.net.flowobjective.ForwardingObjective;
46 +import org.onosproject.net.flowobjective.Objective;
47 +import org.onosproject.net.flowobjective.ObjectiveContext;
48 +import org.onosproject.net.flowobjective.ObjectiveError;
50 import org.onosproject.olt.api.AccessDeviceEvent; 49 import org.onosproject.olt.api.AccessDeviceEvent;
51 import org.onosproject.olt.api.AccessDeviceListener; 50 import org.onosproject.olt.api.AccessDeviceListener;
52 -import org.osgi.service.component.ComponentContext;
53 import org.slf4j.Logger; 51 import org.slf4j.Logger;
54 52
55 -import java.util.Dictionary;
56 import java.util.Map; 53 import java.util.Map;
57 import java.util.Optional; 54 import java.util.Optional;
55 +import java.util.concurrent.CompletableFuture;
58 import java.util.concurrent.ConcurrentHashMap; 56 import java.util.concurrent.ConcurrentHashMap;
57 +import java.util.concurrent.ExecutorService;
58 +import java.util.concurrent.Executors;
59 59
60 +import static org.onlab.util.Tools.groupedThreads;
60 import static org.slf4j.LoggerFactory.getLogger; 61 import static org.slf4j.LoggerFactory.getLogger;
61 62
62 /** 63 /**
...@@ -86,30 +87,10 @@ public class Olt ...@@ -86,30 +87,10 @@ public class Olt
86 private ApplicationId appId; 87 private ApplicationId appId;
87 88
88 private static final VlanId DEFAULT_VLAN = VlanId.vlanId((short) 0); 89 private static final VlanId DEFAULT_VLAN = VlanId.vlanId((short) 0);
89 - public static final int OFFSET = 200;
90 90
91 - public static final int UPLINK_PORT = 129; 91 + private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
92 - public static final int GFAST_UPLINK_PORT = 100; 92 + groupedThreads("onos/olt-service",
93 - 93 + "olt-installer-%d"));
94 - public static final String OLT_DEVICE = "of:90e2ba82f97791e9";
95 - public static final String GFAST_DEVICE = "of:0011223344551357";
96 -
97 - @Property(name = "uplinkPort", intValue = UPLINK_PORT,
98 - label = "The OLT's uplink port number")
99 - private int uplinkPort = UPLINK_PORT;
100 -
101 - @Property(name = "gfastUplink", intValue = GFAST_UPLINK_PORT,
102 - label = "The OLT's uplink port number")
103 - private int gfastUplink = GFAST_UPLINK_PORT;
104 -
105 - //TODO: replace this information with info comming for net cfg service.
106 - @Property(name = "oltDevice", value = OLT_DEVICE,
107 - label = "The OLT device id")
108 - private String oltDevice = OLT_DEVICE;
109 -
110 - @Property(name = "gfastDevice", value = GFAST_DEVICE,
111 - label = "The gfast device id")
112 - private String gfastDevice = GFAST_DEVICE;
113 94
114 private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>(); 95 private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
115 96
...@@ -156,70 +137,6 @@ public class Olt ...@@ -156,70 +137,6 @@ public class Olt
156 log.info("Stopped"); 137 log.info("Stopped");
157 } 138 }
158 139
159 - @Modified
160 - public void modified(ComponentContext context) {
161 - Dictionary<?, ?> properties = context.getProperties();
162 -
163 - String s = Tools.get(properties, "uplinkPort");
164 - uplinkPort = Strings.isNullOrEmpty(s) ? UPLINK_PORT : Integer.parseInt(s);
165 -
166 - s = Tools.get(properties, "oltDevice");
167 - oltDevice = Strings.isNullOrEmpty(s) ? OLT_DEVICE : s;
168 - }
169 -
170 - private short fetchVlanId(PortNumber port) {
171 - long p = port.toLong() + OFFSET;
172 - if (p > 4095) {
173 - log.warn("Port Number {} exceeds vlan max", port);
174 - return -1;
175 - }
176 - return (short) p;
177 - }
178 -
179 - private void provisionVlanOnPort(String deviceId, int uplinkPort, PortNumber p, short vlanId) {
180 - DeviceId did = DeviceId.deviceId(deviceId);
181 -
182 - TrafficSelector upstream = DefaultTrafficSelector.builder()
183 - .matchVlanId(VlanId.vlanId(vlanId))
184 - .matchInPort(p)
185 - .build();
186 -
187 - TrafficSelector downStream = DefaultTrafficSelector.builder()
188 - .matchVlanId(VlanId.vlanId(vlanId))
189 - .matchInPort(PortNumber.portNumber(uplinkPort))
190 - .build();
191 -
192 - TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder()
193 - .setOutput(PortNumber.portNumber(uplinkPort))
194 - .build();
195 -
196 - TrafficTreatment downStreamTreatment = DefaultTrafficTreatment.builder()
197 - .setOutput(p)
198 - .build();
199 -
200 -
201 - ForwardingObjective upFwd = DefaultForwardingObjective.builder()
202 - .withFlag(ForwardingObjective.Flag.VERSATILE)
203 - .withPriority(1000)
204 - .makePermanent()
205 - .withSelector(upstream)
206 - .fromApp(appId)
207 - .withTreatment(upstreamTreatment)
208 - .add();
209 -
210 - ForwardingObjective downFwd = DefaultForwardingObjective.builder()
211 - .withFlag(ForwardingObjective.Flag.VERSATILE)
212 - .withPriority(1000)
213 - .makePermanent()
214 - .withSelector(downStream)
215 - .fromApp(appId)
216 - .withTreatment(downStreamTreatment)
217 - .add();
218 -
219 - flowObjectiveService.forward(did, upFwd);
220 - flowObjectiveService.forward(did, downFwd);
221 - }
222 -
223 @Override 140 @Override
224 public void provisionSubscriber(ConnectPoint port, VlanId vlan) { 141 public void provisionSubscriber(ConnectPoint port, VlanId vlan) {
225 AccessDeviceData olt = oltData.get(port.deviceId()); 142 AccessDeviceData olt = oltData.get(port.deviceId());
...@@ -238,6 +155,9 @@ public class Olt ...@@ -238,6 +155,9 @@ public class Olt
238 VlanId subscriberVlan, VlanId deviceVlan, 155 VlanId subscriberVlan, VlanId deviceVlan,
239 Optional<VlanId> defaultVlan) { 156 Optional<VlanId> defaultVlan) {
240 157
158 + CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
159 + CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
160 +
241 TrafficSelector upstream = DefaultTrafficSelector.builder() 161 TrafficSelector upstream = DefaultTrafficSelector.builder()
242 .matchVlanId((defaultVlan.isPresent()) ? defaultVlan.get() : DEFAULT_VLAN) 162 .matchVlanId((defaultVlan.isPresent()) ? defaultVlan.get() : DEFAULT_VLAN)
243 .matchInPort(subscriberPort) 163 .matchInPort(subscriberPort)
...@@ -249,6 +169,7 @@ public class Olt ...@@ -249,6 +169,7 @@ public class Olt
249 .build(); 169 .build();
250 170
251 TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder() 171 TrafficTreatment upstreamTreatment = DefaultTrafficTreatment.builder()
172 + .pushVlan()
252 .setVlanId(subscriberVlan) 173 .setVlanId(subscriberVlan)
253 .pushVlan() 174 .pushVlan()
254 .setVlanId(deviceVlan) 175 .setVlanId(deviceVlan)
...@@ -269,7 +190,17 @@ public class Olt ...@@ -269,7 +190,17 @@ public class Olt
269 .withSelector(upstream) 190 .withSelector(upstream)
270 .fromApp(appId) 191 .fromApp(appId)
271 .withTreatment(upstreamTreatment) 192 .withTreatment(upstreamTreatment)
272 - .add(); 193 + .add(new ObjectiveContext() {
194 + @Override
195 + public void onSuccess(Objective objective) {
196 + upFuture.complete(null);
197 + }
198 +
199 + @Override
200 + public void onError(Objective objective, ObjectiveError error) {
201 + upFuture.complete(error);
202 + }
203 + });
273 204
274 ForwardingObjective downFwd = DefaultForwardingObjective.builder() 205 ForwardingObjective downFwd = DefaultForwardingObjective.builder()
275 .withFlag(ForwardingObjective.Flag.VERSATILE) 206 .withFlag(ForwardingObjective.Flag.VERSATILE)
...@@ -278,10 +209,38 @@ public class Olt ...@@ -278,10 +209,38 @@ public class Olt
278 .withSelector(downstream) 209 .withSelector(downstream)
279 .fromApp(appId) 210 .fromApp(appId)
280 .withTreatment(downstreamTreatment) 211 .withTreatment(downstreamTreatment)
281 - .add(); 212 + .add(new ObjectiveContext() {
213 + @Override
214 + public void onSuccess(Objective objective) {
215 + downFuture.complete(null);
216 + }
217 +
218 + @Override
219 + public void onError(Objective objective, ObjectiveError error) {
220 + downFuture.complete(error);
221 + }
222 + });
282 223
283 flowObjectiveService.forward(deviceId, upFwd); 224 flowObjectiveService.forward(deviceId, upFwd);
284 flowObjectiveService.forward(deviceId, downFwd); 225 flowObjectiveService.forward(deviceId, downFwd);
226 +
227 + upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
228 + if (upStatus == null && downStatus == null) {
229 + post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_REGISTERED,
230 + deviceId,
231 + deviceVlan,
232 + subscriberVlan));
233 + } else if (downStatus != null) {
234 + log.error("Subscriber with vlan {} on device {} " +
235 + "on port {} failed downstream installation: {}",
236 + subscriberVlan, deviceId, subscriberPort, downStatus);
237 + } else if (upStatus != null) {
238 + log.error("Subscriber with vlan {} on device {} " +
239 + "on port {} failed upstream installation: {}",
240 + subscriberVlan, deviceId, subscriberPort, upStatus);
241 + }
242 + }, oltInstallers);
243 +
285 } 244 }
286 245
287 @Override 246 @Override
...@@ -292,17 +251,14 @@ public class Olt ...@@ -292,17 +251,14 @@ public class Olt
292 private class InternalDeviceListener implements DeviceListener { 251 private class InternalDeviceListener implements DeviceListener {
293 @Override 252 @Override
294 public void event(DeviceEvent event) { 253 public void event(DeviceEvent event) {
295 - DeviceId devId = DeviceId.deviceId(oltDevice); 254 + DeviceId devId = event.subject().id();
296 - if (!devId.equals(event.subject().id())) { 255 + if (!oltData.containsKey(devId)) {
256 + log.debug("Device {} is not an OLT", devId);
297 return; 257 return;
298 } 258 }
299 switch (event.type()) { 259 switch (event.type()) {
300 case PORT_ADDED: 260 case PORT_ADDED:
301 case PORT_UPDATED: 261 case PORT_UPDATED:
302 - if (event.port().isEnabled()) {
303 - short vlanId = fetchVlanId(event.port().number());
304 - provisionVlanOnPort(gfastDevice, uplinkPort, event.port().number(), vlanId);
305 - }
306 break; 262 break;
307 case DEVICE_ADDED: 263 case DEVICE_ADDED:
308 post(new AccessDeviceEvent( 264 post(new AccessDeviceEvent(
......