Jonathan Hart
Committed by Gerrit Code Review

Cleaned up IntentStores.

 * Removed HazelcastIntentStore
 * Moved SimpleIntentStore back to trivial bundle (and removed older version
   that was already in the trivial bundle)
 * Removed default methods from IntentStore interface

ONOS-1056

Change-Id: Id5e15f44e287f51cca3e0b12a85d49cb4a07a9d3
......@@ -17,7 +17,6 @@ package org.onosproject.net.intent;
import org.onosproject.store.Store;
import java.util.Collections;
import java.util.List;
/**
......@@ -45,27 +44,23 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
* @param intentKey intent identification
* @return current intent state
*/
default IntentState getIntentState(Key intentKey) {
return null;
}
IntentState getIntentState(Key intentKey);
/**
* Returns the list of the installable events associated with the specified
* original intent.
*
* @param intentKey original intent identifier
* @return compiled installable intents
* @return compiled installable intents, or null if no installables exist
*/
default List<Intent> getInstallableIntents(Key intentKey) {
throw new UnsupportedOperationException("getInstallableIntents()");
}
List<Intent> getInstallableIntents(Key intentKey);
/**
* Writes an IntentData object to the store.
*
* @param newData new intent data to write
*/
default void write(IntentData newData) {}
void write(IntentData newData);
/**
* Writes a batch of IntentData objects to the store. A batch has no
......@@ -73,7 +68,7 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
*
* @param updates collection of intent data objects to write
*/
default void batchWrite(Iterable<IntentData> updates) {}
void batchWrite(Iterable<IntentData> updates);
/**
* Returns the intent with the specified identifier.
......@@ -81,10 +76,7 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
* @param key key
* @return intent or null if not found
*/
default Intent getIntent(Key key) {
// FIXME remove this default implementation when all stores have implemented it
return null;
}
Intent getIntent(Key key);
/**
* Returns the intent data object associated with the specified key.
......@@ -92,16 +84,14 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
* @param key key to look up
* @return intent data object
*/
default IntentData getIntentData(Key key) { //FIXME remove when impl.
return null;
}
IntentData getIntentData(Key key);
/**
* Adds a new operation, which should be persisted and delegated.
*
* @param intent operation
*/
default void addPending(IntentData intent) {} //FIXME remove when impl.
void addPending(IntentData intent);
/**
* Checks to see whether the calling instance is the master for processing
......@@ -111,17 +101,12 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
* @return true if master; false, otherwise
*/
//TODO better name
default boolean isMaster(Key intentKey) { //FIXME remove default when impl.
return true;
}
boolean isMaster(Key intentKey);
/**
* Returns the intent requests pending processing.
*
* @return pending intents
*/
// FIXME remove default
default Iterable<Intent> getPending() {
return Collections.emptyList();
}
Iterable<Intent> getPending();
}
......
......@@ -48,7 +48,7 @@ import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.Key;
import org.onosproject.net.resource.LinkResourceAllocations;
import org.onosproject.store.intent.impl.SimpleIntentStore;
import org.onosproject.store.trivial.impl.SimpleIntentStore;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
......
......@@ -37,7 +37,7 @@ import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.impl.SystemClockTimestamp;
import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
......@@ -80,7 +80,7 @@ public class GossipIntentStore
.register(KryoNamespaces.API)
.register(IntentData.class)
.register(MultiValuedTimestamp.class)
.register(SystemClockTimestamp.class);
.register(WallClockTimestamp.class);
currentMap = new EventuallyConsistentMapImpl<>("intent-current",
clusterService,
......@@ -225,8 +225,6 @@ public class GossipIntentStore
@Override
public void write(IntentData newData) {
//log.debug("writing intent {}", newData);
IntentData currentData = currentMap.get(newData.key());
if (isUpdateAcceptable(currentData, newData)) {
......@@ -239,12 +237,6 @@ public class GossipIntentStore
} else {
log.debug("not writing update: current {}, new {}", currentData, newData);
}
/*try {
notifyDelegate(IntentEvent.getEvent(newData));
} catch (IllegalArgumentException e) {
//no-op
log.trace("ignore this exception: {}", e);
}*/
}
@Override
......@@ -268,9 +260,8 @@ public class GossipIntentStore
@Override
public void addPending(IntentData data) {
log.debug("new pending {} {} {}", data.key(), data.state(), data.version());
if (data.version() == null) {
data.setVersion(new SystemClockTimestamp());
data.setVersion(new WallClockTimestamp());
}
pendingMap.put(data.key(), copyData(data));
}
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.intent.impl;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.metrics.MetricsService;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.MetricsHelper;
import org.onosproject.net.intent.BatchWrite;
import org.onosproject.net.intent.BatchWrite.Operation;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentId;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.net.intent.Key;
import org.onosproject.store.hz.AbstractHazelcastStore;
import org.onosproject.store.hz.SMap;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.metrics.MetricsUtil.startTimer;
import static org.onlab.metrics.MetricsUtil.stopTimer;
import static org.onosproject.net.intent.IntentState.FAILED;
import static org.onosproject.net.intent.IntentState.INSTALLED;
import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
import static org.onosproject.net.intent.IntentState.WITHDRAWN;
import static org.slf4j.LoggerFactory.getLogger;
//TODO Note: this store will be removed
@Component(immediate = true, enabled = false)
@Service
public class HazelcastIntentStore
extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
implements IntentStore, MetricsHelper {
/** Valid parking state, which can transition to INSTALLED. */
private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(INSTALL_REQ, INSTALLED, FAILED);
/** Valid parking state, which can transition to WITHDRAWN. */
private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
private static final Set<IntentState> PARKING = EnumSet.of(INSTALL_REQ, INSTALLED, WITHDRAWN, FAILED);
private final Logger log = getLogger(getClass());
// Assumption: IntentId will not have synonyms
private static final String INTENTS_MAP_NAME = "intents";
private SMap<IntentId, Intent> intents;
private static final String INTENT_STATES_MAP_NAME = "intent-states";
private SMap<IntentId, IntentState> states;
// Map to store instance local intermediate state transition
private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
private static final String INSTALLABLE_INTENTS_MAP_NAME = "installable-intents";
private SMap<IntentId, List<Intent>> installable;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MetricsService metricsService;
private boolean onlyLogTransitionError = true;
private Timer getInstallableIntentsTimer;
private Timer getIntentCountTimer;
private Timer getIntentsTimer;
private Timer getIntentTimer;
private Timer getIntentStateTimer;
// manual near cache of Intent
// (Note: IntentId -> Intent is expected to be immutable)
// entry will be evicted, when state for that IntentId is removed.
private Map<IntentId, Intent> localIntents;
private String stateListenerId;
private String intentsListenerId;
private Timer createResponseTimer(String methodName) {
return createTimer("IntentStore", methodName, "responseTime");
}
@Override
@Activate
public void activate() {
localIntents = new ConcurrentHashMap<>();
getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
getIntentCountTimer = createResponseTimer("getIntentCount");
getIntentsTimer = createResponseTimer("getIntents");
getIntentTimer = createResponseTimer("getIntent");
getIntentStateTimer = createResponseTimer("getIntentState");
// We need a way to add serializer for intents which has been plugged-in.
// As a short term workaround, relax Kryo config to
// registrationRequired=false
super.activate();
super.serializer = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.setRegistrationRequired(false)
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
final Config config = theInstance.getConfig();
MapConfig intentsCfg = config.getMapConfig(INTENTS_MAP_NAME);
intentsCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - intentsCfg.getBackupCount());
IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME);
intents = new SMap<>(rawIntents , super.serializer);
intentsListenerId = intents.addEntryListener(new RemoteIntentsListener(), true);
MapConfig statesCfg = config.getMapConfig(INTENT_STATES_MAP_NAME);
statesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - statesCfg.getBackupCount());
IMap<byte[], byte[]> rawStates = super.theInstance.getMap(INTENT_STATES_MAP_NAME);
states = new SMap<>(rawStates , super.serializer);
EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
stateListenerId = states.addEntryListener(listener, true);
transientStates.clear();
MapConfig installableCfg = config.getMapConfig(INSTALLABLE_INTENTS_MAP_NAME);
installableCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - installableCfg.getBackupCount());
IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap(INSTALLABLE_INTENTS_MAP_NAME);
installable = new SMap<>(rawInstallables , super.serializer);
log.info("Started");
}
@Deactivate
public void deactivate() {
intents.removeEntryListener(intentsListenerId);
states.removeEntryListener(stateListenerId);
log.info("Stopped");
}
@Override
public MetricsService metricsService() {
return metricsService;
}
@Override
public long getIntentCount() {
Context timer = startTimer(getIntentCountTimer);
try {
return intents.size();
} finally {
stopTimer(timer);
}
}
@Override
public Iterable<Intent> getIntents() {
Context timer = startTimer(getIntentsTimer);
try {
return ImmutableSet.copyOf(intents.values());
} finally {
stopTimer(timer);
}
}
@Override
public Intent getIntent(Key intentKey) {
return null;
}
public Intent getIntent(IntentId intentId) {
Context timer = startTimer(getIntentTimer);
try {
Intent intent = localIntents.get(intentId);
if (intent != null) {
return intent;
}
intent = intents.get(intentId);
if (intent != null) {
localIntents.put(intentId, intent);
}
return intent;
} finally {
stopTimer(timer);
}
}
@Override
public IntentState getIntentState(Key key) {
// TODO: either implement this or remove this class
return IntentState.FAILED;
/*
Context timer = startTimer(getIntentStateTimer);
try {
final IntentState localState = transientStates.get(id);
if (localState != null) {
return localState;
}
return states.get(id);
} finally {
stopTimer(timer);
}
*/
}
private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
if (onlyLogTransitionError) {
if (!expression) {
log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
}
} else {
Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
}
}
@Override
public List<Intent> getInstallableIntents(Key intentKey) {
// TODO: implement this or delete class
return null;
/*
Context timer = startTimer(getInstallableIntentsTimer);
try {
return installable.get(intentId);
} finally {
stopTimer(timer);
}
*/
}
/*@Override
public List<Operation> batchWrite(BatchWrite batch) {
if (batch.isEmpty()) {
return Collections.emptyList();
}
// Hazelcast version will never fail for conditional failure now.
List<Operation> failed = new ArrayList<>();
List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
List<IntentEvent> events = Lists.newArrayList();
batchWriteAsync(batch, failed, futures);
// verify result
verifyAsyncWrites(futures, failed, events);
notifyDelegate(events);
return failed;
}*/
private void batchWriteAsync(BatchWrite batch, List<Operation> failed,
List<Pair<Operation, List<Future<?>>>> futures) {
for (Operation op : batch.operations()) {
switch (op.type()) {
case CREATE_INTENT:
checkArgument(op.args().size() == 1,
"CREATE_INTENT takes 1 argument. %s", op);
Intent intent = op.arg(0);
futures.add(Pair.of(op,
ImmutableList.of(intents.putAsync(intent.id(), intent),
states.putAsync(intent.id(), INSTALL_REQ))));
break;
case REMOVE_INTENT:
checkArgument(op.args().size() == 1,
"REMOVE_INTENT takes 1 argument. %s", op);
IntentId intentId = (IntentId) op.arg(0);
futures.add(Pair.of(op,
ImmutableList.of(intents.removeAsync(intentId),
states.removeAsync(intentId),
installable.removeAsync(intentId))));
break;
case SET_STATE:
checkArgument(op.args().size() == 2,
"SET_STATE takes 2 arguments. %s", op);
intent = op.arg(0);
IntentState newState = op.arg(1);
futures.add(Pair.of(op,
ImmutableList.of(states.putAsync(intent.id(), newState))));
break;
case SET_INSTALLABLE:
checkArgument(op.args().size() == 2,
"SET_INSTALLABLE takes 2 arguments. %s", op);
intentId = op.arg(0);
List<Intent> installableIntents = op.arg(1);
futures.add(Pair.of(op,
ImmutableList.of(installable.putAsync(intentId, installableIntents))));
break;
case REMOVE_INSTALLED:
checkArgument(op.args().size() == 1,
"REMOVE_INSTALLED takes 1 argument. %s", op);
intentId = op.arg(0);
futures.add(Pair.of(op,
ImmutableList.of(installable.removeAsync(intentId))));
break;
default:
log.warn("Unknown Operation encountered: {}", op);
failed.add(op);
break;
}
}
}
/**
* Checks the async write result Futures and prepare Events to post.
*
* @param futures async write Futures
* @param failed list to output failed batch write operations
* @param events list to output events to post as result of writes
*/
private void verifyAsyncWrites(List<Pair<Operation, List<Future<?>>>> futures,
List<Operation> failed,
List<IntentEvent> events) {
for (Pair<Operation, List<Future<?>>> future : futures) {
final Operation op = future.getLeft();
final List<Future<?>> subops = future.getRight();
switch (op.type()) {
case CREATE_INTENT:
{
Intent intent = op.arg(0);
IntentState newIntentState = INSTALL_REQ;
try {
Intent prevIntent = (Intent) subops.get(0).get();
IntentState prevIntentState = (IntentState) subops.get(1).get();
if (prevIntent != null || prevIntentState != null) {
log.warn("Overwriting existing Intent: {}@{} with {}@{}",
prevIntent, prevIntentState,
intent, newIntentState);
}
events.add(IntentEvent.getEvent(INSTALL_REQ, intent));
} catch (InterruptedException e) {
log.error("Batch write was interrupted while processing {}", op, e);
failed.add(op);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Batch write failed processing {}", op, e);
failed.add(op);
}
break;
}
case REMOVE_INTENT:
{
IntentId intentId = op.arg(0);
try {
Intent prevIntent = (Intent) subops.get(0).get();
IntentState prevIntentState = (IntentState) subops.get(1).get();
@SuppressWarnings("unchecked")
List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
if (prevIntent == null) {
log.warn("Intent {} was already removed.", intentId);
}
if (prevIntentState == null) {
log.warn("Intent {} state was already removed", intentId);
}
if (prevInstallable != null) {
log.warn("Intent {} removed installable still found", intentId);
}
} catch (InterruptedException e) {
log.error("Batch write was interrupted while processing {}", op, e);
failed.add(op);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Batch write failed processing {}", op, e);
failed.add(op);
}
break;
}
case SET_STATE:
{
Intent intent = op.arg(0);
IntentId intentId = intent.id();
IntentState newState = op.arg(1);
try {
IntentState prevIntentState = (IntentState) subops.get(0).get();
if (PARKING.contains(newState)) {
transientStates.remove(intentId);
events.add(IntentEvent.getEvent(newState, intent));
}
log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
} catch (InterruptedException e) {
log.error("Batch write was interrupted while processing {}", op, e);
failed.add(op);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Batch write failed processing {}", op, e);
failed.add(op);
}
break;
}
case SET_INSTALLABLE:
{
IntentId intentId = op.arg(0);
List<Intent> installableIntents = op.arg(1);
try {
@SuppressWarnings("unchecked")
List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
if (prevInstallable != null) {
log.warn("Overwriting Intent {} installable {} -> {}",
intentId, prevInstallable, installableIntents);
}
} catch (InterruptedException e) {
log.error("Batch write was interrupted while processing {}", op, e);
failed.add(op);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Batch write failed processing {}", op, e);
failed.add(op);
}
break;
}
case REMOVE_INSTALLED:
{
IntentId intentId = op.arg(0);
try {
@SuppressWarnings("unchecked")
List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
if (prevInstallable == null) {
log.warn("Intent {} installable was already removed", intentId);
}
} catch (InterruptedException e) {
log.error("Batch write was interrupted while processing {}", op, e);
failed.add(op);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("Batch write failed processing {}", op, e);
failed.add(op);
}
break;
}
default:
log.warn("Unknown Operation encountered: {}", op);
if (!failed.contains(op)) {
failed.add(op);
}
break;
}
}
}
public final class RemoteIntentsListener extends EntryAdapter<IntentId, Intent> {
@Override
public void entryAdded(EntryEvent<IntentId, Intent> event) {
localIntents.put(event.getKey(), event.getValue());
}
@Override
public void entryUpdated(EntryEvent<IntentId, Intent> event) {
entryAdded(event);
}
}
public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
@Override
public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
final IntentId intentId = event.getKey();
final Member myself = theInstance.getCluster().getLocalMember();
if (!myself.equals(event.getMember())) {
// When Intent state was modified by remote node,
// clear local transient state.
IntentState oldState = transientStates.remove(intentId);
if (oldState != null) {
log.debug("{} state updated remotely, removing transient state {}",
intentId, oldState);
}
if (event.getValue() != null) {
// notify if this is not entry removed event
final Intent intent = getIntent(intentId);
if (intent == null) {
log.warn("no Intent found for {} on Event {}", intentId, event);
return;
}
notifyDelegate(IntentEvent.getEvent(event.getValue(), intent));
// remove IntentCache
localIntents.remove(intentId, intent);
}
}
// populate manual near cache, to prepare for
// transition event to WITHDRAWN
getIntent(intentId);
}
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.intent.impl;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.net.intent.Key;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.impl.SystemClockTimestamp;
import org.slf4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
//TODO Note: this store will be removed once the GossipIntentStore is stable
@Component(immediate = true, enabled = false)
@Service
//FIXME remove this
public class SimpleIntentStore
extends AbstractStore<IntentEvent, IntentStoreDelegate>
implements IntentStore {
private final Logger log = getLogger(getClass());
private final Map<Key, IntentData> current = Maps.newConcurrentMap();
private final Map<Key, IntentData> pending = Maps.newConcurrentMap();
private IntentData copyData(IntentData original) {
if (original == null) {
return null;
}
IntentData result =
new IntentData(original.intent(), original.state(), original.version());
if (original.installables() != null) {
result.setInstallables(original.installables());
}
return result;
}
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public long getIntentCount() {
return current.size();
}
@Override
public Iterable<Intent> getIntents() {
return current.values().stream()
.map(IntentData::intent)
.collect(Collectors.toList());
}
@Override
public IntentState getIntentState(Key intentKey) {
IntentData data = current.get(intentKey);
return (data != null) ? data.state() : null;
}
@Override
public List<Intent> getInstallableIntents(Key intentKey) {
// TODO: implement this or delete class
return null;
/*
for (IntentData data : current.values()) {
if (Objects.equals(data.intent().id(), intentId)) {
return data.installables();
}
}
return null;
*/
}
/**
* Determines whether an intent data update is allowed. The update must
* either have a higher version than the current data, or the state
* transition between two updates of the same version must be sane.
*
* @param currentData existing intent data in the store
* @param newData new intent data update proposal
* @return true if we can apply the update, otherwise false
*/
private boolean isUpdateAcceptable(IntentData currentData, IntentData newData) {
if (currentData == null) {
return true;
} else if (currentData.version().compareTo(newData.version()) < 0) {
return true;
} else if (currentData.version().compareTo(newData.version()) > 0) {
return false;
}
// current and new data versions are the same
IntentState currentState = currentData.state();
IntentState newState = newData.state();
switch (newState) {
case INSTALLING:
if (currentState == INSTALLING) {
return false;
}
// FALLTHROUGH
case INSTALLED:
if (currentState == INSTALLED) {
return false;
} else if (currentState == WITHDRAWING || currentState == WITHDRAWN) {
log.warn("Invalid state transition from {} to {} for intent {}",
currentState, newState, newData.key());
return false;
}
return true;
case WITHDRAWING:
if (currentState == WITHDRAWING) {
return false;
}
// FALLTHOUGH
case WITHDRAWN:
if (currentState == WITHDRAWN) {
return false;
} else if (currentState == INSTALLING || currentState == INSTALLED) {
log.warn("Invalid state transition from {} to {} for intent {}",
currentState, newState, newData.key());
return false;
}
return true;
case FAILED:
if (currentState == FAILED) {
return false;
}
return true;
case COMPILING:
case RECOMPILING:
case INSTALL_REQ:
case WITHDRAW_REQ:
default:
log.warn("Invalid state {} for intent {}", newState, newData.key());
return false;
}
}
@Override
public void write(IntentData newData) {
synchronized (this) {
// TODO this could be refactored/cleaned up
IntentData currentData = current.get(newData.key());
IntentData pendingData = pending.get(newData.key());
if (isUpdateAcceptable(currentData, newData)) {
current.put(newData.key(), copyData(newData));
if (pendingData != null
// pendingData version is less than or equal to newData's
// Note: a new update for this key could be pending (it's version will be greater)
&& pendingData.version().compareTo(newData.version()) <= 0) {
pending.remove(newData.key());
}
notifyDelegateIfNotNull(IntentEvent.getEvent(newData));
}
}
}
private void notifyDelegateIfNotNull(IntentEvent event) {
if (event != null) {
notifyDelegate(event);
}
}
@Override
public void batchWrite(Iterable<IntentData> updates) {
for (IntentData data : updates) {
write(data);
}
}
@Override
public Intent getIntent(Key key) {
IntentData data = current.get(key);
return (data != null) ? data.intent() : null;
}
@Override
public IntentData getIntentData(Key key) {
return copyData(current.get(key));
}
@Override
public void addPending(IntentData data) {
if (data.version() == null) { // recompiled intents will already have a version
data.setVersion(new SystemClockTimestamp());
}
synchronized (this) {
IntentData existingData = pending.get(data.key());
if (existingData == null ||
// existing version is strictly less than data's version
// Note: if they are equal, we already have the update
// TODO maybe we should still make this <= to be safe?
existingData.version().compareTo(data.version()) < 0) {
pending.put(data.key(), data);
checkNotNull(delegate, "Store delegate is not set")
.process(data);
notifyDelegateIfNotNull(IntentEvent.getEvent(data));
} else {
log.debug("IntentData {} is older than existing: {}",
data, existingData);
}
//TODO consider also checking the current map at this point
}
}
@Override
public boolean isMaster(Key intentKey) {
return true;
}
}
/*
* Copyright 2014 Open Networking Laboratory
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -35,10 +35,9 @@ import java.util.Map;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
//TODO Note: this store will be removed
@Component(immediate = true)
@Service
public class SimpleIntentStore
......@@ -47,9 +46,21 @@ public class SimpleIntentStore
private final Logger log = getLogger(getClass());
// current state maps FIXME.. make this a IntentData map
private final Map<Key, IntentData> current = Maps.newConcurrentMap();
private final Map<Key, IntentData> pending = Maps.newConcurrentMap(); //String is "key"
private final Map<Key, IntentData> pending = Maps.newConcurrentMap();
private IntentData copyData(IntentData original) {
if (original == null) {
return null;
}
IntentData result =
new IntentData(original.intent(), original.state(), original.version());
if (original.installables() != null) {
result.setInstallables(original.installables());
}
return result;
}
@Activate
public void activate() {
......@@ -82,23 +93,111 @@ public class SimpleIntentStore
@Override
public List<Intent> getInstallableIntents(Key intentKey) {
IntentData data = current.get(intentKey);
return (data != null) ? data.installables() : null;
if (data != null) {
return data.installables();
}
return null;
}
/**
* Determines whether an intent data update is allowed. The update must
* either have a higher version than the current data, or the state
* transition between two updates of the same version must be sane.
*
* @param currentData existing intent data in the store
* @param newData new intent data update proposal
* @return true if we can apply the update, otherwise false
*/
private boolean isUpdateAcceptable(IntentData currentData, IntentData newData) {
if (currentData == null) {
return true;
} else if (currentData.version().compareTo(newData.version()) < 0) {
return true;
} else if (currentData.version().compareTo(newData.version()) > 0) {
return false;
}
// current and new data versions are the same
IntentState currentState = currentData.state();
IntentState newState = newData.state();
switch (newState) {
case INSTALLING:
if (currentState == INSTALLING) {
return false;
}
// FALLTHROUGH
case INSTALLED:
if (currentState == INSTALLED) {
return false;
} else if (currentState == WITHDRAWING || currentState == WITHDRAWN) {
log.warn("Invalid state transition from {} to {} for intent {}",
currentState, newState, newData.key());
return false;
}
return true;
case WITHDRAWING:
if (currentState == WITHDRAWING) {
return false;
}
// FALLTHOUGH
case WITHDRAWN:
if (currentState == WITHDRAWN) {
return false;
} else if (currentState == INSTALLING || currentState == INSTALLED) {
log.warn("Invalid state transition from {} to {} for intent {}",
currentState, newState, newData.key());
return false;
}
return true;
case FAILED:
if (currentState == FAILED) {
return false;
}
return true;
case COMPILING:
case RECOMPILING:
case INSTALL_REQ:
case WITHDRAW_REQ:
default:
log.warn("Invalid state {} for intent {}", newState, newData.key());
return false;
}
}
@Override
public void write(IntentData newData) {
//FIXME need to compare the versions
current.put(newData.key(), newData);
try {
notifyDelegate(IntentEvent.getEvent(newData));
} catch (IllegalArgumentException e) {
//no-op
log.trace("ignore this exception: {}", e);
}
IntentData old = pending.get(newData.key());
if (old != null /* && FIXME version check */) {
synchronized (this) {
// TODO this could be refactored/cleaned up
IntentData currentData = current.get(newData.key());
IntentData pendingData = pending.get(newData.key());
if (isUpdateAcceptable(currentData, newData)) {
current.put(newData.key(), copyData(newData));
if (pendingData != null
// pendingData version is less than or equal to newData's
// Note: a new update for this key could be pending (it's version will be greater)
&& pendingData.version().compareTo(newData.version()) <= 0) {
pending.remove(newData.key());
}
notifyDelegateIfNotNull(IntentEvent.getEvent(newData));
}
}
}
private void notifyDelegateIfNotNull(IntentEvent event) {
if (event != null) {
notifyDelegate(event);
}
}
@Override
......@@ -114,14 +213,44 @@ public class SimpleIntentStore
return (data != null) ? data.intent() : null;
}
@Override
public IntentData getIntentData(Key key) {
return copyData(current.get(key));
}
@Override
public void addPending(IntentData data) {
//FIXME need to compare versions
if (data.version() == null) { // recompiled intents will already have a version
data.setVersion(new SystemClockTimestamp());
}
synchronized (this) {
IntentData existingData = pending.get(data.key());
if (existingData == null ||
// existing version is strictly less than data's version
// Note: if they are equal, we already have the update
// TODO maybe we should still make this <= to be safe?
existingData.version().compareTo(data.version()) < 0) {
pending.put(data.key(), data);
checkNotNull(delegate, "Store delegate is not set")
.process(data);
notifyDelegate(IntentEvent.getEvent(data));
notifyDelegateIfNotNull(IntentEvent.getEvent(data));
} else {
log.debug("IntentData {} is older than existing: {}",
data, existingData);
}
//TODO consider also checking the current map at this point
}
}
@Override
public boolean isMaster(Key intentKey) {
return true;
}
@Override
public Iterable<Intent> getPending() {
return pending.values().stream()
.map(IntentData::intent)
.collect(Collectors.toList());
}
}
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.impl;
package org.onosproject.store.trivial.impl;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ComparisonChain;
......