Madan Jampani
Committed by Gerrit Code Review

ONOS-2495: Workaround for the fact that kryo deserialization is not thread safe

Change-Id: Id1bd3334f9a5c122984d08f97dbbbf622b27cf33
...@@ -32,7 +32,6 @@ import org.onosproject.store.service.Serializer; ...@@ -32,7 +32,6 @@ import org.onosproject.store.service.Serializer;
32 import org.onosproject.store.service.Versioned; 32 import org.onosproject.store.service.Versioned;
33 import org.slf4j.Logger; 33 import org.slf4j.Logger;
34 34
35 -
36 import java.util.Collection; 35 import java.util.Collection;
37 import java.util.Map; 36 import java.util.Map;
38 import java.util.Map.Entry; 37 import java.util.Map.Entry;
...@@ -122,10 +121,17 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V ...@@ -122,10 +121,17 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
122 this.purgeOnUninstall = purgeOnUninstall; 121 this.purgeOnUninstall = purgeOnUninstall;
123 this.database.registerConsumer(update -> { 122 this.database.registerConsumer(update -> {
124 SharedExecutors.getSingleThreadExecutor().execute(() -> { 123 SharedExecutors.getSingleThreadExecutor().execute(() -> {
124 + if (listeners.isEmpty()) {
125 + return;
126 + }
127 + try {
125 if (update.target() == MAP_UPDATE) { 128 if (update.target() == MAP_UPDATE) {
126 Result<UpdateResult<String, byte[]>> result = update.output(); 129 Result<UpdateResult<String, byte[]>> result = update.output();
127 if (result.success() && result.value().mapName().equals(name)) { 130 if (result.success() && result.value().mapName().equals(name)) {
128 - MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent(); 131 + MapEvent<K, V> mapEvent = result.value()
132 + .<K, V>map(this::dK,
133 + v -> serializer.decode(Tools.copyOf(v)))
134 + .toMapEvent();
129 notifyListeners(mapEvent); 135 notifyListeners(mapEvent);
130 } 136 }
131 } else if (update.target() == TX_COMMIT) { 137 } else if (update.target() == TX_COMMIT) {
...@@ -133,12 +139,18 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V ...@@ -133,12 +139,18 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
133 if (response.success()) { 139 if (response.success()) {
134 response.updates().forEach(u -> { 140 response.updates().forEach(u -> {
135 if (u.mapName().equals(name)) { 141 if (u.mapName().equals(name)) {
136 - MapEvent<K, V> mapEvent = u.<K, V>map(this::dK, serializer::decode).toMapEvent(); 142 + MapEvent<K, V> mapEvent =
143 + u.<K, V>map(this::dK,
144 + v -> serializer.decode(Tools.copyOf(v)))
145 + .toMapEvent();
137 notifyListeners(mapEvent); 146 notifyListeners(mapEvent);
138 } 147 }
139 }); 148 });
140 } 149 }
141 } 150 }
151 + } catch (Exception e) {
152 + log.warn("Error notifying listeners", e);
153 + }
142 }); 154 });
143 }); 155 });
144 this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled); 156 this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
......
...@@ -190,6 +190,16 @@ public abstract class Tools { ...@@ -190,6 +190,16 @@ public abstract class Tools {
190 } 190 }
191 191
192 /** 192 /**
193 + * Returns a copy of the input byte array.
194 + *
195 + * @param original input
196 + * @return copy of original
197 + */
198 + public static byte[] copyOf(byte[] original) {
199 + return Arrays.copyOf(original, original.length);
200 + }
201 +
202 + /**
193 * Get property as a string value. 203 * Get property as a string value.
194 * 204 *
195 * @param properties properties to be looked up 205 * @param properties properties to be looked up
......