Thomas Vachuska

Fixed an issue where consistent map exception in distributed packet store would …

…prevent apps/components from properly starting.

Fixed the fast and net-pingall.xml scenarios.

Change-Id: Ie5712c5c134bb81181dd2bdb307da5ec13851d45
...@@ -41,6 +41,7 @@ import org.onosproject.store.cluster.messaging.MessageSubject; ...@@ -41,6 +41,7 @@ import org.onosproject.store.cluster.messaging.MessageSubject;
41 import org.onosproject.store.serializers.KryoNamespaces; 41 import org.onosproject.store.serializers.KryoNamespaces;
42 import org.onosproject.store.serializers.KryoSerializer; 42 import org.onosproject.store.serializers.KryoSerializer;
43 import org.onosproject.store.service.ConsistentMap; 43 import org.onosproject.store.service.ConsistentMap;
44 +import org.onosproject.store.service.ConsistentMapException;
44 import org.onosproject.store.service.Serializer; 45 import org.onosproject.store.service.Serializer;
45 import org.onosproject.store.service.StorageService; 46 import org.onosproject.store.service.StorageService;
46 import org.slf4j.Logger; 47 import org.slf4j.Logger;
...@@ -52,6 +53,7 @@ import java.util.concurrent.Executors; ...@@ -52,6 +53,7 @@ import java.util.concurrent.Executors;
52 import java.util.concurrent.atomic.AtomicBoolean; 53 import java.util.concurrent.atomic.AtomicBoolean;
53 54
54 import static org.onlab.util.Tools.groupedThreads; 55 import static org.onlab.util.Tools.groupedThreads;
56 +import static org.onlab.util.Tools.retryable;
55 import static org.slf4j.LoggerFactory.getLogger; 57 import static org.slf4j.LoggerFactory.getLogger;
56 58
57 /** 59 /**
...@@ -66,6 +68,8 @@ public class DistributedPacketStore ...@@ -66,6 +68,8 @@ public class DistributedPacketStore
66 68
67 private final Logger log = getLogger(getClass()); 69 private final Logger log = getLogger(getClass());
68 70
71 + private static final int MAX_BACKOFF = 10;
72 +
69 // TODO: make this configurable. 73 // TODO: make this configurable.
70 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4; 74 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
71 75
...@@ -159,11 +163,11 @@ public class DistributedPacketStore ...@@ -159,11 +163,11 @@ public class DistributedPacketStore
159 return tracker.requests(); 163 return tracker.requests();
160 } 164 }
161 165
162 - private class PacketRequestTracker { 166 + private final class PacketRequestTracker {
163 167
164 private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests; 168 private ConsistentMap<TrafficSelector, Set<PacketRequest>> requests;
165 169
166 - public PacketRequestTracker() { 170 + private PacketRequestTracker() {
167 requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder() 171 requests = storageService.<TrafficSelector, Set<PacketRequest>>consistentMapBuilder()
168 .withName("onos-packet-requests") 172 .withName("onos-packet-requests")
169 .withPartitionsDisabled() 173 .withPartitionsDisabled()
...@@ -171,7 +175,17 @@ public class DistributedPacketStore ...@@ -171,7 +175,17 @@ public class DistributedPacketStore
171 .build(); 175 .build();
172 } 176 }
173 177
174 - public void add(PacketRequest request) { 178 + private void add(PacketRequest request) {
179 + AtomicBoolean firstRequest =
180 + retryable(this::addInternal, ConsistentMapException.class,
181 + 3, MAX_BACKOFF).apply(request);
182 + if (firstRequest.get() && delegate != null) {
183 + // The instance that makes the first request will push to all devices
184 + delegate.requestPackets(request);
185 + }
186 + }
187 +
188 + private AtomicBoolean addInternal(PacketRequest request) {
175 AtomicBoolean firstRequest = new AtomicBoolean(false); 189 AtomicBoolean firstRequest = new AtomicBoolean(false);
176 requests.compute(request.selector(), (s, existingRequests) -> { 190 requests.compute(request.selector(), (s, existingRequests) -> {
177 if (existingRequests == null) { 191 if (existingRequests == null) {
...@@ -186,14 +200,20 @@ public class DistributedPacketStore ...@@ -186,14 +200,20 @@ public class DistributedPacketStore
186 return existingRequests; 200 return existingRequests;
187 } 201 }
188 }); 202 });
203 + return firstRequest;
204 + }
189 205
190 - if (firstRequest.get() && delegate != null) { 206 + private void remove(PacketRequest request) {
191 - // The instance that makes the first request will push to all devices 207 + AtomicBoolean removedLast =
192 - delegate.requestPackets(request); 208 + retryable(this::removeInternal, ConsistentMapException.class,
209 + 3, MAX_BACKOFF).apply(request);
210 + if (removedLast.get() && delegate != null) {
211 + // The instance that removes the last request will remove from all devices
212 + delegate.cancelPackets(request);
193 } 213 }
194 } 214 }
195 215
196 - public void remove(PacketRequest request) { 216 + private AtomicBoolean removeInternal(PacketRequest request) {
197 AtomicBoolean removedLast = new AtomicBoolean(false); 217 AtomicBoolean removedLast = new AtomicBoolean(false);
198 requests.computeIfPresent(request.selector(), (s, existingRequests) -> { 218 requests.computeIfPresent(request.selector(), (s, existingRequests) -> {
199 if (existingRequests.contains(request)) { 219 if (existingRequests.contains(request)) {
...@@ -209,15 +229,10 @@ public class DistributedPacketStore ...@@ -209,15 +229,10 @@ public class DistributedPacketStore
209 return existingRequests; 229 return existingRequests;
210 } 230 }
211 }); 231 });
212 - 232 + return removedLast;
213 - if (removedLast.get() && delegate != null) {
214 - // The instance that removes the last request will remove from all devices
215 - delegate.cancelPackets(request);
216 - }
217 -
218 } 233 }
219 234
220 - public List<PacketRequest> requests() { 235 + private List<PacketRequest> requests() {
221 List<PacketRequest> list = Lists.newArrayList(); 236 List<PacketRequest> list = Lists.newArrayList();
222 requests.values().forEach(v -> list.addAll(v.value())); 237 requests.values().forEach(v -> list.addAll(v.value()));
223 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue()); 238 list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
13 ~ See the License for the specific language governing permissions and 13 ~ See the License for the specific language governing permissions and
14 ~ limitations under the License. 14 ~ limitations under the License.
15 --> 15 -->
16 -<scenario name="smoke" description="ONOS smoke test"> 16 +<scenario name="fast" description="ONOS fast smoke test">
17 <import file="${ONOS_SCENARIOS}/prerequisites.xml"/> 17 <import file="${ONOS_SCENARIOS}/prerequisites.xml"/>
18 18
19 <import file="${ONOS_SCENARIOS}/setup.xml"/> 19 <import file="${ONOS_SCENARIOS}/setup.xml"/>
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
22 exec="onos-check-apps ${OC1} drivers,openflow,proxyarp,fwd includes"/> 22 exec="onos-check-apps ${OC1} drivers,openflow,proxyarp,fwd includes"/>
23 23
24 <!-- TODO: take this out when initial pingall sweep is 100% --> 24 <!-- TODO: take this out when initial pingall sweep is 100% -->
25 - <step name="Initial-Ping-All" requires="Check-Apps" 25 + <step name="Initial-Ping-All" requires="Check-Apps" unless="${ONOS_RF_BUG_FIXED}"
26 exec="onos-mininet sendAndExpect py net.pingAll(1) --expect received"/> 26 exec="onos-mininet sendAndExpect py net.pingAll(1) --expect received"/>
27 27
28 <step name="Ping-All-And-Verify" requires="Check-Apps,Initial-Ping-All" 28 <step name="Ping-All-And-Verify" requires="Check-Apps,Initial-Ping-All"
......