tom

Starting work on I/O loop.

1 +<?xml version="1.0" encoding="UTF-8"?>
2 +<project xmlns="http://maven.apache.org/POM/4.0.0"
3 + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
5 + <modelVersion>4.0.0</modelVersion>
6 +
7 + <parent>
8 + <groupId>org.onlab.onos</groupId>
9 + <artifactId>onlab-utils</artifactId>
10 + <version>1.0.0-SNAPSHOT</version>
11 + <relativePath>../pom.xml</relativePath>
12 + </parent>
13 +
14 + <artifactId>onlab-nio</artifactId>
15 + <packaging>bundle</packaging>
16 +
17 + <description>Fast network I/O using Java NIO</description>
18 +
19 + <dependencies>
20 + <dependency>
21 + <groupId>com.google.guava</groupId>
22 + <artifactId>guava-testlib</artifactId>
23 + <scope>test</scope>
24 + </dependency>
25 + </dependencies>
26 +
27 +</project>
1 +package org.onlab.nio;
2 +
3 +import java.io.IOException;
4 +import java.net.SocketAddress;
5 +import java.net.StandardSocketOptions;
6 +import java.nio.channels.SelectionKey;
7 +import java.nio.channels.ServerSocketChannel;
8 +import java.util.Iterator;
9 +
10 +import static com.google.common.base.Preconditions.checkNotNull;
11 +
12 +/**
13 + * Selector loop derivative tailored to acceptConnection inbound connections.
14 + */
15 +public abstract class AcceptorLoop extends SelectorLoop {
16 +
17 + private SocketAddress listenAddress;
18 + private ServerSocketChannel socketChannel;
19 +
20 + /**
21 + * Creates an acceptor loop with the specified selection timeout and
22 + * accepting connections on the the given address.
23 + *
24 + * @param selectTimeout selection timeout; specified in millis
25 + * @param listenAddress socket address where to listen for connections
26 + * @throws IOException if the backing selector cannot be opened
27 + */
28 + public AcceptorLoop(long selectTimeout, SocketAddress listenAddress)
29 + throws IOException {
30 + super(selectTimeout);
31 + this.listenAddress = checkNotNull(this.listenAddress, "Address cannot be null");
32 + }
33 +
34 + /**
35 + * Hook to accept an inbound connection on the specified socket channel.
36 + *
37 + * @param channel socketChannel where an accept operation awaits
38 + * @throws IOException if the accept operation cannot be processed
39 + */
40 + protected abstract void acceptConnection(ServerSocketChannel channel) throws IOException;
41 +
42 + /**
43 + * Opens a new server socket channel configured in non-blocking mode and
44 + * bound to the loop's listen address.
45 + *
46 + * @throws IOException if unable to open or configure the socket channel
47 + */
48 + protected synchronized void openChannel() throws IOException {
49 + socketChannel = ServerSocketChannel.open();
50 + socketChannel.configureBlocking(false);
51 + socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
52 + socketChannel.register(selector, SelectionKey.OP_ACCEPT);
53 + socketChannel.bind(listenAddress);
54 + }
55 +
56 + /**
57 + * Closes the server socket channel.
58 + *
59 + * @throws IOException if unable to close the socketChannel
60 + */
61 + protected synchronized void closechannel() throws IOException {
62 + if (socketChannel != null) {
63 + socketChannel.close();
64 + socketChannel = null;
65 + }
66 + }
67 +
68 + @Override
69 + public void shutdown() {
70 + try {
71 + closechannel();
72 + } catch (IOException e) {
73 + log.warn("Unable to close the socketChannel", e);
74 + }
75 + super.shutdown();
76 + }
77 +
78 + @Override
79 + protected void loop() throws IOException {
80 + openChannel();
81 + notifyReady();
82 +
83 + // Keep looping until told otherwise.
84 + while (isRunning()) {
85 + // Attempt a selection; if no operations selected or if signalled
86 + // to shutdown, spin through.
87 + int count = selector.select(selectTimeout);
88 + if (count == 0 || !isRunning()) {
89 + continue;
90 + }
91 +
92 + // Iterate over all keys selected for an operation and process them.
93 + Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
94 + while (keys.hasNext()) {
95 + // Fetch the key and remove it from the pending list.
96 + SelectionKey key = keys.next();
97 + keys.remove();
98 +
99 + // If the key has a pending acceptConnection operation, process it.
100 + if (key.isAcceptable()) {
101 + acceptConnection((ServerSocketChannel) key.channel());
102 + }
103 + }
104 + }
105 + }
106 +
107 +}
108 +
1 +package org.onlab.nio;
2 +
3 +import org.slf4j.Logger;
4 +import org.slf4j.LoggerFactory;
5 +
6 +import java.io.IOException;
7 +import java.nio.channels.Selector;
8 +
9 +import static com.google.common.base.Preconditions.checkArgument;
10 +
11 +/**
12 + * Abstraction of an I/O processing loop based on an NIO selector.
13 + */
14 +public abstract class SelectorLoop implements Runnable {
15 +
16 + protected final Logger log = LoggerFactory.getLogger(getClass());
17 +
18 + /**
19 + * Selector used by this loop to pace the I/O operations.
20 + */
21 + protected final Selector selector;
22 +
23 + /**
24 + * Selection operations timeout; specified in millis.
25 + */
26 + protected long selectTimeout;
27 +
28 + /**
29 + * Retains the error that caused the loop to exit prematurely.
30 + */
31 + private Throwable error;
32 +
33 + // State indicator
34 + private enum State { STARTING, STARTED, STOPPING, STOPPED };
35 + private volatile State state = State.STOPPED;
36 +
37 + /**
38 + * Creates a new selector loop with the given selection timeout.
39 + *
40 + * @param selectTimeout selection timeout; specified in millis
41 + * @throws IOException if the backing selector cannot be opened
42 + */
43 + public SelectorLoop(long selectTimeout) throws IOException {
44 + checkArgument(selectTimeout > 0, "Timeout must be positive");
45 + this.selectTimeout = selectTimeout;
46 + this.selector = openSelector();
47 + }
48 +
49 + /**
50 + * Opens a new selector for the use by the loop.
51 + *
52 + * @return newly open selector
53 + * @throws IOException if the backing selector cannot be opened
54 + */
55 + protected Selector openSelector() throws IOException {
56 + return Selector.open();
57 + }
58 +
59 + /**
60 + * Indicates that the loop is marked to run.
61 + */
62 + protected boolean isRunning() {
63 + return state == State.STARTED || state == State.STARTING;
64 + }
65 +
66 + /**
67 + * Returns the error, if there was one, that caused the loop to terminate
68 + * prematurely.
69 + *
70 + * @return error or null if there was none
71 + */
72 + public Throwable getError() {
73 + return error;
74 + }
75 +
76 + /**
77 + * Contains the body of the I/O selector loop.
78 + *
79 + * @throws IOException if an error is encountered while selecting I/O
80 + */
81 + protected abstract void loop() throws IOException;
82 +
83 + @Override
84 + public void run() {
85 + error = null;
86 + state = State.STARTING;
87 + try {
88 + loop();
89 + } catch (Throwable e) {
90 + error = e;
91 + log.error("Loop aborted", e);
92 + }
93 + notifyDone();
94 + }
95 +
96 + /**
97 + * Notifies observers waiting for loop to become ready.
98 + */
99 + protected synchronized void notifyReady() {
100 + state = State.STARTED;
101 + notifyAll();
102 + }
103 +
104 + /**
105 + * Triggers loop shutdown.
106 + */
107 + public void shutdown() {
108 + // Mark the loop as no longer running and wake up the selector.
109 + state = State.STOPPING;
110 + selector.wakeup();
111 + }
112 +
113 + /**
114 + * Notifies observers waiting for loop to fully stop.
115 + */
116 + private synchronized void notifyDone() {
117 + state = State.STOPPED;
118 + notifyAll();
119 + }
120 +
121 +}
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
19 <modules> 19 <modules>
20 <module>junit</module> 20 <module>junit</module>
21 <module>misc</module> 21 <module>misc</module>
22 + <module>nio</module>
22 <module>osgi</module> 23 <module>osgi</module>
23 <module>rest</module> 24 <module>rest</module>
24 </modules> 25 </modules>
......