Brian O'Connor

Updating IntentCleanup to check for stalled *_REQ and *ING intents.

Change-Id: Ibe06ee99463bb8230acf9751da4fb1012859b0ea
......@@ -43,10 +43,11 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
* Returns an iterable of all intent data objects in the store.
*
* @param localOnly should only intents for which this instance is master
* should be returned
* be returned
* @param olderThan specified duration in milliseconds (0 for "now")
* @return iterable of all intent data objects
*/
Iterable<IntentData> getIntentData(boolean localOnly);
Iterable<IntentData> getIntentData(boolean localOnly, long olderThan);
/**
* Returns the state of the specified intent.
......@@ -119,4 +120,22 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
* @return pending intents
*/
Iterable<Intent> getPending();
/**
* Returns the intent data objects that are pending processing.
*
* @return pending intent data objects
*/
Iterable<IntentData> getPendingData();
/**
* Returns the intent data objects that are pending processing for longer
* than the specified duration.
*
* @param localOnly should only intents for which this instance is master
* be returned
* @param olderThan specified duration in milliseconds (0 for "now")
* @return pending intent data objects
*/
Iterable<IntentData> getPendingData(boolean localOnly, long olderThan);
}
......
......@@ -28,6 +28,7 @@ import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.Key;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
......@@ -41,12 +42,16 @@ import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.intent.IntentState.CORRUPT;
import static org.slf4j.LoggerFactory.getLogger;
/**
* FIXME Class to cleanup Intents in CORRUPT state.
* FIXME move this to its own file eventually (but need executor for now)
* This component cleans up intents that have encountered errors or otherwise
* stalled during installation or withdrawal.
* <p>
* It periodically polls (based on configured period) for pending and CORRUPT
* intents from the store and retries. It also listens for CORRUPT event
* notifications, which signify errors in processing, and retries.
* </p>
*/
@Component(immediate = true)
public class IntentCleanup implements Runnable, IntentListener {
......@@ -58,6 +63,7 @@ public class IntentCleanup implements Runnable, IntentListener {
@Property(name = "period", intValue = DEFAULT_PERIOD,
label = "Frequency in ms between cleanup runs")
protected int period = DEFAULT_PERIOD;
private long periodMs;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentService service;
......@@ -126,7 +132,7 @@ public class IntentCleanup implements Runnable, IntentListener {
}
};
long periodMs = period * 1000; //convert to ms
periodMs = period * 1_000; //convert to ms
timer.scheduleAtFixedRate(timerTask, periodMs, periodMs);
}
......@@ -140,41 +146,79 @@ public class IntentCleanup implements Runnable, IntentListener {
}
}
/**
* Iterate through CORRUPT intents and re-submit/withdraw.
*
* FIXME we want to eventually count number of retries per intent and give up
* FIXME we probably also want to look at intents that have been stuck
* in *_REQ or *ING for "too long".
*/
private void cleanup() {
int count = 0;
for (IntentData intentData : store.getIntentData(true)) {
if (intentData.state() == CORRUPT) {
private void resubmitCorrupt(IntentData intentData, boolean checkThreshold) {
//TODO we might want to give up when retry count exceeds a threshold
// FIXME drop this if we exceed retry threshold
switch (intentData.request()) {
case INSTALL_REQ:
service.submit(intentData.intent());
break;
case WITHDRAW_REQ:
service.withdraw(intentData.intent());
break;
default:
//TODO this is an error, might want to log it
break;
}
}
private void resubmitPendingRequest(IntentData intentData) {
switch (intentData.request()) {
case INSTALL_REQ:
service.submit(intentData.intent());
count++;
break;
case WITHDRAW_REQ:
service.withdraw(intentData.intent());
count++;
break;
default:
//TODO this is an error
//TODO this is an error (or could be purge), might want to log it
break;
}
}
/**
* Iterate through CORRUPT intents and re-submit/withdraw appropriately.
*
*/
private void cleanup() {
int corruptCount = 0, stuckCount = 0, pendingCount = 0;
for (IntentData intentData : store.getIntentData(true, periodMs)) {
switch (intentData.state()) {
case CORRUPT:
resubmitCorrupt(intentData, false);
corruptCount++;
case INSTALLING: //FALLTHROUGH
case WITHDRAWING:
resubmitPendingRequest(intentData);
stuckCount++;
default:
//NOOP
break;
}
}
for (IntentData intentData : store.getPendingData(true, periodMs)) {
//TODO should we do age check here, or in the store?
resubmitPendingRequest(intentData);
stuckCount++;
}
log.debug("Intent cleanup ran and resubmitted {} intents", count);
log.debug("Intent cleanup ran and resubmitted {} corrupt, {} stuck, and {} pending intents",
corruptCount, stuckCount, pendingCount);
}
@Override
public void event(IntentEvent event) {
// fast path for CORRUPT intents, retry on event notification
//TODO we might consider using the timer to back off for subsequent retries
if (event.type() == IntentEvent.Type.CORRUPT) {
// FIXME drop this if we exceed retry threshold
// just run the whole cleanup script for now
executor.submit(this);
Key key = event.subject().key();
if (store.isMaster(key)) {
IntentData data = store.getIntentData(event.subject().key());
resubmitCorrupt(data, true);
}
}
}
}
......
......@@ -36,6 +36,10 @@ public class WallClockTimestamp implements Timestamp {
unixTimestamp = System.currentTimeMillis();
}
public WallClockTimestamp(long timestamp) {
unixTimestamp = timestamp;
}
@Override
public int compareTo(Timestamp o) {
checkArgument(o instanceof WallClockTimestamp,
......
......@@ -132,10 +132,13 @@ public class GossipIntentStore
}
@Override
public Iterable<IntentData> getIntentData(boolean localOnly) {
if (localOnly) {
public Iterable<IntentData> getIntentData(boolean localOnly, long olderThan) {
if (localOnly || olderThan > 0) {
long now = System.currentTimeMillis();
final WallClockTimestamp time = new WallClockTimestamp(now - olderThan);
return currentMap.values().stream()
.filter(data -> isMaster(data.key()))
.filter(data -> data.version().isOlderThan(time) &&
(!localOnly || isMaster(data.key())))
.collect(Collectors.toList());
}
return currentMap.values();
......@@ -261,6 +264,21 @@ public class GossipIntentStore
.collect(Collectors.toList());
}
@Override
public Iterable<IntentData> getPendingData() {
return pendingMap.values();
}
@Override
public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) {
long now = System.currentTimeMillis();
final WallClockTimestamp time = new WallClockTimestamp(now - olderThan);
return pendingMap.values().stream()
.filter(data -> data.version().isOlderThan(time) &&
(!localOnly || isMaster(data.key())))
.collect(Collectors.toList());
}
private void notifyDelegateIfNotNull(IntentEvent event) {
if (event != null) {
notifyDelegate(event);
......
......@@ -36,7 +36,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.net.intent.IntentState.*;
import static org.onosproject.net.intent.IntentState.PURGE_REQ;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -76,11 +76,15 @@ public class SimpleIntentStore
}
@Override
public Iterable<IntentData> getIntentData(boolean localOnly) {
if (localOnly) {
return current.values().stream()
.filter(data -> isMaster(data.key()))
public Iterable<IntentData> getIntentData(boolean localOnly, long olderThan) {
if (localOnly || olderThan > 0) {
long older = System.nanoTime() - olderThan * 1_000_000; //convert ms to ns
final SystemClockTimestamp time = new SystemClockTimestamp(older);
return pending.values().stream()
.filter(data -> data.version().isOlderThan(time) &&
(!localOnly || isMaster(data.key())))
.collect(Collectors.toList());
}
return Lists.newArrayList(current.values());
}
......@@ -191,4 +195,19 @@ public class SimpleIntentStore
.map(IntentData::intent)
.collect(Collectors.toList());
}
@Override
public Iterable<IntentData> getPendingData() {
return Lists.newArrayList(pending.values());
}
@Override
public Iterable<IntentData> getPendingData(boolean localOnly, long olderThan) {
long older = System.nanoTime() - olderThan * 1_000_000; //convert ms to ns
final SystemClockTimestamp time = new SystemClockTimestamp(older);
return pending.values().stream()
.filter(data -> data.version().isOlderThan(time) &&
(!localOnly || isMaster(data.key())))
.collect(Collectors.toList());
}
}
......
......@@ -29,10 +29,14 @@ import static com.google.common.base.Preconditions.checkArgument;
*/
public class SystemClockTimestamp implements Timestamp {
private final long unixTimestamp;
private final long nanoTimestamp;
public SystemClockTimestamp() {
unixTimestamp = System.nanoTime();
nanoTimestamp = System.nanoTime();
}
public SystemClockTimestamp(long timestamp) {
nanoTimestamp = timestamp;
}
@Override
......@@ -42,12 +46,12 @@ public class SystemClockTimestamp implements Timestamp {
SystemClockTimestamp that = (SystemClockTimestamp) o;
return ComparisonChain.start()
.compare(this.unixTimestamp, that.unixTimestamp)
.compare(this.nanoTimestamp, that.nanoTimestamp)
.result();
}
@Override
public int hashCode() {
return Objects.hash(unixTimestamp);
return Objects.hash(nanoTimestamp);
}
@Override
......@@ -59,17 +63,21 @@ public class SystemClockTimestamp implements Timestamp {
return false;
}
SystemClockTimestamp that = (SystemClockTimestamp) obj;
return Objects.equals(this.unixTimestamp, that.unixTimestamp);
return Objects.equals(this.nanoTimestamp, that.nanoTimestamp);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("unixTimestamp", unixTimestamp)
.add("nanoTimestamp", nanoTimestamp)
.toString();
}
public long nanoTimestamp() {
return nanoTimestamp;
}
public long systemTimestamp() {
return unixTimestamp;
return nanoTimestamp / 1_000_000; // convert ns to ms
}
}
......