Yuta HIGUCHI

For ONOS-356

- Add current InstallationFuture information on TimeoutException
- Set timeout values proportional to batch size
- Fix for ConcurrentModificationException
- Check that both src and dst are part of the graph before path computation

Change-Id: Iabeac7939c52502b83bf9ebcbe2023539de3ae99
...@@ -132,7 +132,7 @@ public class IntentPushTestCommand extends AbstractShellCommand ...@@ -132,7 +132,7 @@ public class IntentPushTestCommand extends AbstractShellCommand
132 start = System.currentTimeMillis(); 132 start = System.currentTimeMillis();
133 service.execute(ops); 133 service.execute(ops);
134 try { 134 try {
135 - if (latch.await(30, TimeUnit.SECONDS)) { 135 + if (latch.await(100 + count * 200, TimeUnit.MILLISECONDS)) {
136 printResults(count); 136 printResults(count);
137 } else { 137 } else {
138 print("Failure: %d intents not installed", latch.getCount()); 138 print("Failure: %d intents not installed", latch.getCount());
......
...@@ -17,9 +17,9 @@ package org.onlab.onos.net.flow; ...@@ -17,9 +17,9 @@ package org.onlab.onos.net.flow;
17 17
18 18
19 import java.util.Collections; 19 import java.util.Collections;
20 -
21 import java.util.Set; 20 import java.util.Set;
22 21
22 +import com.google.common.base.MoreObjects;
23 import com.google.common.collect.ImmutableSet; 23 import com.google.common.collect.ImmutableSet;
24 24
25 /** 25 /**
...@@ -73,4 +73,12 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowRule> { ...@@ -73,4 +73,12 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowRule> {
73 return failedIds; 73 return failedIds;
74 } 74 }
75 75
76 + @Override
77 + public String toString() {
78 + return MoreObjects.toStringHelper(getClass())
79 + .add("success?", success)
80 + .add("failedItems", failures)
81 + .add("failedIds", failedIds)
82 + .toString();
83 + }
76 } 84 }
......
...@@ -67,7 +67,6 @@ import java.util.concurrent.Future; ...@@ -67,7 +67,6 @@ import java.util.concurrent.Future;
67 import java.util.concurrent.TimeUnit; 67 import java.util.concurrent.TimeUnit;
68 import java.util.concurrent.TimeoutException; 68 import java.util.concurrent.TimeoutException;
69 import java.util.concurrent.atomic.AtomicReference; 69 import java.util.concurrent.atomic.AtomicReference;
70 -
71 import static com.google.common.base.Preconditions.checkNotNull; 70 import static com.google.common.base.Preconditions.checkNotNull;
72 import static org.onlab.util.Tools.namedThreads; 71 import static org.onlab.util.Tools.namedThreads;
73 import static org.slf4j.LoggerFactory.getLogger; 72 import static org.slf4j.LoggerFactory.getLogger;
...@@ -232,7 +231,7 @@ public class FlowRuleManager ...@@ -232,7 +231,7 @@ public class FlowRuleManager
232 lastSeen.remove(flowEntry); 231 lastSeen.remove(flowEntry);
233 FlowEntry stored = store.getFlowEntry(flowEntry); 232 FlowEntry stored = store.getFlowEntry(flowEntry);
234 if (stored == null) { 233 if (stored == null) {
235 - log.info("Rule already evicted from store: {}", flowEntry); 234 + log.debug("Rule already evicted from store: {}", flowEntry);
236 return; 235 return;
237 } 236 }
238 Device device = deviceService.getDevice(flowEntry.deviceId()); 237 Device device = deviceService.getDevice(flowEntry.deviceId());
...@@ -378,7 +377,8 @@ public class FlowRuleManager ...@@ -378,7 +377,8 @@ public class FlowRuleManager
378 // Store delegate to re-post events emitted from the store. 377 // Store delegate to re-post events emitted from the store.
379 private class InternalStoreDelegate implements FlowRuleStoreDelegate { 378 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
380 379
381 - private static final int TIMEOUT = 5000; // ms 380 + // FIXME set appropriate default and make it configurable
381 + private static final int TIMEOUT_PER_OP = 500; // ms
382 382
383 // TODO: Right now we only dispatch events at individual flowEntry level. 383 // TODO: Right now we only dispatch events at individual flowEntry level.
384 // It may be more efficient for also dispatch events as a batch. 384 // It may be more efficient for also dispatch events as a batch.
...@@ -407,7 +407,7 @@ public class FlowRuleManager ...@@ -407,7 +407,7 @@ public class FlowRuleManager
407 public void run() { 407 public void run() {
408 CompletedBatchOperation res; 408 CompletedBatchOperation res;
409 try { 409 try {
410 - res = result.get(TIMEOUT, TimeUnit.MILLISECONDS); 410 + res = result.get(TIMEOUT_PER_OP * batchOperation.size(), TimeUnit.MILLISECONDS);
411 store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res)); 411 store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
412 } catch (TimeoutException | InterruptedException | ExecutionException e) { 412 } catch (TimeoutException | InterruptedException | ExecutionException e) {
413 log.warn("Something went wrong with the batch operation {}", 413 log.warn("Something went wrong with the batch operation {}",
...@@ -457,6 +457,10 @@ public class FlowRuleManager ...@@ -457,6 +457,10 @@ public class FlowRuleManager
457 if (state.get() == BatchState.FINISHED) { 457 if (state.get() == BatchState.FINISHED) {
458 return false; 458 return false;
459 } 459 }
460 + if (log.isDebugEnabled()) {
461 + log.debug("Cancelling FlowRuleBatchFuture",
462 + new RuntimeException("Just printing backtrace"));
463 + }
460 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) { 464 if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
461 return false; 465 return false;
462 } 466 }
...@@ -526,6 +530,7 @@ public class FlowRuleManager ...@@ -526,6 +530,7 @@ public class FlowRuleManager
526 throw new CancellationException(); 530 throw new CancellationException();
527 } 531 }
528 if (!completed.isSuccess()) { 532 if (!completed.isSuccess()) {
533 + log.warn("FlowRuleBatch failed: {}", completed);
529 failed.addAll(completed.failedItems()); 534 failed.addAll(completed.failedItems());
530 failedIds.addAll(completed.failedIds()); 535 failedIds.addAll(completed.failedIds());
531 cleanUpBatch(); 536 cleanUpBatch();
...@@ -557,6 +562,8 @@ public class FlowRuleManager ...@@ -557,6 +562,8 @@ public class FlowRuleManager
557 } 562 }
558 563
559 private void cleanUpBatch() { 564 private void cleanUpBatch() {
565 + log.debug("cleaning up batch");
566 + // TODO convert these into a batch?
560 for (FlowRuleBatchEntry fbe : batches.values()) { 567 for (FlowRuleBatchEntry fbe : batches.values()) {
561 if (fbe.getOperator() == FlowRuleOperation.ADD || 568 if (fbe.getOperator() == FlowRuleOperation.ADD ||
562 fbe.getOperator() == FlowRuleOperation.MODIFY) { 569 fbe.getOperator() == FlowRuleOperation.MODIFY) {
......
...@@ -734,7 +734,8 @@ public class IntentManager ...@@ -734,7 +734,8 @@ public class IntentManager
734 734
735 private class IntentInstallMonitor implements Runnable { 735 private class IntentInstallMonitor implements Runnable {
736 736
737 - private static final long TIMEOUT = 5000; // ms 737 + // TODO make this configurable
738 + private static final int TIMEOUT_PER_OP = 500; // ms
738 private static final int MAX_ATTEMPTS = 3; 739 private static final int MAX_ATTEMPTS = 3;
739 740
740 private final IntentOperations ops; 741 private final IntentOperations ops;
...@@ -742,11 +743,18 @@ public class IntentManager ...@@ -742,11 +743,18 @@ public class IntentManager
742 743
743 private Future<CompletedBatchOperation> future; 744 private Future<CompletedBatchOperation> future;
744 private long startTime = System.currentTimeMillis(); 745 private long startTime = System.currentTimeMillis();
745 - private long endTime = startTime + TIMEOUT; 746 + private long endTime;
746 private int installAttempt; 747 private int installAttempt;
747 748
748 public IntentInstallMonitor(IntentOperations ops) { 749 public IntentInstallMonitor(IntentOperations ops) {
749 this.ops = ops; 750 this.ops = ops;
751 + resetTimeoutLimit();
752 + }
753 +
754 + private void resetTimeoutLimit() {
755 + // FIXME compute reasonable timeouts
756 + this.endTime = System.currentTimeMillis()
757 + + ops.operations().size() * TIMEOUT_PER_OP;
750 } 758 }
751 759
752 private void buildIntentUpdates() { 760 private void buildIntentUpdates() {
...@@ -805,6 +813,7 @@ public class IntentManager ...@@ -805,6 +813,7 @@ public class IntentManager
805 */ 813 */
806 private void processFutures() { 814 private void processFutures() {
807 if (future == null) { 815 if (future == null) {
816 + log.warn("I have no Future.");
808 return; //FIXME look at this 817 return; //FIXME look at this
809 } 818 }
810 try { 819 try {
...@@ -818,9 +827,10 @@ public class IntentManager ...@@ -818,9 +827,10 @@ public class IntentManager
818 } 827 }
819 828
820 private void retry() { 829 private void retry() {
830 + log.debug("Execution timed out, retrying.");
821 if (future.cancel(true)) { // cancel success; batch is reverted 831 if (future.cancel(true)) { // cancel success; batch is reverted
822 // reset the timer 832 // reset the timer
823 - endTime = System.currentTimeMillis() + TIMEOUT; 833 + resetTimeoutLimit();
824 if (installAttempt++ >= MAX_ATTEMPTS) { 834 if (installAttempt++ >= MAX_ATTEMPTS) {
825 log.warn("Install request timed out: {}", ops); 835 log.warn("Install request timed out: {}", ops);
826 for (IntentUpdate update : intentUpdates) { 836 for (IntentUpdate update : intentUpdates) {
......
...@@ -169,9 +169,11 @@ public class ObjectiveTracker implements ObjectiveTrackerService { ...@@ -169,9 +169,11 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
169 linkEvent.subject().isDurable())) { 169 linkEvent.subject().isDurable())) {
170 final LinkKey linkKey = linkKey(linkEvent.subject()); 170 final LinkKey linkKey = linkKey(linkEvent.subject());
171 Set<IntentId> intentIds = intentsByLink.get(linkKey); 171 Set<IntentId> intentIds = intentsByLink.get(linkKey);
172 + synchronized (intentsByLink) {
172 log.debug("recompile triggered by LinkDown {} {}", linkKey, intentIds); 173 log.debug("recompile triggered by LinkDown {} {}", linkKey, intentIds);
173 toBeRecompiled.addAll(intentIds); 174 toBeRecompiled.addAll(intentIds);
174 } 175 }
176 + }
175 recompileOnly = recompileOnly && 177 recompileOnly = recompileOnly &&
176 (linkEvent.type() == LINK_REMOVED || 178 (linkEvent.type() == LINK_REMOVED ||
177 (linkEvent.type() == LINK_UPDATED && 179 (linkEvent.type() == LINK_UPDATED &&
......
...@@ -262,9 +262,16 @@ public class DefaultTopology extends AbstractModel implements Topology { ...@@ -262,9 +262,16 @@ public class DefaultTopology extends AbstractModel implements Topology {
262 * @return set of shortest paths 262 * @return set of shortest paths
263 */ 263 */
264 Set<Path> getPaths(DeviceId src, DeviceId dst, LinkWeight weight) { 264 Set<Path> getPaths(DeviceId src, DeviceId dst, LinkWeight weight) {
265 + final DefaultTopologyVertex srcV = new DefaultTopologyVertex(src);
266 + final DefaultTopologyVertex dstV = new DefaultTopologyVertex(dst);
267 + Set<TopologyVertex> vertices = graph.getVertexes();
268 + if (!vertices.contains(srcV) || !vertices.contains(dstV)) {
269 + // src or dst not part of the current graph
270 + return ImmutableSet.of();
271 + }
272 +
265 GraphPathSearch.Result<TopologyVertex, TopologyEdge> result = 273 GraphPathSearch.Result<TopologyVertex, TopologyEdge> result =
266 - DIJKSTRA.search(graph, new DefaultTopologyVertex(src), 274 + DIJKSTRA.search(graph, srcV, dstV, weight);
267 - new DefaultTopologyVertex(dst), weight);
268 ImmutableSet.Builder<Path> builder = ImmutableSet.builder(); 275 ImmutableSet.Builder<Path> builder = ImmutableSet.builder();
269 for (org.onlab.graph.Path<TopologyVertex, TopologyEdge> path : result.paths()) { 276 for (org.onlab.graph.Path<TopologyVertex, TopologyEdge> path : result.paths()) {
270 builder.add(networkPath(path)); 277 builder.add(networkPath(path));
......
...@@ -262,9 +262,16 @@ public class DefaultTopology extends AbstractModel implements Topology { ...@@ -262,9 +262,16 @@ public class DefaultTopology extends AbstractModel implements Topology {
262 * @return set of shortest paths 262 * @return set of shortest paths
263 */ 263 */
264 Set<Path> getPaths(DeviceId src, DeviceId dst, LinkWeight weight) { 264 Set<Path> getPaths(DeviceId src, DeviceId dst, LinkWeight weight) {
265 + final DefaultTopologyVertex srcV = new DefaultTopologyVertex(src);
266 + final DefaultTopologyVertex dstV = new DefaultTopologyVertex(dst);
267 + Set<TopologyVertex> vertices = graph.getVertexes();
268 + if (!vertices.contains(srcV) || !vertices.contains(dstV)) {
269 + // src or dst not part of the current graph
270 + return ImmutableSet.of();
271 + }
272 +
265 GraphPathSearch.Result<TopologyVertex, TopologyEdge> result = 273 GraphPathSearch.Result<TopologyVertex, TopologyEdge> result =
266 - DIJKSTRA.search(graph, new DefaultTopologyVertex(src), 274 + DIJKSTRA.search(graph, srcV, dstV, weight);
267 - new DefaultTopologyVertex(dst), weight);
268 ImmutableSet.Builder<Path> builder = ImmutableSet.builder(); 275 ImmutableSet.Builder<Path> builder = ImmutableSet.builder();
269 for (org.onlab.graph.Path<TopologyVertex, TopologyEdge> path : result.paths()) { 276 for (org.onlab.graph.Path<TopologyVertex, TopologyEdge> path : result.paths()) {
270 builder.add(networkPath(path)); 277 builder.add(networkPath(path));
......
...@@ -15,10 +15,12 @@ ...@@ -15,10 +15,12 @@
15 */ 15 */
16 package org.onlab.onos.provider.of.flow.impl; 16 package org.onlab.onos.provider.of.flow.impl;
17 17
18 +import com.google.common.base.MoreObjects;
18 import com.google.common.collect.ArrayListMultimap; 19 import com.google.common.collect.ArrayListMultimap;
19 import com.google.common.collect.Maps; 20 import com.google.common.collect.Maps;
20 import com.google.common.collect.Multimap; 21 import com.google.common.collect.Multimap;
21 import com.google.common.collect.Sets; 22 import com.google.common.collect.Sets;
23 +
22 import org.apache.felix.scr.annotations.Activate; 24 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component; 25 import org.apache.felix.scr.annotations.Component;
24 import org.apache.felix.scr.annotations.Deactivate; 26 import org.apache.felix.scr.annotations.Deactivate;
...@@ -85,7 +87,9 @@ import java.util.concurrent.TimeUnit; ...@@ -85,7 +87,9 @@ import java.util.concurrent.TimeUnit;
85 import java.util.concurrent.TimeoutException; 87 import java.util.concurrent.TimeoutException;
86 import java.util.concurrent.atomic.AtomicBoolean; 88 import java.util.concurrent.atomic.AtomicBoolean;
87 import java.util.concurrent.atomic.AtomicLong; 89 import java.util.concurrent.atomic.AtomicLong;
90 +import java.util.stream.Collectors;
88 91
92 +import static com.google.common.base.Preconditions.checkState;
89 import static org.slf4j.LoggerFactory.getLogger; 93 import static org.slf4j.LoggerFactory.getLogger;
90 94
91 /** 95 /**
...@@ -191,9 +195,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -191,9 +195,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
191 195
192 @Override 196 @Override
193 public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) { 197 public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
194 - final Set<Dpid> sws = 198 + final Set<Dpid> sws = Sets.newConcurrentHashSet();
195 - Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>()); 199 + final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<>();
196 - final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
197 /* 200 /*
198 * Use identity hash map for reference equality as we could have equal 201 * Use identity hash map for reference equality as we could have equal
199 * flow mods for different switches. 202 * flow mods for different switches.
...@@ -201,7 +204,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -201,7 +204,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
201 Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap(); 204 Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap();
202 for (FlowRuleBatchEntry fbe : batch.getOperations()) { 205 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
203 FlowRule flowRule = fbe.getTarget(); 206 FlowRule flowRule = fbe.getTarget();
204 - OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri())); 207 + final Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
208 + OpenFlowSwitch sw = controller.getSwitch(dpid);
205 if (sw == null) { 209 if (sw == null) {
206 /* 210 /*
207 * if a switch we are supposed to install to is gone then 211 * if a switch we are supposed to install to is gone then
...@@ -212,8 +216,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -212,8 +216,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
212 failed.cancel(true); 216 failed.cancel(true);
213 return failed; 217 return failed;
214 } 218 }
215 - sws.add(new Dpid(sw.getId())); 219 + sws.add(dpid);
216 - Long flowModXid = xidCounter.getAndIncrement(); 220 + final Long flowModXid = xidCounter.getAndIncrement();
217 FlowModBuilder builder = 221 FlowModBuilder builder =
218 FlowModBuilder.builder(flowRule, sw.factory(), 222 FlowModBuilder.builder(flowRule, sw.factory(),
219 Optional.of(flowModXid)); 223 Optional.of(flowModXid));
...@@ -302,12 +306,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -302,12 +306,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
302 future = pendingFutures.get(msg.getXid()); 306 future = pendingFutures.get(msg.getXid());
303 if (future != null) { 307 if (future != null) {
304 future.satisfyRequirement(dpid); 308 future.satisfyRequirement(dpid);
309 + } else {
310 + log.warn("Received unknown Barrier Reply: {}", msg.getXid());
305 } 311 }
306 break; 312 break;
307 case ERROR: 313 case ERROR:
314 + log.warn("received Error message {} from {}", msg, dpid);
308 future = pendingFMs.get(msg.getXid()); 315 future = pendingFMs.get(msg.getXid());
309 if (future != null) { 316 if (future != null) {
310 future.fail((OFErrorMsg) msg, dpid); 317 future.fail((OFErrorMsg) msg, dpid);
318 + } else {
319 + log.warn("Received unknown Error Reply: {} {}", msg.getXid(), msg);
311 } 320 }
312 break; 321 break;
313 default: 322 default:
...@@ -369,13 +378,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -369,13 +378,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
369 378
370 private class InstallationFuture implements Future<CompletedBatchOperation> { 379 private class InstallationFuture implements Future<CompletedBatchOperation> {
371 380
381 + // barrier xid
372 private final Long xid; 382 private final Long xid;
383 + // waiting for barrier reply from...
373 private final Set<Dpid> sws; 384 private final Set<Dpid> sws;
374 private final AtomicBoolean ok = new AtomicBoolean(true); 385 private final AtomicBoolean ok = new AtomicBoolean(true);
386 + // FlowMod xid ->
375 private final Map<Long, FlowRuleBatchEntry> fms; 387 private final Map<Long, FlowRuleBatchEntry> fms;
376 388
377 389
378 private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet(); 390 private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
391 + // Failed batch operation id
379 private Long failedId; 392 private Long failedId;
380 393
381 private final CountDownLatch countDownLatch; 394 private final CountDownLatch countDownLatch;
...@@ -456,6 +469,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -456,6 +469,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
456 469
457 470
458 public void verify() { 471 public void verify() {
472 + checkState(!sws.isEmpty());
459 for (Dpid dpid : sws) { 473 for (Dpid dpid : sws) {
460 OpenFlowSwitch sw = controller.getSwitch(dpid); 474 OpenFlowSwitch sw = controller.getSwitch(dpid);
461 OFBarrierRequest.Builder builder = sw.factory() 475 OFBarrierRequest.Builder builder = sw.factory()
...@@ -520,7 +534,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -520,7 +534,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
520 cleanUp(); 534 cleanUp();
521 return result; 535 return result;
522 } 536 }
523 - throw new TimeoutException(); 537 + throw new TimeoutException(this.toString());
524 } 538 }
525 539
526 private void cleanUp() { 540 private void cleanUp() {
...@@ -538,6 +552,22 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -538,6 +552,22 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
538 //FIXME don't do cleanup here (moved by BOC) 552 //FIXME don't do cleanup here (moved by BOC)
539 //cleanUp(); 553 //cleanUp();
540 } 554 }
555 +
556 + @Override
557 + public String toString() {
558 + return MoreObjects.toStringHelper(getClass())
559 + .add("xid", xid)
560 + .add("pending devices", sws)
561 + .add("devices in batch",
562 + fms.values().stream()
563 + .map((fbe) -> fbe.getTarget().deviceId())
564 + .distinct().collect(Collectors.toList()))
565 + .add("failedId", failedId)
566 + .add("latchCount", countDownLatch.getCount())
567 + .add("state", state)
568 + .add("no error?", ok.get())
569 + .toString();
570 + }
541 } 571 }
542 572
543 } 573 }
......