Madan Jampani
Committed by Gerrit Code Review

Fix Netty messaging unit test race condition

Change-Id: Ic0c9be9e2c180dae5a9cd184625f4ec6394356af
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.store.cluster.messaging.impl; 16 package org.onosproject.store.cluster.messaging.impl;
17 17
18 import java.util.Arrays; 18 import java.util.Arrays;
19 +import java.util.UUID;
19 import java.util.concurrent.CompletableFuture; 20 import java.util.concurrent.CompletableFuture;
20 import java.util.concurrent.CountDownLatch; 21 import java.util.concurrent.CountDownLatch;
21 import java.util.concurrent.ExecutorService; 22 import java.util.concurrent.ExecutorService;
...@@ -89,6 +90,14 @@ public class NettyMessagingManagerTest { ...@@ -89,6 +90,14 @@ public class NettyMessagingManagerTest {
89 netty2.activate(); 90 netty2.activate();
90 } 91 }
91 92
93 + /**
94 + * Returns a random String to be used as a test subject.
95 + * @return string
96 + */
97 + private String nextSubject() {
98 + return UUID.randomUUID().toString();
99 + }
100 +
92 @After 101 @After
93 public void tearDown() throws Exception { 102 public void tearDown() throws Exception {
94 if (netty1 != null) { 103 if (netty1 != null) {
...@@ -102,8 +111,9 @@ public class NettyMessagingManagerTest { ...@@ -102,8 +111,9 @@ public class NettyMessagingManagerTest {
102 111
103 @Test 112 @Test
104 public void testSendAsync() { 113 public void testSendAsync() {
114 + String subject = nextSubject();
105 CountDownLatch latch1 = new CountDownLatch(1); 115 CountDownLatch latch1 = new CountDownLatch(1);
106 - CompletableFuture<Void> response = netty1.sendAsync(ep2, "test-subject", "hello world".getBytes()); 116 + CompletableFuture<Void> response = netty1.sendAsync(ep2, subject, "hello world".getBytes());
107 response.whenComplete((r, e) -> { 117 response.whenComplete((r, e) -> {
108 assertNull(e); 118 assertNull(e);
109 latch1.countDown(); 119 latch1.countDown();
...@@ -111,7 +121,7 @@ public class NettyMessagingManagerTest { ...@@ -111,7 +121,7 @@ public class NettyMessagingManagerTest {
111 Uninterruptibles.awaitUninterruptibly(latch1); 121 Uninterruptibles.awaitUninterruptibly(latch1);
112 122
113 CountDownLatch latch2 = new CountDownLatch(1); 123 CountDownLatch latch2 = new CountDownLatch(1);
114 - response = netty1.sendAsync(invalidEndPoint, "test-subject", "hello world".getBytes()); 124 + response = netty1.sendAsync(invalidEndPoint, subject, "hello world".getBytes());
115 response.whenComplete((r, e) -> { 125 response.whenComplete((r, e) -> {
116 assertNotNull(e); 126 assertNotNull(e);
117 latch2.countDown(); 127 latch2.countDown();
...@@ -121,6 +131,7 @@ public class NettyMessagingManagerTest { ...@@ -121,6 +131,7 @@ public class NettyMessagingManagerTest {
121 131
122 @Test 132 @Test
123 public void testSendAndReceive() { 133 public void testSendAndReceive() {
134 + String subject = nextSubject();
124 AtomicBoolean handlerInvoked = new AtomicBoolean(false); 135 AtomicBoolean handlerInvoked = new AtomicBoolean(false);
125 AtomicReference<byte[]> request = new AtomicReference<>(); 136 AtomicReference<byte[]> request = new AtomicReference<>();
126 AtomicReference<Endpoint> sender = new AtomicReference<>(); 137 AtomicReference<Endpoint> sender = new AtomicReference<>();
...@@ -131,9 +142,9 @@ public class NettyMessagingManagerTest { ...@@ -131,9 +142,9 @@ public class NettyMessagingManagerTest {
131 request.set(data); 142 request.set(data);
132 return "hello there".getBytes(); 143 return "hello there".getBytes();
133 }; 144 };
134 - netty2.registerHandler("test-subject", handler, MoreExecutors.directExecutor()); 145 + netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
135 146
136 - CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, "test-subject", "hello world".getBytes()); 147 + CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, subject, "hello world".getBytes());
137 assertTrue(Arrays.equals("hello there".getBytes(), response.join())); 148 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
138 assertTrue(handlerInvoked.get()); 149 assertTrue(handlerInvoked.get());
139 assertTrue(Arrays.equals(request.get(), "hello world".getBytes())); 150 assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
...@@ -146,6 +157,7 @@ public class NettyMessagingManagerTest { ...@@ -146,6 +157,7 @@ public class NettyMessagingManagerTest {
146 */ 157 */
147 @Test 158 @Test
148 public void testSendAndReceiveWithExecutor() { 159 public void testSendAndReceiveWithExecutor() {
160 + String subject = nextSubject();
149 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread")); 161 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
150 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread")); 162 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
151 AtomicReference<String> handlerThreadName = new AtomicReference<>(); 163 AtomicReference<String> handlerThreadName = new AtomicReference<>();
...@@ -163,10 +175,10 @@ public class NettyMessagingManagerTest { ...@@ -163,10 +175,10 @@ public class NettyMessagingManagerTest {
163 } 175 }
164 return "hello there".getBytes(); 176 return "hello there".getBytes();
165 }; 177 };
166 - netty2.registerHandler("test-subject", handler, handlerExecutor); 178 + netty2.registerHandler(subject, handler, handlerExecutor);
167 179
168 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, 180 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
169 - "test-subject", 181 + subject,
170 "hello world".getBytes(), 182 "hello world".getBytes(),
171 completionExecutor); 183 completionExecutor);
172 response.whenComplete((r, e) -> { 184 response.whenComplete((r, e) -> {
......