Aaron Kruglikov

Fixes for a hanging issue when creating treemaps and multimaps through the storage parition client.

Change-Id: I3886310bcabbec7499a57f1e91b21e8a36d33e4e
......@@ -15,13 +15,12 @@
*/
package org.onosproject.store.primitives.impl;
import java.util.Arrays;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
import io.atomix.manager.util.ResourceManagerTypeResolver;
import io.atomix.variables.internal.LongCommands;
import org.onlab.util.Match;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
......@@ -31,7 +30,10 @@ import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapFactory;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMultimapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapFactory;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapFactory;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorFactory;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands;
......@@ -47,8 +49,7 @@ import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WorkQueueStats;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.Arrays;
/**
* Serializer utility for Atomix Catalyst.
......@@ -98,10 +99,13 @@ public final class CatalystSerializers {
serializer.resolve(new AtomixWorkQueueCommands.TypeResolver());
serializer.resolve(new ResourceManagerTypeResolver());
serializer.resolve(new AtomixConsistentTreeMapCommands.TypeResolver());
serializer.resolve(new AtomixConsistentMultimapCommands.TypeResolver());
serializer.registerClassLoader(AtomixConsistentMapFactory.class)
.registerClassLoader(AtomixLeaderElectorFactory.class)
.registerClassLoader(AtomixWorkQueueFactory.class);
.registerClassLoader(AtomixWorkQueueFactory.class)
.registerClassLoader(AtomixConsistentTreeMapFactory.class)
.registerClassLoader(AtomixConsistentSetMultimapFactory.class);
return serializer;
}
......
......@@ -38,7 +38,12 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Created by admin on 8/3/16.
* An {@code AsyncConsistentTreeMap} that maps its operations to operations on
* a differently typed {@code AsyncConsistentTreeMap} by transcoding operation
* inputs and outputs.
*
* @param <V2> value type of other map
* @param <V1> value type of this map
*/
public class TranscodingAsyncConsistentTreeMap<V1, V2>
implements AsyncConsistentTreeMap<V1> {
......@@ -79,7 +84,8 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
entry ->
Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
versionedValueTransform
.apply(entry.getValue())));
}
@Override
......@@ -90,7 +96,8 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
entry ->
Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
versionedValueTransform
.apply(entry.getValue())));
}
@Override
......@@ -101,8 +108,9 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
.thenApply(entry ->
Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
}
versionedValueTransform
.apply(entry.getValue())));
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V1>>>
......@@ -111,7 +119,8 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
entry ->
Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
versionedValueTransform
.apply(entry.getValue())));
}
@Override
......@@ -121,7 +130,8 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
.thenApply(entry ->
Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
versionedValueTransform
.apply(entry.getValue())));
}
@Override
......@@ -131,7 +141,8 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
.thenApply(
entry -> Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
versionedValueTransform
.apply(entry.getValue())));
}
@Override
......@@ -141,7 +152,8 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
.thenApply(
entry -> Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
versionedValueTransform
.apply(entry.getValue())));
}
@Override
......@@ -150,7 +162,7 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
return backingMap.pollLastEntry()
.thenApply(entry -> Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
versionedValueTransform.apply(entry.getValue())));
}
@Override
......@@ -210,7 +222,7 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
@Override
public CompletableFuture<Versioned<V1>> get(String key) {
return backingMap.get(key).thenApply(value -> value.map(valueDecoder));
return backingMap.get(key).thenApply(versionedValueTransform);
}
@Override
......@@ -237,18 +249,18 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
@Override
public CompletableFuture<Versioned<V1>> put(String key, V1 value) {
return backingMap.put(key, valueEncoder.apply(value))
.thenApply(v -> v.map(valueDecoder));
.thenApply(versionedValueTransform);
}
@Override
public CompletableFuture<Versioned<V1>> putAndGet(String key, V1 value) {
return backingMap.putAndGet(key, valueEncoder.apply(value))
.thenApply(v -> v.map(valueDecoder));
.thenApply(versionedValueTransform);
}
@Override
public CompletableFuture<Versioned<V1>> remove(String key) {
return backingMap.remove(key).thenApply(v -> v.map(valueDecoder));
return backingMap.remove(key).thenApply(versionedValueTransform);
}
@Override
......@@ -264,7 +276,7 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
@Override
public CompletableFuture<Collection<Versioned<V1>>> values() {
return backingMap.values().thenApply(valueSet -> valueSet.stream()
.map(v -> v.map(valueDecoder)).collect(Collectors.toSet()));
.map(versionedValueTransform).collect(Collectors.toSet()));
}
@Override
......@@ -275,16 +287,18 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
entries -> entries
.stream()
.map(entry ->
Maps.immutableEntry(entry.getKey(),
entry.getValue()
.map(valueDecoder)))
Maps.immutableEntry(
entry.getKey(),
versionedValueTransform
.apply(entry.getValue())
))
.collect(Collectors.toSet()));
}
@Override
public CompletableFuture<Versioned<V1>> putIfAbsent(String key, V1 value) {
return backingMap.putIfAbsent(key, valueEncoder.apply(value))
.thenApply(v -> v.map(valueDecoder));
.thenApply(versionedValueTransform);
}
@Override
......@@ -300,7 +314,7 @@ public class TranscodingAsyncConsistentTreeMap<V1, V2>
@Override
public CompletableFuture<Versioned<V1>> replace(String key, V1 value) {
return backingMap.replace(key, valueEncoder.apply(value))
.thenApply(v -> v.map(valueDecoder));
.thenApply(versionedValueTransform);
}
@Override
......