Sho SHIMIZU
Committed by Gerrit Code Review

ONOS-3296: Support continuous type resources

Change-Id: I155e41e7a7c1750ff45986a55bedab353485d3fa
......@@ -126,13 +126,13 @@ public interface ResourceService extends ListenerService<ResourceEvent, Resource
boolean release(ResourceConsumer consumer);
/**
* Returns resource allocation of the specified resource.
* Returns resource allocations of the specified resource.
*
* @param resource resource to check the allocation
* @return allocation information enclosed by Optional.
* If the resource is not allocated, the return value is empty.
* @return list of allocation information.
* If the resource is not allocated, the return value is an empty list.
*/
Optional<ResourceAllocation> getResourceAllocation(ResourcePath resource);
List<ResourceAllocation> getResourceAllocation(ResourcePath resource);
/**
* Returns allocated resources being as children of the specified parent and being the specified resource type.
......
/*
* Copyright 2015 Open Networking Laboratory
* Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -20,7 +20,6 @@ import org.onosproject.store.Store;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
/**
* Service for storing resource and consumer information.
......@@ -77,12 +76,23 @@ public interface ResourceStore extends Store<ResourceEvent, ResourceStoreDelegat
boolean release(List<ResourcePath> resources, List<ResourceConsumer> consumers);
/**
* Returns the resource consumer to whom the specified resource is allocated.
* Returns the resource consumers to whom the specified resource is allocated.
* The return value is a list having only one element when the given resource is discrete type.
* The return value may have multiple elements when the given resource is continuous type.
*
* @param resource resource whose allocated consumer to be returned
* @return resource consumer who are allocated the resource
* @return resource consumers who are allocated the resource.
* Returns empty list if there is no such consumer.
*/
Optional<ResourceConsumer> getConsumer(ResourcePath resource);
List<ResourceConsumer> getConsumers(ResourcePath resource);
/**
* Returns the availability of the specified resource.
*
* @param resource resource to check the availability
* @return true if available, otherwise false
*/
boolean isAvailable(ResourcePath resource);
/**
* Returns a collection of the resources allocated to the specified consumer.
......
......@@ -313,9 +313,11 @@ public class OpticalCircuitIntentCompiler implements IntentCompiler<OpticalCircu
OchPort ochPort = (OchPort) deviceService.getPort(ochCP.deviceId(), ochCP.port());
Optional<IntentId> intentId =
resourceService.getResourceAllocation(ResourcePath.discrete(ochCP.deviceId(), ochCP.port()))
.stream()
.map(ResourceAllocation::consumer)
.filter(x -> x instanceof IntentId)
.map(x -> (IntentId) x);
.map(x -> (IntentId) x)
.findAny();
if (isAvailable(intentId.orElse(null))) {
return ochPort;
......@@ -332,9 +334,12 @@ public class OpticalCircuitIntentCompiler implements IntentCompiler<OpticalCircu
Optional<IntentId> intentId =
resourceService.getResourceAllocation(ResourcePath.discrete(oduPort.deviceId(), port.number()))
.stream()
.map(ResourceAllocation::consumer)
.filter(x -> x instanceof IntentId)
.map(x -> (IntentId) x);
.map(x -> (IntentId) x)
.findAny();
if (isAvailable(intentId.orElse(null))) {
return (OchPort) port;
}
......
/*
* Copyright 2015 Open Networking Laboratory
* Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -23,6 +23,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.GuavaCollectors;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.net.newresource.ResourceAdminService;
import org.onosproject.net.newresource.ResourceAllocation;
......@@ -34,10 +35,8 @@ import org.onosproject.net.newresource.ResourcePath;
import org.onosproject.net.newresource.ResourceStore;
import org.onosproject.net.newresource.ResourceStoreDelegate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -107,11 +106,13 @@ public final class ResourceManager extends AbstractListenerManager<ResourceEvent
}
@Override
public Optional<ResourceAllocation> getResourceAllocation(ResourcePath resource) {
public List<ResourceAllocation> getResourceAllocation(ResourcePath resource) {
checkNotNull(resource);
Optional<ResourceConsumer> consumer = store.getConsumer(resource);
return consumer.map(x -> new ResourceAllocation(resource, x));
List<ResourceConsumer> consumers = store.getConsumers(resource);
return consumers.stream()
.map(x -> new ResourceAllocation(resource, x))
.collect(GuavaCollectors.toImmutableList());
}
@Override
......@@ -119,17 +120,12 @@ public final class ResourceManager extends AbstractListenerManager<ResourceEvent
checkNotNull(parent);
checkNotNull(cls);
Collection<ResourcePath> resources = store.getAllocatedResources(parent, cls);
List<ResourceAllocation> allocations = new ArrayList<>(resources.size());
for (ResourcePath resource: resources) {
// We access store twice in this method, then the store may be updated by others
Optional<ResourceConsumer> consumer = store.getConsumer(resource);
if (consumer.isPresent()) {
allocations.add(new ResourceAllocation(resource, consumer.get()));
}
}
return allocations;
Collection<ResourcePath> resources = store.getAllocatedResources(parent, cls);
return resources.stream()
.flatMap(resource -> store.getConsumers(resource).stream()
.map(consumer -> new ResourceAllocation(resource, consumer)))
.collect(GuavaCollectors.toImmutableList());
}
@Override
......@@ -149,7 +145,7 @@ public final class ResourceManager extends AbstractListenerManager<ResourceEvent
Collection<ResourcePath> children = store.getChildResources(parent);
return children.stream()
// We access store twice in this method, then the store may be updated by others
.filter(x -> !store.getConsumer(x).isPresent())
.filter(store::isAvailable)
.collect(Collectors.toList());
}
......@@ -157,8 +153,7 @@ public final class ResourceManager extends AbstractListenerManager<ResourceEvent
public boolean isAvailable(ResourcePath resource) {
checkNotNull(resource);
Optional<ResourceConsumer> consumer = store.getConsumer(resource);
return !consumer.isPresent();
return store.isAvailable(resource);
}
@Override
......
......@@ -66,9 +66,10 @@ class MockResourceService implements ResourceService {
}
@Override
public Optional<ResourceAllocation> getResourceAllocation(ResourcePath resource) {
public List<ResourceAllocation> getResourceAllocation(ResourcePath resource) {
return Optional.ofNullable(assignment.get(resource))
.map(x -> new ResourceAllocation(resource, x));
.map(x -> ImmutableList.of(new ResourceAllocation(resource, x)))
.orElse(ImmutableList.of());
}
@Override
......
/*
* Copyright 2015 Open Networking Laboratory
* Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -17,14 +17,18 @@ package org.onosproject.store.newresource.impl;
import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.GuavaCollectors;
import org.onlab.util.Tools;
import org.onosproject.net.newresource.ResourceAllocation;
import org.onosproject.net.newresource.ResourceConsumer;
import org.onosproject.net.newresource.ResourceEvent;
import org.onosproject.net.newresource.ResourceId;
import org.onosproject.net.newresource.ResourcePath;
import org.onosproject.net.newresource.ResourceStore;
import org.onosproject.net.newresource.ResourceStoreDelegate;
......@@ -40,7 +44,6 @@ import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
......@@ -49,7 +52,9 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -65,10 +70,12 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
implements ResourceStore {
private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
private static final String CONSUMER_MAP = "onos-resource-consumers";
private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
private static final String CHILD_MAP = "onos-resource-children";
private static final Serializer SERIALIZER = Serializer.using(
Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API));
Arrays.asList(KryoNamespaces.BASIC, KryoNamespaces.API),
ContinuousResourceAllocation.class);
// TODO: We should provide centralized values for this
private static final int MAX_RETRIES = 5;
......@@ -77,35 +84,61 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService service;
private ConsistentMap<ResourcePath, ResourceConsumer> consumerMap;
private ConsistentMap<ResourcePath, List<ResourcePath>> childMap;
private ConsistentMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumers;
private ConsistentMap<ResourceId, ContinuousResourceAllocation> continuousConsumers;
private ConsistentMap<ResourcePath.Discrete, Set<ResourcePath>> childMap;
@Activate
public void activate() {
consumerMap = service.<ResourcePath, ResourceConsumer>consistentMapBuilder()
.withName(CONSUMER_MAP)
discreteConsumers = service.<ResourcePath.Discrete, ResourceConsumer>consistentMapBuilder()
.withName(DISCRETE_CONSUMER_MAP)
.withSerializer(SERIALIZER)
.build();
childMap = service.<ResourcePath, List<ResourcePath>>consistentMapBuilder()
continuousConsumers = service.<ResourceId, ContinuousResourceAllocation>consistentMapBuilder()
.withName(CONTINUOUS_CONSUMER_MAP)
.withSerializer(SERIALIZER)
.build();
childMap = service.<ResourcePath.Discrete, Set<ResourcePath>>consistentMapBuilder()
.withName(CHILD_MAP)
.withSerializer(SERIALIZER)
.build();
Tools.retryable(() -> childMap.put(ResourcePath.ROOT, ImmutableList.of()),
Tools.retryable(() -> childMap.put(ResourcePath.ROOT, new LinkedHashSet<>()),
ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
log.info("Started");
}
@Override
public Optional<ResourceConsumer> getConsumer(ResourcePath resource) {
public List<ResourceConsumer> getConsumers(ResourcePath resource) {
checkNotNull(resource);
checkArgument(resource instanceof ResourcePath.Discrete || resource instanceof ResourcePath.Continuous);
if (resource instanceof ResourcePath.Discrete) {
return getConsumer((ResourcePath.Discrete) resource);
} else {
return getConsumer((ResourcePath.Continuous) resource);
}
}
Versioned<ResourceConsumer> consumer = consumerMap.get(resource);
private List<ResourceConsumer> getConsumer(ResourcePath.Discrete resource) {
Versioned<ResourceConsumer> consumer = discreteConsumers.get(resource);
if (consumer == null) {
return Optional.empty();
return ImmutableList.of();
}
return ImmutableList.of(consumer.value());
}
private List<ResourceConsumer> getConsumer(ResourcePath.Continuous resource) {
Versioned<ContinuousResourceAllocation> allocations = continuousConsumers.get(resource.id());
if (allocations == null) {
return ImmutableList.of();
}
return Optional.of(consumer.value());
return allocations.value().allocations().stream()
.filter(x -> x.resource().equals(resource))
.map(ResourceAllocation::consumer)
.collect(GuavaCollectors.toImmutableList());
}
@Override
......@@ -115,15 +148,16 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
TransactionContext tx = service.transactionContextBuilder().build();
tx.begin();
TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap =
TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> childTxMap =
tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream()
Map<ResourcePath.Discrete, List<ResourcePath>> resourceMap = resources.stream()
.filter(x -> x.parent().isPresent())
.collect(Collectors.groupingBy(x -> x.parent().get()));
for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) {
if (!isRegistered(childTxMap, entry.getKey())) {
for (Map.Entry<ResourcePath.Discrete, List<ResourcePath>> entry: resourceMap.entrySet()) {
Optional<ResourcePath.Discrete> child = lookup(childTxMap, entry.getKey());
if (!child.isPresent()) {
return abortTransaction(tx);
}
......@@ -150,19 +184,32 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
TransactionContext tx = service.transactionContextBuilder().build();
tx.begin();
TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap =
TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> childTxMap =
tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap =
tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER);
TransactionalMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumerTxMap =
tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Map<ResourcePath, List<ResourcePath>> resourceMap = resources.stream()
// Extract Discrete instances from resources
Map<ResourcePath.Discrete, List<ResourcePath>> resourceMap = resources.stream()
.filter(x -> x.parent().isPresent())
.collect(Collectors.groupingBy(x -> x.parent().get()));
// even if one of the resources is allocated to a consumer,
// all unregistrations are regarded as failure
for (Map.Entry<ResourcePath, List<ResourcePath>> entry: resourceMap.entrySet()) {
if (entry.getValue().stream().anyMatch(x -> consumerTxMap.get(x) != null)) {
for (Map.Entry<ResourcePath.Discrete, List<ResourcePath>> entry: resourceMap.entrySet()) {
boolean allocated = entry.getValue().stream().anyMatch(x -> {
if (x instanceof ResourcePath.Discrete) {
return discreteConsumerTxMap.get((ResourcePath.Discrete) x) != null;
} else if (x instanceof ResourcePath.Continuous) {
ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(x.id());
return allocations != null && !allocations.allocations().isEmpty();
} else {
return false;
}
});
if (allocated) {
return abortTransaction(tx);
}
......@@ -190,20 +237,40 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
TransactionContext tx = service.transactionContextBuilder().build();
tx.begin();
TransactionalMap<ResourcePath, List<ResourcePath>> childTxMap =
TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> childTxMap =
tx.getTransactionalMap(CHILD_MAP, SERIALIZER);
TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap =
tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER);
TransactionalMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumerTxMap =
tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
for (ResourcePath resource: resources) {
if (!isRegistered(childTxMap, resource)) {
if (resource instanceof ResourcePath.Discrete) {
if (!lookup(childTxMap, resource).isPresent()) {
return abortTransaction(tx);
}
ResourceConsumer oldValue = consumerTxMap.put(resource, consumer);
ResourceConsumer oldValue = discreteConsumerTxMap.put((ResourcePath.Discrete) resource, consumer);
if (oldValue != null) {
return abortTransaction(tx);
}
} else if (resource instanceof ResourcePath.Continuous) {
Optional<ResourcePath.Continuous> continuous = lookup(childTxMap, (ResourcePath.Continuous) resource);
if (!continuous.isPresent()) {
return abortTransaction(tx);
}
ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(continuous.get().id());
if (!hasEnoughResource(continuous.get(), (ResourcePath.Continuous) resource, allocations)) {
return abortTransaction(tx);
}
boolean success = appendValue(continuousConsumerTxMap,
continuous.get(), new ResourceAllocation(continuous.get(), consumer));
if (!success) {
return abortTransaction(tx);
}
}
}
return tx.commit();
......@@ -218,8 +285,10 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
TransactionContext tx = service.transactionContextBuilder().build();
tx.begin();
TransactionalMap<ResourcePath, ResourceConsumer> consumerTxMap =
tx.getTransactionalMap(CONSUMER_MAP, SERIALIZER);
TransactionalMap<ResourcePath.Discrete, ResourceConsumer> discreteConsumerTxMap =
tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER);
TransactionalMap<ResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
Iterator<ResourcePath> resourceIte = resources.iterator();
Iterator<ResourceConsumer> consumerIte = consumers.iterator();
......@@ -227,33 +296,76 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
ResourcePath resource = resourceIte.next();
ResourceConsumer consumer = consumerIte.next();
if (resource instanceof ResourcePath.Discrete) {
// if this single release fails (because the resource is allocated to another consumer,
// the whole release fails
if (!consumerTxMap.remove(resource, consumer)) {
if (!discreteConsumerTxMap.remove((ResourcePath.Discrete) resource, consumer)) {
return abortTransaction(tx);
}
} else if (resource instanceof ResourcePath.Continuous) {
ResourcePath.Continuous continuous = (ResourcePath.Continuous) resource;
ContinuousResourceAllocation allocation = continuousConsumerTxMap.get(continuous.id());
ImmutableList<ResourceAllocation> newAllocations = allocation.allocations().stream()
.filter(x -> !(x.consumer().equals(consumer) &&
((ResourcePath.Continuous) x.resource()).value() == continuous.value()))
.collect(GuavaCollectors.toImmutableList());
if (!continuousConsumerTxMap.replace(continuous.id(), allocation,
new ContinuousResourceAllocation(allocation.original(), newAllocations))) {
return abortTransaction(tx);
}
}
}
return tx.commit();
}
@Override
public boolean isAvailable(ResourcePath resource) {
checkNotNull(resource);
checkArgument(resource instanceof ResourcePath.Discrete || resource instanceof ResourcePath.Continuous);
if (resource instanceof ResourcePath.Discrete) {
return getConsumer((ResourcePath.Discrete) resource).isEmpty();
} else {
return isAvailable((ResourcePath.Continuous) resource);
}
}
private boolean isAvailable(ResourcePath.Continuous resource) {
Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
if (allocation == null) {
return false;
}
return hasEnoughResource(allocation.value().original(), resource, allocation.value());
}
@Override
public Collection<ResourcePath> getResources(ResourceConsumer consumer) {
checkNotNull(consumer);
// NOTE: getting all entries may become performance bottleneck
// TODO: revisit for better backend data structure
return consumerMap.entrySet().stream()
Stream<ResourcePath.Discrete> discreteStream = discreteConsumers.entrySet().stream()
.filter(x -> x.getValue().value().equals(consumer))
.map(Map.Entry::getKey)
.collect(Collectors.toList());
.map(Map.Entry::getKey);
Stream<ResourcePath.Continuous> continuousStream = continuousConsumers.values().stream()
.flatMap(x -> x.value().allocations().stream()
.map(y -> Maps.immutableEntry(x.value().original(), y)))
.filter(x -> x.getValue().consumer().equals(consumer))
.map(x -> x.getKey());
return Stream.concat(discreteStream, continuousStream).collect(Collectors.toList());
}
@Override
public Collection<ResourcePath> getChildResources(ResourcePath parent) {
checkNotNull(parent);
checkArgument(parent instanceof ResourcePath.Discrete);
Versioned<List<ResourcePath>> children = childMap.get(parent);
Versioned<Set<ResourcePath>> children = childMap.get((ResourcePath.Discrete) parent);
if (children == null) {
return Collections.emptyList();
}
......@@ -265,16 +377,28 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
public <T> Collection<ResourcePath> getAllocatedResources(ResourcePath parent, Class<T> cls) {
checkNotNull(parent);
checkNotNull(cls);
checkArgument(parent instanceof ResourcePath.Discrete);
Versioned<List<ResourcePath>> children = childMap.get(parent);
Versioned<Set<ResourcePath>> children = childMap.get((ResourcePath.Discrete) parent);
if (children == null) {
return Collections.emptyList();
}
return children.value().stream()
Stream<ResourcePath.Discrete> discrete = children.value().stream()
.filter(x -> x.last().getClass().equals(cls))
.filter(consumerMap::containsKey)
.collect(Collectors.toList());
.filter(x -> x instanceof ResourcePath.Discrete)
.map(x -> (ResourcePath.Discrete) x)
.filter(discreteConsumers::containsKey);
Stream<ResourcePath.Continuous> continuous = children.value().stream()
.filter(x -> x.last().getClass().equals(cls))
.filter(x -> x instanceof ResourcePath.Continuous)
.map(x -> (ResourcePath.Continuous) x)
.filter(x -> continuousConsumers.containsKey(x.id()))
.filter(x -> continuousConsumers.get(x.id()) != null)
.filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
return Stream.concat(discrete, continuous).collect(Collectors.toList());
}
/**
......@@ -288,6 +412,27 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
return false;
}
// Appends the specified ResourceAllocation to the existing values stored in the map
private boolean appendValue(TransactionalMap<ResourceId, ContinuousResourceAllocation> map,
ResourcePath.Continuous original, ResourceAllocation value) {
ContinuousResourceAllocation oldValue = map.putIfAbsent(original.id(),
new ContinuousResourceAllocation(original, ImmutableList.of(value)));
if (oldValue == null) {
return true;
}
if (oldValue.allocations().contains(value)) {
// don't write to map because all values are already stored
return true;
}
ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
ImmutableList.<ResourceAllocation>builder()
.addAll(oldValue.allocations())
.add(value)
.build());
return map.replace(original.id(), oldValue, newValue);
}
/**
* Appends the values to the existing values associated with the specified key.
* If the map already has all the given values, appending will not happen.
......@@ -299,20 +444,20 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
* @param <V> type of the element of the list
* @return true if the operation succeeds, false otherwise.
*/
private <K, V> boolean appendValues(TransactionalMap<K, List<V>> map, K key, List<V> values) {
List<V> oldValues = map.putIfAbsent(key, new ArrayList<>(values));
private <K, V> boolean appendValues(TransactionalMap<K, Set<V>> map, K key, List<V> values) {
Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>(values));
if (oldValues == null) {
return true;
}
LinkedHashSet<V> oldSet = new LinkedHashSet<>(oldValues);
if (oldSet.containsAll(values)) {
if (oldValues.containsAll(values)) {
// don't write to map because all values are already stored
return true;
}
oldSet.addAll(values);
return map.replace(key, oldValues, new ArrayList<>(oldSet));
LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
newValues.addAll(values);
return map.replace(key, oldValues, newValues);
}
/**
......@@ -326,37 +471,93 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
* @param <V> type of the element of the list
* @return true if the operation succeeds, false otherwise
*/
private <K, V> boolean removeValues(TransactionalMap<K, List<V>> map, K key, List<V> values) {
List<V> oldValues = map.get(key);
private <K, V> boolean removeValues(TransactionalMap<K, Set<V>> map, K key, List<? extends V> values) {
Set<V> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
if (oldValues == null) {
map.put(key, new ArrayList<>());
return true;
}
LinkedHashSet<V> oldSet = new LinkedHashSet<>(oldValues);
if (values.stream().allMatch(x -> !oldSet.contains(x))) {
if (values.stream().allMatch(x -> !oldValues.contains(x))) {
// don't write map because none of the values are stored
return true;
}
oldSet.removeAll(values);
return map.replace(key, oldValues, new ArrayList<>(oldSet));
LinkedHashSet<V> newValues = new LinkedHashSet<>(oldValues);
newValues.removeAll(values);
return map.replace(key, oldValues, newValues);
}
/**
* Checks if the specified resource is registered as a child of a resource in the map.
* Returns the resource which has the same key as the key of the specified resource
* in the list as a value of the map.
*
* @param map map storing parent - child relationship of resources
* @param resource resource to be checked
* @return true if the resource is registered, false otherwise.
* @param resource resource to be checked for its key
* @return the resource which is regarded as the same as the specified resource
*/
private boolean isRegistered(TransactionalMap<ResourcePath, List<ResourcePath>> map, ResourcePath resource) {
// root is always regarded to be registered
// Naive implementation, which traverses all elements in the list
private <T extends ResourcePath> Optional<T> lookup(
TransactionalMap<ResourcePath.Discrete, Set<ResourcePath>> map, T resource) {
// if it is root, always returns itself
if (!resource.parent().isPresent()) {
return true;
return Optional.of(resource);
}
List<ResourcePath> value = map.get(resource.parent().get());
return value != null && value.contains(resource);
Set<ResourcePath> values = map.get(resource.parent().get());
if (values == null) {
return Optional.empty();
}
@SuppressWarnings("unchecked")
Optional<T> result = values.stream()
.filter(x -> x.id().equals(resource.id()))
.map(x -> (T) x)
.findFirst();
return result;
}
/**
* Checks if there is enough resource volume to allocated the requested resource
* against the specified resource.
*
* @param original original resource
* @param request requested resource
* @param allocation current allocation of the resource
* @return true if there is enough resource volume. Otherwise, false.
*/
private boolean hasEnoughResource(ResourcePath.Continuous original,
ResourcePath.Continuous request,
ContinuousResourceAllocation allocation) {
if (allocation == null) {
return request.value() <= original.value();
}
double allocated = allocation.allocations().stream()
.filter(x -> x.resource() instanceof ResourcePath.Continuous)
.map(x -> (ResourcePath.Continuous) x.resource())
.mapToDouble(ResourcePath.Continuous::value)
.sum();
double left = original.value() - allocated;
return request.value() <= left;
}
// internal use only
private static final class ContinuousResourceAllocation {
private final ResourcePath.Continuous original;
private final ImmutableList<ResourceAllocation> allocations;
private ContinuousResourceAllocation(ResourcePath.Continuous original,
ImmutableList<ResourceAllocation> allocations) {
this.original = original;
this.allocations = allocations;
}
private ResourcePath.Continuous original() {
return original;
}
private ImmutableList<ResourceAllocation> allocations() {
return allocations;
}
}
}
......
......@@ -204,6 +204,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
......@@ -239,7 +240,8 @@ public final class KryoNamespaces {
.register(CopyOnWriteArraySet.class)
.register(ArrayList.class,
LinkedList.class,
HashSet.class
HashSet.class,
LinkedHashSet.class
)
.register(Maps.immutableEntry("a", "b").getClass())
.register(new ArraysAsListSerializer(), Arrays.asList().getClass())
......