alshabib
Committed by Gerrit Code Review

Added Meter object accounting

Change-Id: I2a3b88ffd1756b738e197943b3e02771f5729e45
...@@ -80,4 +80,11 @@ public interface MeterStore extends Store<MeterEvent, MeterStoreDelegate> { ...@@ -80,4 +80,11 @@ public interface MeterStore extends Store<MeterEvent, MeterStoreDelegate> {
80 * @param reason a failure reason 80 * @param reason a failure reason
81 */ 81 */
82 void failedMeter(MeterOperation op, MeterFailReason reason); 82 void failedMeter(MeterOperation op, MeterFailReason reason);
83 +
84 + /**
85 + * Delete this meter immediately.
86 + * @param m a meter
87 + */
88 + void deleteMeterNow(Meter m);
89 +
83 } 90 }
......
...@@ -45,6 +45,8 @@ import org.onosproject.store.service.StorageService; ...@@ -45,6 +45,8 @@ import org.onosproject.store.service.StorageService;
45 import org.slf4j.Logger; 45 import org.slf4j.Logger;
46 46
47 import java.util.Collection; 47 import java.util.Collection;
48 +import java.util.Map;
49 +import java.util.stream.Collectors;
48 50
49 import static org.slf4j.LoggerFactory.getLogger; 51 import static org.slf4j.LoggerFactory.getLogger;
50 52
...@@ -170,7 +172,25 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M ...@@ -170,7 +172,25 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
170 172
171 @Override 173 @Override
172 public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) { 174 public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) {
173 - meterEntries.forEach(m -> store.updateMeterState(m)); 175 + //FIXME: FOLLOWING CODE CANNOT BE TESTED UNTIL SOMETHING THAT
176 + //FIXME: IMPLEMENTS METERS EXISTS
177 + Map<MeterId, Meter> storedMeterMap = store.getAllMeters().stream()
178 + .collect(Collectors.toMap(Meter::id, m -> m));
179 +
180 + meterEntries.stream()
181 + .filter(m -> storedMeterMap.remove(m.id()) != null)
182 + .forEach(m -> store.updateMeterState(m));
183 +
184 + storedMeterMap.values().stream().forEach(m -> {
185 + if (m.state() == MeterState.PENDING_ADD) {
186 + provider().performMeterOperation(m.deviceId(),
187 + new MeterOperation(m,
188 + MeterOperation.Type.ADD,
189 + null));
190 + } else {
191 + store.deleteMeterNow(m);
192 + }
193 + });
174 } 194 }
175 } 195 }
176 196
......
...@@ -198,6 +198,12 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD ...@@ -198,6 +198,12 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
198 new MeterData(v.meter(), reason, v.origin())); 198 new MeterData(v.meter(), reason, v.origin()));
199 } 199 }
200 200
201 + @Override
202 + public void deleteMeterNow(Meter m) {
203 + futures.remove(m.id());
204 + meters.remove(m.id());
205 + }
206 +
201 private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> { 207 private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
202 @Override 208 @Override
203 public void event(MapEvent<MeterId, MeterData> event) { 209 public void event(MapEvent<MeterId, MeterData> event) {
...@@ -217,12 +223,12 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD ...@@ -217,12 +223,12 @@ public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreD
217 } else if (data.reason().isPresent() && local.equals(data.origin())) { 223 } else if (data.reason().isPresent() && local.equals(data.origin())) {
218 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get()); 224 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
219 //TODO: No future -> no friend 225 //TODO: No future -> no friend
220 - futures.remove(data.meter().id()).complete(msr); 226 + futures.get(data.meter().id()).complete(msr);
221 } 227 }
222 break; 228 break;
223 case ADDED: 229 case ADDED:
224 case REMOVED: 230 case REMOVED:
225 - if (local.equals(data.origin())) { 231 + if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
226 futures.remove(data.meter().id()).complete(MeterStoreResult.success()); 232 futures.remove(data.meter().id()).complete(MeterStoreResult.success());
227 } 233 }
228 break; 234 break;
......
...@@ -306,6 +306,9 @@ public class OpenFlowControllerImpl implements OpenFlowController { ...@@ -306,6 +306,9 @@ public class OpenFlowControllerImpl implements OpenFlowController {
306 case PORT: 306 case PORT:
307 executorMsgs.submit(new OFMessageHandler(dpid, reply)); 307 executorMsgs.submit(new OFMessageHandler(dpid, reply));
308 break; 308 break;
309 + case METER:
310 + executorMsgs.submit(new OFMessageHandler(dpid, reply));
311 + break;
309 case EXPERIMENTER: 312 case EXPERIMENTER:
310 if (reply instanceof OFCalientFlowStatsReply) { 313 if (reply instanceof OFCalientFlowStatsReply) {
311 // Convert Calient flow statistics to regular flow stats 314 // Convert Calient flow statistics to regular flow stats
...@@ -353,6 +356,7 @@ public class OpenFlowControllerImpl implements OpenFlowController { ...@@ -353,6 +356,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
353 } 356 }
354 break; 357 break;
355 default: 358 default:
359 + log.warn("Discarding unknown stats reply type {}", reply.getStatsType());
356 break; 360 break;
357 } 361 }
358 break; 362 break;
......