tom

Working on IO loop tests.

1 package org.onlab.nio; 1 package org.onlab.nio;
2 2
3 +import com.google.common.collect.Lists;
3 import org.onlab.util.Counter; 4 import org.onlab.util.Counter;
4 import org.slf4j.Logger; 5 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory; 6 import org.slf4j.LoggerFactory;
...@@ -23,6 +24,7 @@ import java.util.concurrent.TimeUnit; ...@@ -23,6 +24,7 @@ import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException; 24 import java.util.concurrent.TimeoutException;
24 25
25 import static java.lang.String.format; 26 import static java.lang.String.format;
27 +import static java.lang.System.currentTimeMillis;
26 import static java.lang.System.out; 28 import static java.lang.System.out;
27 import static org.onlab.nio.IOLoopTestServer.PORT; 29 import static org.onlab.nio.IOLoopTestServer.PORT;
28 import static org.onlab.util.Tools.delay; 30 import static org.onlab.util.Tools.delay;
...@@ -46,15 +48,18 @@ public class IOLoopTestClient { ...@@ -46,15 +48,18 @@ public class IOLoopTestClient {
46 48
47 Counter messages; 49 Counter messages;
48 Counter bytes; 50 Counter bytes;
51 + long latencyTotal = 0;
52 + long latencyCount = 0;
53 +
49 54
50 /** 55 /**
51 * Main entry point to launch the client. 56 * Main entry point to launch the client.
52 * 57 *
53 * @param args command-line arguments 58 * @param args command-line arguments
54 - * @throws IOException if unable to connect to server 59 + * @throws java.io.IOException if unable to connect to server
55 - * @throws InterruptedException if latch wait gets interrupted 60 + * @throws InterruptedException if latch wait gets interrupted
56 - * @throws ExecutionException if wait gets interrupted 61 + * @throws java.util.concurrent.ExecutionException if wait gets interrupted
57 - * @throws TimeoutException if timeout occurred while waiting for completion 62 + * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
58 */ 63 */
59 public static void main(String[] args) 64 public static void main(String[] args)
60 throws IOException, InterruptedException, ExecutionException, TimeoutException { 65 throws IOException, InterruptedException, ExecutionException, TimeoutException {
...@@ -95,7 +100,7 @@ public class IOLoopTestClient { ...@@ -95,7 +100,7 @@ public class IOLoopTestClient {
95 * @param mc message count to send per client 100 * @param mc message count to send per client
96 * @param ml message length in bytes 101 * @param ml message length in bytes
97 * @param port socket port 102 * @param port socket port
98 - * @throws IOException if unable to create IO loops 103 + * @throws java.io.IOException if unable to create IO loops
99 */ 104 */
100 public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException { 105 public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
101 this.ip = ip; 106 this.ip = ip;
...@@ -113,7 +118,7 @@ public class IOLoopTestClient { ...@@ -113,7 +118,7 @@ public class IOLoopTestClient {
113 /** 118 /**
114 * Starts the client workers. 119 * Starts the client workers.
115 * 120 *
116 - * @throws IOException if unable to open connection 121 + * @throws java.io.IOException if unable to open connection
117 */ 122 */
118 public void start() throws IOException { 123 public void start() throws IOException {
119 messages = new Counter(); 124 messages = new Counter();
...@@ -141,7 +146,7 @@ public class IOLoopTestClient { ...@@ -141,7 +146,7 @@ public class IOLoopTestClient {
141 * channel with the given IO loop. 146 * channel with the given IO loop.
142 * 147 *
143 * @param loop loop with which the channel should be registered 148 * @param loop loop with which the channel should be registered
144 - * @throws IOException if the socket could not be open or connected 149 + * @throws java.io.IOException if the socket could not be open or connected
145 */ 150 */
146 private void openConnection(CustomIOLoop loop) throws IOException { 151 private void openConnection(CustomIOLoop loop) throws IOException {
147 SocketAddress sa = new InetSocketAddress(ip, port); 152 SocketAddress sa = new InetSocketAddress(ip, port);
...@@ -156,15 +161,17 @@ public class IOLoopTestClient { ...@@ -156,15 +161,17 @@ public class IOLoopTestClient {
156 * Waits for the client workers to complete. 161 * Waits for the client workers to complete.
157 * 162 *
158 * @param secs timeout in seconds 163 * @param secs timeout in seconds
159 - * @throws ExecutionException if execution failed 164 + * @throws java.util.concurrent.ExecutionException if execution failed
160 - * @throws InterruptedException if interrupt occurred while waiting 165 + * @throws InterruptedException if interrupt occurred while waiting
161 - * @throws TimeoutException if timeout occurred 166 + * @throws java.util.concurrent.TimeoutException if timeout occurred
162 */ 167 */
163 public void await(int secs) throws InterruptedException, 168 public void await(int secs) throws InterruptedException,
164 ExecutionException, TimeoutException { 169 ExecutionException, TimeoutException {
165 for (CustomIOLoop l : iloops) { 170 for (CustomIOLoop l : iloops) {
166 if (l.worker.task != null) { 171 if (l.worker.task != null) {
167 l.worker.task.get(secs, TimeUnit.SECONDS); 172 l.worker.task.get(secs, TimeUnit.SECONDS);
173 + latencyTotal += l.latencyTotal;
174 + latencyCount += l.latencyCount;
168 } 175 }
169 } 176 }
170 messages.freeze(); 177 messages.freeze();
...@@ -176,10 +183,11 @@ public class IOLoopTestClient { ...@@ -176,10 +183,11 @@ public class IOLoopTestClient {
176 */ 183 */
177 public void report() { 184 public void report() {
178 DecimalFormat f = new DecimalFormat("#,##0"); 185 DecimalFormat f = new DecimalFormat("#,##0");
179 - out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs", 186 + out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency",
180 f.format(messages.total()), f.format(bytes.total()), 187 f.format(messages.total()), f.format(bytes.total()),
181 f.format(messages.throughput()), 188 f.format(messages.throughput()),
182 - f.format(bytes.throughput() / (1024 * msgLength)))); 189 + f.format(bytes.throughput() / (1024 * msgLength)),
190 + f.format(latencyTotal / latencyCount)));
183 } 191 }
184 192
185 193
...@@ -187,6 +195,9 @@ public class IOLoopTestClient { ...@@ -187,6 +195,9 @@ public class IOLoopTestClient {
187 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> { 195 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
188 196
189 Worker worker = new Worker(); 197 Worker worker = new Worker();
198 + long latencyTotal = 0;
199 + long latencyCount = 0;
200 +
190 201
191 public CustomIOLoop() throws IOException { 202 public CustomIOLoop() throws IOException {
192 super(500); 203 super(500);
...@@ -217,7 +228,12 @@ public class IOLoopTestClient { ...@@ -217,7 +228,12 @@ public class IOLoopTestClient {
217 228
218 @Override 229 @Override
219 protected void processMessages(List<TestMessage> messages, 230 protected void processMessages(List<TestMessage> messages,
220 - MessageStream<TestMessage> b) { 231 + MessageStream<TestMessage> stream) {
232 + for (TestMessage message : messages) {
233 + // TODO: summarize latency data better
234 + latencyTotal += currentTimeMillis() - message.requestorTime();
235 + latencyCount++;
236 + }
221 worker.release(messages.size()); 237 worker.release(messages.size());
222 } 238 }
223 239
...@@ -239,15 +255,15 @@ public class IOLoopTestClient { ...@@ -239,15 +255,15 @@ public class IOLoopTestClient {
239 private static final int BATCH_SIZE = 1000; 255 private static final int BATCH_SIZE = 1000;
240 private static final int PERMITS = 2 * BATCH_SIZE; 256 private static final int PERMITS = 2 * BATCH_SIZE;
241 257
242 - private TestMessageStream b; 258 + private TestMessageStream stream;
243 private FutureTask<Worker> task; 259 private FutureTask<Worker> task;
244 260
245 // Stuff to throttle pump 261 // Stuff to throttle pump
246 private final Semaphore semaphore = new Semaphore(PERMITS); 262 private final Semaphore semaphore = new Semaphore(PERMITS);
247 private int msgWritten; 263 private int msgWritten;
248 264
249 - void pump(TestMessageStream b) { 265 + void pump(TestMessageStream stream) {
250 - this.b = b; 266 + this.stream = stream;
251 task = new FutureTask<>(this, this); 267 task = new FutureTask<>(this, this);
252 wpool.execute(task); 268 wpool.execute(task);
253 } 269 }
...@@ -257,18 +273,15 @@ public class IOLoopTestClient { ...@@ -257,18 +273,15 @@ public class IOLoopTestClient {
257 try { 273 try {
258 log.info("Worker started..."); 274 log.info("Worker started...");
259 275
260 - List<TestMessage> batch = new ArrayList<>();
261 - for (int i = 0; i < BATCH_SIZE; i++) {
262 - batch.add(new TestMessage(msgLength));
263 - }
264 -
265 while (msgWritten < msgCount) { 276 while (msgWritten < msgCount) {
266 - msgWritten += writeBatch(b, batch); 277 + int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
278 + writeBatch(size);
279 + msgWritten += size;
267 } 280 }
268 281
269 // Now try to get all the permits back before sending poison pill 282 // Now try to get all the permits back before sending poison pill
270 semaphore.acquireUninterruptibly(PERMITS); 283 semaphore.acquireUninterruptibly(PERMITS);
271 - b.close(); 284 + stream.close();
272 285
273 log.info("Worker done..."); 286 log.info("Worker done...");
274 287
...@@ -278,18 +291,15 @@ public class IOLoopTestClient { ...@@ -278,18 +291,15 @@ public class IOLoopTestClient {
278 } 291 }
279 292
280 293
281 - private int writeBatch(TestMessageStream b, List<TestMessage> batch) 294 + private void writeBatch(int size) throws IOException {
282 - throws IOException { 295 + // Build a batch of messages
283 - int count = Math.min(BATCH_SIZE, msgCount - msgWritten); 296 + List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
284 - acquire(count); 297 + for (int i = 0; i < size; i++) {
285 - if (count == BATCH_SIZE) { 298 + batch.add(new TestMessage(msgLength, currentTimeMillis(), 0,
286 - b.write(batch); 299 + stream.padding()));
287 - } else {
288 - for (int i = 0; i < count; i++) {
289 - b.write(batch.get(i));
290 - }
291 } 300 }
292 - return count; 301 + acquire(size);
302 + stream.write(batch);
293 } 303 }
294 304
295 305
......
1 package org.onlab.nio; 1 package org.onlab.nio;
2 2
3 +import com.google.common.collect.Lists;
3 import org.onlab.util.Counter; 4 import org.onlab.util.Counter;
4 import org.slf4j.Logger; 5 import org.slf4j.Logger;
5 import org.slf4j.LoggerFactory; 6 import org.slf4j.LoggerFactory;
...@@ -19,8 +20,9 @@ import java.util.concurrent.ExecutorService; ...@@ -19,8 +20,9 @@ import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors; 20 import java.util.concurrent.Executors;
20 21
21 import static java.lang.String.format; 22 import static java.lang.String.format;
23 +import static java.lang.System.currentTimeMillis;
22 import static java.lang.System.out; 24 import static java.lang.System.out;
23 -import static org.onlab.junit.TestTools.delay; 25 +import static org.onlab.util.Tools.delay;
24 import static org.onlab.util.Tools.namedThreads; 26 import static org.onlab.util.Tools.namedThreads;
25 27
26 /** 28 /**
...@@ -58,7 +60,7 @@ public class IOLoopTestServer { ...@@ -58,7 +60,7 @@ public class IOLoopTestServer {
58 * Main entry point to launch the server. 60 * Main entry point to launch the server.
59 * 61 *
60 * @param args command-line arguments 62 * @param args command-line arguments
61 - * @throws IOException if unable to crate IO loops 63 + * @throws java.io.IOException if unable to crate IO loops
62 */ 64 */
63 public static void main(String[] args) throws IOException { 65 public static void main(String[] args) throws IOException {
64 startStandalone(args); 66 startStandalone(args);
...@@ -94,7 +96,7 @@ public class IOLoopTestServer { ...@@ -94,7 +96,7 @@ public class IOLoopTestServer {
94 * @param wc worker count 96 * @param wc worker count
95 * @param ml message length in bytes 97 * @param ml message length in bytes
96 * @param port listen port 98 * @param port listen port
97 - * @throws IOException if unable to create IO loops 99 + * @throws java.io.IOException if unable to create IO loops
98 */ 100 */
99 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException { 101 public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
100 this.workerCount = wc; 102 this.workerCount = wc;
...@@ -199,11 +201,20 @@ public class IOLoopTestServer { ...@@ -199,11 +201,20 @@ public class IOLoopTestServer {
199 protected void processMessages(List<TestMessage> messages, 201 protected void processMessages(List<TestMessage> messages,
200 MessageStream<TestMessage> stream) { 202 MessageStream<TestMessage> stream) {
201 try { 203 try {
202 - stream.write(messages); 204 + stream.write(createResponses(messages));
203 } catch (IOException e) { 205 } catch (IOException e) {
204 log.error("Unable to echo messages", e); 206 log.error("Unable to echo messages", e);
205 } 207 }
206 } 208 }
209 +
210 + private List<TestMessage> createResponses(List<TestMessage> messages) {
211 + List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
212 + for (TestMessage message : messages) {
213 + responses.add(new TestMessage(message.length(), message.requestorTime(),
214 + currentTimeMillis(), message.padding()));
215 + }
216 + return responses;
217 + }
207 } 218 }
208 219
209 // Loop for accepting client connections 220 // Loop for accepting client connections
......
...@@ -23,11 +23,10 @@ import static org.junit.Assert.assertNull; ...@@ -23,11 +23,10 @@ import static org.junit.Assert.assertNull;
23 */ 23 */
24 public class MessageStreamTest { 24 public class MessageStreamTest {
25 25
26 - private static final int SIZE = 16; 26 + private static final int SIZE = 64;
27 - private static final TestMessage MESSAGE = new TestMessage(SIZE);
28 -
29 private static final int BIG_SIZE = 32 * 1024; 27 private static final int BIG_SIZE = 32 * 1024;
30 - private static final TestMessage BIG_MESSAGE = new TestMessage(BIG_SIZE); 28 +
29 + private TestMessage message;
31 30
32 private TestIOLoop loop; 31 private TestIOLoop loop;
33 private TestByteChannel channel; 32 private TestByteChannel channel;
...@@ -41,6 +40,8 @@ public class MessageStreamTest { ...@@ -41,6 +40,8 @@ public class MessageStreamTest {
41 key = new TestKey(channel); 40 key = new TestKey(channel);
42 stream = loop.createStream(channel); 41 stream = loop.createStream(channel);
43 stream.setKey(key); 42 stream.setKey(key);
43 + stream.setNonStrict();
44 + message = new TestMessage(SIZE, 0, 0, stream.padding());
44 } 45 }
45 46
46 @After 47 @After
...@@ -68,11 +69,13 @@ public class MessageStreamTest { ...@@ -68,11 +69,13 @@ public class MessageStreamTest {
68 public void bufferGrowth() throws IOException { 69 public void bufferGrowth() throws IOException {
69 // Create a stream for big messages and test the growth. 70 // Create a stream for big messages and test the growth.
70 stream = new TestMessageStream(BIG_SIZE, channel, loop); 71 stream = new TestMessageStream(BIG_SIZE, channel, loop);
71 - stream.write(BIG_MESSAGE); 72 + TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding());
72 - stream.write(BIG_MESSAGE); 73 +
73 - stream.write(BIG_MESSAGE); 74 + stream.write(bigMessage);
74 - stream.write(BIG_MESSAGE); 75 + stream.write(bigMessage);
75 - stream.write(BIG_MESSAGE); 76 + stream.write(bigMessage);
77 + stream.write(bigMessage);
78 + stream.write(bigMessage);
76 } 79 }
77 80
78 @Test 81 @Test
...@@ -102,25 +105,25 @@ public class MessageStreamTest { ...@@ -102,25 +105,25 @@ public class MessageStreamTest {
102 validate(false, false, 0, 0); 105 validate(false, false, 0, 0);
103 106
104 // First write is immediate... 107 // First write is immediate...
105 - stream.write(MESSAGE); 108 + stream.write(message);
106 validate(false, false, 0, SIZE); 109 validate(false, false, 0, SIZE);
107 110
108 // Second and third get buffered... 111 // Second and third get buffered...
109 - stream.write(MESSAGE); 112 + stream.write(message);
110 validate(false, true, 0, SIZE); 113 validate(false, true, 0, SIZE);
111 - stream.write(MESSAGE); 114 + stream.write(message);
112 validate(false, true, 0, SIZE); 115 validate(false, true, 0, SIZE);
113 116
114 // Reset write, which will flush if needed; the next write is again buffered 117 // Reset write, which will flush if needed; the next write is again buffered
115 stream.flushIfWriteNotPending(); 118 stream.flushIfWriteNotPending();
116 validate(false, false, 0, SIZE * 3); 119 validate(false, false, 0, SIZE * 3);
117 - stream.write(MESSAGE); 120 + stream.write(message);
118 validate(false, true, 0, SIZE * 3); 121 validate(false, true, 0, SIZE * 3);
119 122
120 // Select reset, which will flush if needed; the next write is again buffered 123 // Select reset, which will flush if needed; the next write is again buffered
121 stream.flushIfPossible(); 124 stream.flushIfPossible();
122 validate(false, false, 0, SIZE * 4); 125 validate(false, false, 0, SIZE * 4);
123 - stream.write(MESSAGE); 126 + stream.write(message);
124 validate(false, true, 0, SIZE * 4); 127 validate(false, true, 0, SIZE * 4);
125 stream.flush(); 128 stream.flush();
126 validate(false, true, 0, SIZE * 4); 129 validate(false, true, 0, SIZE * 4);
...@@ -132,10 +135,10 @@ public class MessageStreamTest { ...@@ -132,10 +135,10 @@ public class MessageStreamTest {
132 135
133 // First write is immediate... 136 // First write is immediate...
134 List<TestMessage> messages = new ArrayList<>(); 137 List<TestMessage> messages = new ArrayList<>();
135 - messages.add(MESSAGE); 138 + messages.add(message);
136 - messages.add(MESSAGE); 139 + messages.add(message);
137 - messages.add(MESSAGE); 140 + messages.add(message);
138 - messages.add(MESSAGE); 141 + messages.add(message);
139 142
140 stream.write(messages); 143 stream.write(messages);
141 validate(false, false, 0, SIZE * 4); 144 validate(false, false, 0, SIZE * 4);
...@@ -152,14 +155,14 @@ public class MessageStreamTest { ...@@ -152,14 +155,14 @@ public class MessageStreamTest {
152 validate(false, false, 0, 0); 155 validate(false, false, 0, 0);
153 156
154 // First write is immediate... 157 // First write is immediate...
155 - stream.write(MESSAGE); 158 + stream.write(message);
156 validate(false, false, 0, SIZE); 159 validate(false, false, 0, SIZE);
157 160
158 // Tell test channel to accept only half. 161 // Tell test channel to accept only half.
159 channel.bytesToWrite = SIZE / 2; 162 channel.bytesToWrite = SIZE / 2;
160 163
161 // Second and third get buffered... 164 // Second and third get buffered...
162 - stream.write(MESSAGE); 165 + stream.write(message);
163 validate(false, true, 0, SIZE); 166 validate(false, true, 0, SIZE);
164 stream.flushIfPossible(); 167 stream.flushIfPossible();
165 validate(true, true, 0, SIZE + SIZE / 2); 168 validate(true, true, 0, SIZE + SIZE / 2);
...@@ -170,14 +173,14 @@ public class MessageStreamTest { ...@@ -170,14 +173,14 @@ public class MessageStreamTest {
170 validate(false, false, 0, 0); 173 validate(false, false, 0, 0);
171 174
172 // First write is immediate... 175 // First write is immediate...
173 - stream.write(MESSAGE); 176 + stream.write(message);
174 validate(false, false, 0, SIZE); 177 validate(false, false, 0, SIZE);
175 178
176 // Tell test channel to accept only half. 179 // Tell test channel to accept only half.
177 channel.bytesToWrite = SIZE / 2; 180 channel.bytesToWrite = SIZE / 2;
178 181
179 // Second and third get buffered... 182 // Second and third get buffered...
180 - stream.write(MESSAGE); 183 + stream.write(message);
181 validate(false, true, 0, SIZE); 184 validate(false, true, 0, SIZE);
182 stream.flushIfWriteNotPending(); 185 stream.flushIfWriteNotPending();
183 validate(true, true, 0, SIZE + SIZE / 2); 186 validate(true, true, 0, SIZE + SIZE / 2);
...@@ -190,7 +193,7 @@ public class MessageStreamTest { ...@@ -190,7 +193,7 @@ public class MessageStreamTest {
190 assertEquals(1, messages.size()); 193 assertEquals(1, messages.size());
191 validate(false, false, SIZE + 4, 0); 194 validate(false, false, SIZE + 4, 0);
192 195
193 - stream.write(MESSAGE); 196 + stream.write(message);
194 validate(false, false, SIZE + 4, SIZE); 197 validate(false, false, SIZE + 4, SIZE);
195 198
196 channel.bytesToRead = SIZE - 4; 199 channel.bytesToRead = SIZE - 4;
......
1 package org.onlab.nio; 1 package org.onlab.nio;
2 2
3 +import static com.google.common.base.Preconditions.checkNotNull;
4 +
3 /** 5 /**
4 - * Fixed-length message. 6 + * Test message for measuring rate and round-trip latency.
5 */ 7 */
6 public class TestMessage extends AbstractMessage { 8 public class TestMessage extends AbstractMessage {
7 9
8 - private final byte[] data; 10 + private final byte[] padding;
11 +
12 + private final long requestorTime;
13 + private final long responderTime;
9 14
10 /** 15 /**
11 - * Creates a new message with the specified length. 16 + * Creates a new message with the specified data.
12 * 17 *
13 - * @param length message length 18 + * @param requestorTime requester time
19 + * @param responderTime responder time
20 + * @param padding message padding
14 */ 21 */
15 - public TestMessage(int length) { 22 + TestMessage(int length, long requestorTime, long responderTime, byte[] padding) {
16 this.length = length; 23 this.length = length;
17 - data = new byte[length]; 24 + this.requestorTime = requestorTime;
25 + this.responderTime = responderTime;
26 + this.padding = checkNotNull(padding, "Padding cannot be null");
18 } 27 }
19 28
20 - /** 29 + public long requestorTime() {
21 - * Creates a new message with the specified data. 30 + return requestorTime;
22 - *
23 - * @param data message data
24 - */
25 - TestMessage(byte[] data) {
26 - this.length = data.length;
27 - this.data = data;
28 } 31 }
29 32
30 - /** 33 + public long responderTime() {
31 - * Gets the backing byte array data. 34 + return responderTime;
32 - * 35 + }
33 - * @return backing byte array 36 +
34 - */ 37 + public byte[] padding() {
35 - public byte[] data() { 38 + return padding;
36 - return data;
37 } 39 }
38 40
39 } 41 }
......
...@@ -3,53 +3,72 @@ package org.onlab.nio; ...@@ -3,53 +3,72 @@ package org.onlab.nio;
3 import java.nio.ByteBuffer; 3 import java.nio.ByteBuffer;
4 import java.nio.channels.ByteChannel; 4 import java.nio.channels.ByteChannel;
5 5
6 +import static com.google.common.base.Preconditions.checkArgument;
7 +import static com.google.common.base.Preconditions.checkState;
8 +
6 /** 9 /**
7 * Fixed-length message transfer buffer. 10 * Fixed-length message transfer buffer.
8 */ 11 */
9 public class TestMessageStream extends MessageStream<TestMessage> { 12 public class TestMessageStream extends MessageStream<TestMessage> {
10 13
11 private static final String E_WRONG_LEN = "Illegal message length: "; 14 private static final String E_WRONG_LEN = "Illegal message length: ";
15 + private static final long START_TAG = 0xfeedcafedeaddeedL;
16 + private static final long END_TAG = 0xbeadcafedeaddeedL;
17 + private static final int META_LENGTH = 40;
12 18
13 private final int length; 19 private final int length;
20 + private boolean isStrict = true;
14 21
15 - /** 22 + public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) {
16 - * Create a new buffer for transferring messages of the specified length.
17 - *
18 - * @param length message length
19 - * @param ch backing channel
20 - * @param loop driver loop
21 - */
22 - public TestMessageStream(int length, ByteChannel ch,
23 - IOLoop<TestMessage, ?> loop) {
24 super(loop, ch, 64 * 1024, 500); 23 super(loop, ch, 64 * 1024, 500);
24 + checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40");
25 this.length = length; 25 this.length = length;
26 } 26 }
27 27
28 + void setNonStrict() {
29 + isStrict = false;
30 + }
31 +
28 @Override 32 @Override
29 protected TestMessage read(ByteBuffer rb) { 33 protected TestMessage read(ByteBuffer rb) {
30 if (rb.remaining() < length) { 34 if (rb.remaining() < length) {
31 return null; 35 return null;
32 } 36 }
33 - TestMessage message = new TestMessage(length); 37 +
34 - rb.get(message.data()); 38 + long startTag = rb.getLong();
35 - return message; 39 + if (isStrict) {
40 + checkState(startTag == START_TAG, "Incorrect message start");
41 + }
42 +
43 + long size = rb.getLong();
44 + long requestorTime = rb.getLong();
45 + long responderTime = rb.getLong();
46 + byte[] padding = padding();
47 + rb.get(padding);
48 +
49 + long endTag = rb.getLong();
50 + if (isStrict) {
51 + checkState(endTag == END_TAG, "Incorrect message end");
52 + }
53 +
54 + return new TestMessage((int) size, requestorTime, responderTime, padding);
36 } 55 }
37 56
38 - /**
39 - * {@inheritDoc}
40 - * <p/>
41 - * This implementation enforces the message length against the buffer
42 - * supported length.
43 - *
44 - * @throws IllegalArgumentException if message size does not match the
45 - * supported buffer size
46 - */
47 @Override 57 @Override
48 protected void write(TestMessage message, ByteBuffer wb) { 58 protected void write(TestMessage message, ByteBuffer wb) {
49 if (message.length() != length) { 59 if (message.length() != length) {
50 throw new IllegalArgumentException(E_WRONG_LEN + message.length()); 60 throw new IllegalArgumentException(E_WRONG_LEN + message.length());
51 } 61 }
52 - wb.put(message.data()); 62 +
63 + wb.putLong(START_TAG);
64 + wb.putLong(message.length());
65 + wb.putLong(message.requestorTime());
66 + wb.putLong(message.responderTime());
67 + wb.put(message.padding(), 0, length - META_LENGTH);
68 + wb.putLong(END_TAG);
53 } 69 }
54 70
71 + public byte[] padding() {
72 + return new byte[length - META_LENGTH];
73 + }
55 } 74 }
......