alshabib

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

package org.onlab.onos.net;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Objects;
import org.onlab.onos.net.link.LinkDescription;
import com.google.common.base.MoreObjects;
// TODO Consider renaming.
......@@ -10,7 +14,7 @@ import com.google.common.base.MoreObjects;
/**
* Immutable representation of a link identity.
*/
public class LinkKey {
public final class LinkKey {
private final ConnectPoint src;
private final ConnectPoint dst;
......@@ -39,18 +43,40 @@ public class LinkKey {
* @param src source connection point
* @param dst destination connection point
*/
public LinkKey(ConnectPoint src, ConnectPoint dst) {
this.src = src;
this.dst = dst;
private LinkKey(ConnectPoint src, ConnectPoint dst) {
this.src = checkNotNull(src);
this.dst = checkNotNull(dst);
}
/**
* Creates a link identifier with source and destination connection point.
*
* @param src source connection point
* @param dst destination connection point
* @return a link identifier
*/
public static LinkKey linkKey(ConnectPoint src, ConnectPoint dst) {
return new LinkKey(src, dst);
}
/**
* Creates a link identifier for the specified link.
*
* @param link link descriptor
* @return a link identifier
*/
public static LinkKey linkKey(Link link) {
return new LinkKey(link.src(), link.dst());
}
/**
* Creates a link identifier for the specified link.
*
* @param desc link description
* @return a link identifier
*/
public LinkKey(Link link) {
this(link.src(), link.dst());
public static LinkKey linkKey(LinkDescription desc) {
return new LinkKey(desc.src(), desc.dst());
}
@Override
......@@ -65,7 +91,7 @@ public class LinkKey {
}
if (obj instanceof LinkKey) {
final LinkKey other = (LinkKey) obj;
return Objects.equals(this.src(), other.src()) &&
return Objects.equals(this.src, other.src) &&
Objects.equals(this.dst, other.dst);
}
return false;
......@@ -74,7 +100,7 @@ public class LinkKey {
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("src", src())
.add("src", src)
.add("dst", dst)
.toString();
}
......
package org.onlab.onos.net.host;
import org.onlab.onos.store.Timestamp;
import org.onlab.packet.MacAddress;
/**
* Interface for a logical clock service that issues per host timestamps.
*/
public interface HostClockService {
/**
* Returns a new timestamp for the specified host mac address.
* @param hostMac host MAC address.
* @return timestamp.
*/
public Timestamp getTimestamp(MacAddress hostMac);
}
......@@ -28,6 +28,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onlab.onos.net.LinkKey.linkKey;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -82,14 +83,14 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
@Override
public void addTrackedResources(IntentId intentId, Collection<Link> resources) {
for (Link link : resources) {
intentsByLink.put(new LinkKey(link), intentId);
intentsByLink.put(linkKey(link), intentId);
}
}
@Override
public void removeTrackedResources(IntentId intentId, Collection<Link> resources) {
for (Link link : resources) {
intentsByLink.remove(new LinkKey(link), intentId);
intentsByLink.remove(linkKey(link), intentId);
}
}
......@@ -125,7 +126,7 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
if (reason instanceof LinkEvent) {
LinkEvent linkEvent = (LinkEvent) reason;
if (linkEvent.type() == LINK_REMOVED) {
Set<IntentId> intentIds = intentsByLink.get(new LinkKey(linkEvent.subject()));
Set<IntentId> intentIds = intentsByLink.get(linkKey(linkEvent.subject()));
toBeRecompiled.addAll(intentIds);
}
recompileOnly = recompileOnly && linkEvent.type() == LINK_REMOVED;
......
......@@ -1090,7 +1090,7 @@ public class GossipDeviceStore
.toList();
if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
log.info("No other peers in the cluster.");
log.debug("No other peers in the cluster.");
return;
}
......
package org.onlab.onos.store.host.impl;
import static org.slf4j.LoggerFactory.getLogger;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.host.HostClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.impl.WallClockTimestamp;
import org.onlab.packet.MacAddress;
import org.slf4j.Logger;
/**
* HostClockService to issue Timestamps based on local wallclock time.
*/
@Component(immediate = true)
@Service
public class HostClockManager implements HostClockService {
private final Logger log = getLogger(getClass());
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public Timestamp getTimestamp(MacAddress hostMac) {
return new WallClockTimestamp();
}
}
......@@ -10,8 +10,12 @@ import com.google.common.base.MoreObjects;
import com.google.common.collect.ComparisonChain;
/**
* Default implementation of Timestamp.
* TODO: Better documentation.
* A logical timestamp that derives its value from two things:
* <ul>
* <li> The current mastership term of the device.</li>
* <li> The value of the counter used for tracking topology events observed from
* the device during that current time of a device. </li>
* </ul>
*/
public final class MastershipBasedTimestamp implements Timestamp {
......
package org.onlab.onos.store.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Objects;
import org.onlab.onos.store.Timestamp;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ComparisonChain;
/**
* A Timestamp that derives its value from the prevailing
* wallclock time on the controller where it is generated.
*/
public class WallClockTimestamp implements Timestamp {
private final long unixTimestamp;
public WallClockTimestamp() {
unixTimestamp = System.currentTimeMillis();
}
@Override
public int compareTo(Timestamp o) {
checkArgument(o instanceof WallClockTimestamp,
"Must be WallClockTimestamp", o);
WallClockTimestamp that = (WallClockTimestamp) o;
return ComparisonChain.start()
.compare(this.unixTimestamp, that.unixTimestamp)
.result();
}
@Override
public int hashCode() {
return Objects.hash(unixTimestamp);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof WallClockTimestamp)) {
return false;
}
WallClockTimestamp that = (WallClockTimestamp) obj;
return Objects.equals(this.unixTimestamp, that.unixTimestamp);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("unixTimestamp", unixTimestamp)
.toString();
}
/**
* Returns the unixTimestamp.
*
* @return unix timestamp
*/
public long unixTimestamp() {
return unixTimestamp;
}
}
......@@ -67,6 +67,7 @@ import static org.onlab.onos.net.DefaultAnnotations.union;
import static org.onlab.onos.net.DefaultAnnotations.merge;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.LinkKey.linkKey;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -203,7 +204,7 @@ public class GossipLinkStore
@Override
public Link getLink(ConnectPoint src, ConnectPoint dst) {
return links.get(new LinkKey(src, dst));
return links.get(linkKey(src, dst));
}
@Override
......@@ -237,14 +238,20 @@ public class GossipLinkStore
final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
LinkEvent event = createOrUpdateLinkInternal(providerId, deltaDesc);
LinkKey key = linkKey(linkDescription);
final LinkEvent event;
final Timestamped<LinkDescription> mergedDesc;
synchronized (getLinkDescriptions(key)) {
event = createOrUpdateLinkInternal(providerId, deltaDesc);
mergedDesc = getLinkDescriptions(key).get(providerId);
}
if (event != null) {
log.info("Notifying peers of a link update topology event from providerId: "
+ "{} between src: {} and dst: {}",
providerId, linkDescription.src(), linkDescription.dst());
try {
notifyPeers(new InternalLinkEvent(providerId, deltaDesc));
notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
} catch (IOException e) {
log.info("Failed to notify peers of a link update topology event from providerId: "
+ "{} between src: {} and dst: {}",
......@@ -258,7 +265,7 @@ public class GossipLinkStore
ProviderId providerId,
Timestamped<LinkDescription> linkDescription) {
LinkKey key = new LinkKey(linkDescription.value().src(), linkDescription.value().dst());
LinkKey key = linkKey(linkDescription.value());
ConcurrentMap<ProviderId, Timestamped<LinkDescription>> descs = getLinkDescriptions(key);
synchronized (descs) {
......@@ -351,7 +358,7 @@ public class GossipLinkStore
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
final LinkKey key = new LinkKey(src, dst);
final LinkKey key = linkKey(src, dst);
DeviceId dstDeviceId = dst.deviceId();
Timestamp timestamp = deviceClockService.getTimestamp(dstDeviceId);
......@@ -538,7 +545,7 @@ public class GossipLinkStore
.toList();
if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
log.info("No other peers in the cluster.");
log.debug("No other peers in the cluster.");
return;
}
......
package org.onlab.onos.store.impl;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import org.junit.Test;
import org.onlab.onos.store.Timestamp;
import org.onlab.util.KryoPool;
import com.google.common.testing.EqualsTester;
/**
* Tests for {@link WallClockTimestamp}.
*/
public class WallClockTimestampTest {
@Test
public final void testBasic() throws InterruptedException {
WallClockTimestamp ts1 = new WallClockTimestamp();
Thread.sleep(50);
WallClockTimestamp ts2 = new WallClockTimestamp();
assertTrue(ts1.compareTo(ts1) == 0);
assertTrue(ts2.compareTo(ts1) > 0);
assertTrue(ts1.compareTo(ts2) < 0);
}
@Test
public final void testKryoSerializable() {
WallClockTimestamp ts1 = new WallClockTimestamp();
final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
final KryoPool kryos = KryoPool.newBuilder()
.register(WallClockTimestamp.class)
.build();
kryos.serialize(ts1, buffer);
buffer.flip();
Timestamp copy = kryos.deserialize(buffer);
new EqualsTester()
.addEqualityGroup(ts1, copy)
.testEquals();
}
}
package org.onlab.onos.store.common;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.onos.store.serializers.StoreSerializer;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.EntryView;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.IMap;
import com.hazelcast.core.MapEvent;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.MapInterceptor;
import com.hazelcast.mapreduce.JobTracker;
import com.hazelcast.mapreduce.aggregation.Aggregation;
import com.hazelcast.mapreduce.aggregation.Supplier;
import com.hazelcast.monitor.LocalMapStats;
import com.hazelcast.query.Predicate;
// TODO: implement Predicate, etc. if we need them.
/**
* Wrapper around IMap<byte[], byte[]> which serializes/deserializes
* Key and Value using StoreSerializer.
*
* @param <K> key type
* @param <V> value type
*/
public class SMap<K, V> implements IMap<K, V> {
private final IMap<byte[], byte[]> m;
private final StoreSerializer serializer;
/**
* Creates a SMap instance.
*
* @param baseMap base IMap to use
* @param serializer serializer to use for both key and value
*/
public SMap(IMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
this.m = checkNotNull(baseMap);
this.serializer = checkNotNull(serializer);
}
@Override
public int size() {
return m.size();
}
@Override
public boolean isEmpty() {
return m.isEmpty();
}
@Override
public void putAll(Map<? extends K, ? extends V> map) {
Map<byte[], byte[]> sm = new IdentityHashMap<>(map.size());
for (java.util.Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
sm.put(serializeKey(e.getKey()), serializeVal(e.getValue()));
}
m.putAll(sm);
}
@Deprecated
@Override
public Object getId() {
return m.getId();
}
@Override
public String getPartitionKey() {
return m.getPartitionKey();
}
@Override
public String getName() {
return m.getName();
}
@Override
public String getServiceName() {
return m.getServiceName();
}
@Override
public void destroy() {
m.destroy();
}
@Override
public boolean containsKey(Object key) {
return m.containsKey(serializeKey(key));
}
@Override
public boolean containsValue(Object value) {
return m.containsValue(serializeVal(value));
}
@Override
public V get(Object key) {
return deserializeVal(m.get(serializeKey(key)));
}
@Override
public V put(K key, V value) {
return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
}
@Override
public V remove(Object key) {
return deserializeVal(m.remove(serializeKey(key)));
}
@Override
public boolean remove(Object key, Object value) {
return m.remove(serializeKey(key), serializeVal(value));
}
@Override
public void delete(Object key) {
m.delete(serializeKey(key));
}
@Override
public void flush() {
m.flush();
}
@Override
public Map<K, V> getAll(Set<K> keys) {
Set<byte[]> sk = serializeKeySet(keys);
Map<byte[], byte[]> bm = m.getAll(sk);
Map<K, V> dsm = new HashMap<>(bm.size());
for (java.util.Map.Entry<byte[], byte[]> e : bm.entrySet()) {
dsm.put(deserializeKey(e.getKey()), deserializeVal(e.getValue()));
}
return dsm;
}
@Override
public void loadAll(boolean replaceExistingValues) {
m.loadAll(replaceExistingValues);
}
@Override
public void loadAll(Set<K> keys, boolean replaceExistingValues) {
Set<byte[]> sk = serializeKeySet(keys);
m.loadAll(sk, replaceExistingValues);
}
@Override
public void clear() {
m.clear();
}
@Override
public Future<V> getAsync(K key) {
Future<byte[]> f = m.getAsync(serializeKey(key));
return Futures.lazyTransform(f, new DeserializeVal());
}
@Override
public Future<V> putAsync(K key, V value) {
Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value));
return Futures.lazyTransform(f, new DeserializeVal());
}
@Override
public Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit) {
Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value), ttl, timeunit);
return Futures.lazyTransform(f, new DeserializeVal());
}
@Override
public Future<V> removeAsync(K key) {
Future<byte[]> f = m.removeAsync(serializeKey(key));
return Futures.lazyTransform(f, new DeserializeVal());
}
@Override
public boolean tryRemove(K key, long timeout, TimeUnit timeunit) {
return m.tryRemove(serializeKey(key), timeout, timeunit);
}
@Override
public boolean tryPut(K key, V value, long timeout, TimeUnit timeunit) {
return m.tryPut(serializeKey(key), serializeVal(value), timeout, timeunit);
}
@Override
public V put(K key, V value, long ttl, TimeUnit timeunit) {
return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
}
@Override
public void putTransient(K key, V value, long ttl, TimeUnit timeunit) {
m.putTransient(serializeKey(key), serializeVal(value), ttl, timeunit);
}
@Override
public V putIfAbsent(K key, V value) {
return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
}
@Override
public V putIfAbsent(K key, V value, long ttl, TimeUnit timeunit) {
return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value), ttl, timeunit));
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
}
@Override
public V replace(K key, V value) {
return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
}
@Override
public void set(K key, V value) {
m.set(serializeKey(key), serializeVal(value));
}
@Override
public void set(K key, V value, long ttl, TimeUnit timeunit) {
m.set(serializeKey(key), serializeVal(value), ttl, timeunit);
}
@Override
public void lock(K key) {
m.lock(serializeKey(key));
}
@Override
public void lock(K key, long leaseTime, TimeUnit timeUnit) {
m.lock(serializeKey(key), leaseTime, timeUnit);
}
@Override
public boolean isLocked(K key) {
return m.isLocked(serializeKey(key));
}
@Override
public boolean tryLock(K key) {
return m.tryLock(serializeKey(key));
}
@Override
public boolean tryLock(K key, long time, TimeUnit timeunit)
throws InterruptedException {
return m.tryLock(serializeKey(key), time, timeunit);
}
@Override
public void unlock(K key) {
m.unlock(serializeKey(key));
}
@Override
public void forceUnlock(K key) {
m.forceUnlock(serializeKey(key));
}
@Override
public String addLocalEntryListener(EntryListener<K, V> listener) {
return m.addLocalEntryListener(new BaseEntryListener(listener));
}
@Deprecated // marking method not implemented
@Override
public String addLocalEntryListener(EntryListener<K, V> listener,
Predicate<K, V> predicate, boolean includeValue) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public String addLocalEntryListener(EntryListener<K, V> listener,
Predicate<K, V> predicate, K key, boolean includeValue) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public String addInterceptor(MapInterceptor interceptor) {
throw new UnsupportedOperationException();
}
@Override
public void removeInterceptor(String id) {
m.removeInterceptor(id);
}
@Override
public String addEntryListener(EntryListener<K, V> listener,
boolean includeValue) {
return m.addEntryListener(new BaseEntryListener(listener), includeValue);
}
@Override
public boolean removeEntryListener(String id) {
return m.removeEntryListener(id);
}
@Override
public String addEntryListener(EntryListener<K, V> listener, K key,
boolean includeValue) {
return m.addEntryListener(new BaseEntryListener(listener),
serializeKey(key), includeValue);
}
@Deprecated // marking method not implemented
@Override
public String addEntryListener(EntryListener<K, V> listener,
Predicate<K, V> predicate, boolean includeValue) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public String addEntryListener(EntryListener<K, V> listener,
Predicate<K, V> predicate, K key, boolean includeValue) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public EntryView<K, V> getEntryView(K key) {
throw new UnsupportedOperationException();
}
@Override
public boolean evict(K key) {
return m.evict(serializeKey(key));
}
@Override
public void evictAll() {
m.evictAll();
}
@Override
public Set<K> keySet() {
return deserializeKeySet(m.keySet());
}
@Override
public Collection<V> values() {
return deserializeVal(m.values());
}
@Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
return deserializeEntrySet(m.entrySet());
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Set<K> keySet(Predicate predicate) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Set<java.util.Map.Entry<K, V>> entrySet(Predicate predicate) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Collection<V> values(Predicate predicate) {
throw new UnsupportedOperationException();
}
@Override
public Set<K> localKeySet() {
return deserializeKeySet(m.localKeySet());
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Set<K> localKeySet(Predicate predicate) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public void addIndex(String attribute, boolean ordered) {
throw new UnsupportedOperationException();
}
@Override
public LocalMapStats getLocalMapStats() {
return m.getLocalMapStats();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Object executeOnKey(K key, EntryProcessor entryProcessor) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Map<K, Object> executeOnKeys(Set<K> keys,
EntryProcessor entryProcessor) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public void submitToKey(K key, EntryProcessor entryProcessor,
ExecutionCallback callback) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Future submitToKey(K key, EntryProcessor entryProcessor) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor,
Predicate predicate) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public <SuppliedValue, Result> Result aggregate(
Supplier<K, V, SuppliedValue> supplier,
Aggregation<K, SuppliedValue, Result> aggregation) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@Override
public <SuppliedValue, Result> Result aggregate(
Supplier<K, V, SuppliedValue> supplier,
Aggregation<K, SuppliedValue, Result> aggregation,
JobTracker jobTracker) {
throw new UnsupportedOperationException();
}
private byte[] serializeKey(Object key) {
return serializer.encode(key);
}
private K deserializeKey(byte[] key) {
return serializer.decode(key);
}
private byte[] serializeVal(Object val) {
return serializer.encode(val);
}
private V deserializeVal(byte[] val) {
return serializer.decode(val);
}
private Set<byte[]> serializeKeySet(Set<K> keys) {
Set<byte[]> sk = Collections.newSetFromMap(new IdentityHashMap<byte[], Boolean>(keys.size()));
for (K key : keys) {
sk.add(serializeKey(key));
}
return sk;
}
private Set<K> deserializeKeySet(Set<byte[]> keys) {
Set<K> dsk = new HashSet<>(keys.size());
for (byte[] key : keys) {
dsk.add(deserializeKey(key));
}
return dsk;
}
private Collection<V> deserializeVal(Collection<byte[]> vals) {
Collection<V> dsl = new ArrayList<>(vals.size());
for (byte[] val : vals) {
dsl.add(deserializeVal(val));
}
return dsl;
}
private Set<java.util.Map.Entry<K, V>> deserializeEntrySet(
Set<java.util.Map.Entry<byte[], byte[]>> entries) {
Set<java.util.Map.Entry<K, V>> dse = new HashSet<>(entries.size());
for (java.util.Map.Entry<byte[], byte[]> entry : entries) {
dse.add(Pair.of(deserializeKey(entry.getKey()),
deserializeVal(entry.getValue())));
}
return dse;
}
private final class BaseEntryListener
implements EntryListener<byte[], byte[]> {
private final EntryListener<K, V> listener;
public BaseEntryListener(EntryListener<K, V> listener) {
this.listener = listener;
}
@Override
public void mapEvicted(MapEvent event) {
listener.mapEvicted(event);
}
@Override
public void mapCleared(MapEvent event) {
listener.mapCleared(event);
}
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
EntryEvent<K, V> evt = new EntryEvent<K, V>(
event.getSource(),
event.getMember(),
event.getEventType().getType(),
deserializeKey(event.getKey()),
deserializeVal(event.getOldValue()),
deserializeVal(event.getValue()));
listener.entryUpdated(evt);
}
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
EntryEvent<K, V> evt = new EntryEvent<K, V>(
event.getSource(),
event.getMember(),
event.getEventType().getType(),
deserializeKey(event.getKey()),
deserializeVal(event.getOldValue()),
null);
listener.entryRemoved(evt);
}
@Override
public void entryEvicted(EntryEvent<byte[], byte[]> event) {
EntryEvent<K, V> evt = new EntryEvent<K, V>(
event.getSource(),
event.getMember(),
event.getEventType().getType(),
deserializeKey(event.getKey()),
deserializeVal(event.getOldValue()),
deserializeVal(event.getValue()));
listener.entryEvicted(evt);
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
EntryEvent<K, V> evt = new EntryEvent<K, V>(
event.getSource(),
event.getMember(),
event.getEventType().getType(),
deserializeKey(event.getKey()),
null,
deserializeVal(event.getValue()));
listener.entryAdded(evt);
}
}
private final class DeserializeVal implements Function<byte[], V> {
@Override
public V apply(byte[] input) {
return deserializeVal(input);
}
}
}
......@@ -3,6 +3,7 @@ package org.onlab.onos.store.link.impl;
import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.LinkKey.linkKey;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED;
......@@ -122,7 +123,7 @@ public class DistributedLinkStore
@Override
public Link getLink(ConnectPoint src, ConnectPoint dst) {
return links.getUnchecked(new LinkKey(src, dst)).orNull();
return links.getUnchecked(linkKey(src, dst)).orNull();
}
@Override
......@@ -150,7 +151,7 @@ public class DistributedLinkStore
@Override
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
LinkKey key = linkKey(linkDescription);
Optional<DefaultLink> link = links.getUnchecked(key);
if (!link.isPresent()) {
return createLink(providerId, key, linkDescription);
......@@ -216,7 +217,7 @@ public class DistributedLinkStore
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
synchronized (this) {
LinkKey key = new LinkKey(src, dst);
LinkKey key = linkKey(src, dst);
byte[] keyBytes = serialize(key);
Link link = deserialize(rawLinks.remove(keyBytes));
links.invalidate(key);
......
......@@ -3,6 +3,7 @@ package org.onlab.onos.store.link.impl;
import static org.junit.Assert.*;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.Link.Type.*;
import static org.onlab.onos.net.LinkKey.linkKey;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
import java.util.HashMap;
......@@ -122,8 +123,8 @@ public class DistributedLinkStoreTest {
assertEquals("initialy empty", 0,
Iterables.size(linkStore.getLinks()));
LinkKey linkId1 = new LinkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = new LinkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
LinkKey linkId1 = linkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = linkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
......@@ -134,7 +135,7 @@ public class DistributedLinkStoreTest {
Map<LinkKey, Link> links = new HashMap<>();
for (Link link : linkStore.getLinks()) {
links.put(new LinkKey(link.src(), link.dst()), link);
links.put(linkKey(link), link);
}
assertLink(linkId1, DIRECT, links.get(linkId1));
......@@ -143,9 +144,9 @@ public class DistributedLinkStoreTest {
@Test
public final void testGetDeviceEgressLinks() {
LinkKey linkId1 = new LinkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = new LinkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
LinkKey linkId1 = linkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = linkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
LinkKey linkId3 = linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
......@@ -166,9 +167,9 @@ public class DistributedLinkStoreTest {
@Test
public final void testGetDeviceIngressLinks() {
LinkKey linkId1 = new LinkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = new LinkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
LinkKey linkId1 = linkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = linkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
LinkKey linkId3 = linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
......@@ -191,7 +192,7 @@ public class DistributedLinkStoreTest {
public final void testGetLink() {
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
LinkKey linkId1 = new LinkKey(src, dst);
LinkKey linkId1 = linkKey(src, dst);
putLink(linkId1, DIRECT);
......@@ -206,9 +207,9 @@ public class DistributedLinkStoreTest {
public final void testGetEgressLinks() {
final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
LinkKey linkId1 = new LinkKey(d1P1, d2P2);
LinkKey linkId2 = new LinkKey(d2P2, d1P1);
LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
LinkKey linkId1 = linkKey(d1P1, d2P2);
LinkKey linkId2 = linkKey(d2P2, d1P1);
LinkKey linkId3 = linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
......@@ -231,9 +232,9 @@ public class DistributedLinkStoreTest {
public final void testGetIngressLinks() {
final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
LinkKey linkId1 = new LinkKey(d1P1, d2P2);
LinkKey linkId2 = new LinkKey(d2P2, d1P1);
LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
LinkKey linkId1 = linkKey(d1P1, d2P2);
LinkKey linkId2 = linkKey(d2P2, d1P1);
LinkKey linkId3 = linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
......@@ -282,8 +283,8 @@ public class DistributedLinkStoreTest {
public final void testRemoveLink() {
final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
LinkKey linkId1 = new LinkKey(d1P1, d2P2);
LinkKey linkId2 = new LinkKey(d2P2, d1P1);
LinkKey linkId1 = linkKey(d1P1, d2P2);
LinkKey linkId2 = linkKey(d2P2, d1P1);
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
......@@ -306,7 +307,7 @@ public class DistributedLinkStoreTest {
final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
final LinkKey linkId1 = new LinkKey(d1P1, d2P2);
final LinkKey linkId1 = linkKey(d1P1, d2P2);
final CountDownLatch addLatch = new CountDownLatch(1);
LinkStoreDelegate checkAdd = new LinkStoreDelegate() {
......
......@@ -31,6 +31,6 @@ public class LinkKeySerializer extends Serializer<LinkKey> {
public LinkKey read(Kryo kryo, Input input, Class<LinkKey> type) {
ConnectPoint src = (ConnectPoint) kryo.readClassAndObject(input);
ConnectPoint dst = (ConnectPoint) kryo.readClassAndObject(input);
return new LinkKey(src, dst);
return LinkKey.linkKey(src, dst);
}
}
......
......@@ -108,7 +108,7 @@ public class KryoSerializerTest {
testSerialized(ImmutableSet.of());
testSerialized(IpPrefix.valueOf("192.168.0.1/24"));
testSerialized(IpAddress.valueOf("192.168.0.1"));
testSerialized(new LinkKey(CP1, CP2));
testSerialized(LinkKey.linkKey(CP1, CP2));
testSerialized(new NodeId("SomeNodeIdentifier"));
testSerialized(P1);
testSerialized(PID);
......
......@@ -42,6 +42,7 @@ import static org.onlab.onos.net.DefaultAnnotations.union;
import static org.onlab.onos.net.DefaultAnnotations.merge;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.LinkKey.linkKey;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
......@@ -120,7 +121,7 @@ public class SimpleLinkStore
@Override
public Link getLink(ConnectPoint src, ConnectPoint dst) {
return links.get(new LinkKey(src, dst));
return links.get(linkKey(src, dst));
}
@Override
......@@ -148,7 +149,7 @@ public class SimpleLinkStore
@Override
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
LinkKey key = linkKey(linkDescription);
ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
synchronized (descs) {
......@@ -225,7 +226,7 @@ public class SimpleLinkStore
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
final LinkKey key = new LinkKey(src, dst);
final LinkKey key = linkKey(src, dst);
ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
synchronized (descs) {
Link link = links.remove(key);
......
......@@ -136,8 +136,8 @@ public class SimpleLinkStoreTest {
assertEquals("initialy empty", 0,
Iterables.size(linkStore.getLinks()));
LinkKey linkId1 = new LinkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = new LinkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
LinkKey linkId1 = LinkKey.linkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = LinkKey.linkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
......@@ -148,7 +148,7 @@ public class SimpleLinkStoreTest {
Map<LinkKey, Link> links = new HashMap<>();
for (Link link : linkStore.getLinks()) {
links.put(new LinkKey(link.src(), link.dst()), link);
links.put(LinkKey.linkKey(link), link);
}
assertLink(linkId1, DIRECT, links.get(linkId1));
......@@ -157,9 +157,9 @@ public class SimpleLinkStoreTest {
@Test
public final void testGetDeviceEgressLinks() {
LinkKey linkId1 = new LinkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = new LinkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
LinkKey linkId1 = LinkKey.linkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = LinkKey.linkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
LinkKey linkId3 = LinkKey.linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
......@@ -180,9 +180,9 @@ public class SimpleLinkStoreTest {
@Test
public final void testGetDeviceIngressLinks() {
LinkKey linkId1 = new LinkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = new LinkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
LinkKey linkId1 = LinkKey.linkKey(new ConnectPoint(DID1, P1), new ConnectPoint(DID2, P2));
LinkKey linkId2 = LinkKey.linkKey(new ConnectPoint(DID2, P2), new ConnectPoint(DID1, P1));
LinkKey linkId3 = LinkKey.linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
......@@ -205,7 +205,7 @@ public class SimpleLinkStoreTest {
public final void testGetLink() {
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
LinkKey linkId1 = new LinkKey(src, dst);
LinkKey linkId1 = LinkKey.linkKey(src, dst);
putLink(linkId1, DIRECT);
......@@ -220,9 +220,9 @@ public class SimpleLinkStoreTest {
public final void testGetEgressLinks() {
final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
LinkKey linkId1 = new LinkKey(d1P1, d2P2);
LinkKey linkId2 = new LinkKey(d2P2, d1P1);
LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
LinkKey linkId1 = LinkKey.linkKey(d1P1, d2P2);
LinkKey linkId2 = LinkKey.linkKey(d2P2, d1P1);
LinkKey linkId3 = LinkKey.linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
......@@ -245,9 +245,9 @@ public class SimpleLinkStoreTest {
public final void testGetIngressLinks() {
final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
LinkKey linkId1 = new LinkKey(d1P1, d2P2);
LinkKey linkId2 = new LinkKey(d2P2, d1P1);
LinkKey linkId3 = new LinkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
LinkKey linkId1 = LinkKey.linkKey(d1P1, d2P2);
LinkKey linkId2 = LinkKey.linkKey(d2P2, d1P1);
LinkKey linkId3 = LinkKey.linkKey(new ConnectPoint(DID1, P2), new ConnectPoint(DID2, P3));
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
......@@ -349,8 +349,8 @@ public class SimpleLinkStoreTest {
public final void testRemoveLink() {
final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
LinkKey linkId1 = new LinkKey(d1P1, d2P2);
LinkKey linkId2 = new LinkKey(d2P2, d1P1);
LinkKey linkId1 = LinkKey.linkKey(d1P1, d2P2);
LinkKey linkId2 = LinkKey.linkKey(d2P2, d1P1);
putLink(linkId1, DIRECT, A1);
putLink(linkId2, DIRECT, A2);
......@@ -406,7 +406,7 @@ public class SimpleLinkStoreTest {
final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
final LinkKey linkId1 = new LinkKey(d1P1, d2P2);
final LinkKey linkId1 = LinkKey.linkKey(d1P1, d2P2);
final CountDownLatch addLatch = new CountDownLatch(1);
LinkStoreDelegate checkAdd = new LinkStoreDelegate() {
......