Java NIO and TCP Connections
Internal
Overview
This article describes the programming model involved in establishing a simple TCP connection using Java NIO API. The API provides non-blocking and block-oriented I/O access to the TCP connection. We use Java NIO APIs primitives introduced in Java 4.
Example
Programming Model
Server
The server code uses a Selector to multiplex over selectable channels: selectable channels: a ServerSocketChannel that listens for incoming network connections and creates new SocketChannels for each new TCP connection, and subsequently registered SocketChannels.
//
// Main selector multiplexor. We use the main thread as selector thread.
//
Selector selector = Selector.open();
//
// The ServerSocketChannel used to accept new TCP connections
//
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
InetSocketAddress address = new InetSocketAddress(PORT);
ServerSocket ss = serverSocketChannel.socket();
ss.bind(address);
//
// Register the ServerSocketChannel with the selector
//
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
The main event loop handles two types of events: new connections and data availability on the existing connections. Once a new connection is detected, the selector thread retrieves the corresponding SocketChannel and registers it with the same selector. If data becomes available on any of the registered SocketChannels, we use a Buffer to read it.
//
// The main event loop
//
while(true) {
//
// This call blocks until at least one I/O event occurs
//
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
//
// Figure out what kind of I/O event was selected
//
SelectionKey k = i.next();
if ((k.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
//
// New connection
//
info("new TCP connection");
//
// Remove the key from the set
//
i.remove();
//
// Retrieve the SocketChannel for the new connection, make it non-blocking, and register
// it with the same selector so now we can handle incoming data events on the same event
// loop
//
ServerSocketChannel ssc = (ServerSocketChannel)k.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
else if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//
// New data available, read it
//
//
// Remove the key from the set
//
i.remove();
SocketChannel sc = (SocketChannel)k.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = sc.read(buffer);
if (bytesRead == -1) {
//
// TCP connection was closed
//
info("TCP connection closed");
//
// Unregister the channel, by canceling the key. If we don't do this, data availability
// events for zero-length data will keep popping up.
//
k.cancel();
}
else {
//
// Read data
//
buffer.flip();
byte[] content = new byte[bytesRead];
buffer.get(content, 0, bytesRead);
info("received: " + new String(content));
}
}
}
}
Client
The client creates the SocketChannel explicitly but it also registers it with a Selector, so it can asynchronously read the data that comes from the server.
//
// Use a selector to receive data asynchronously
//
final Selector selector = Selector.open();
//
// Create the SocketChannel
//
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", ServerMain.PORT));
socketChannel.configureBlocking(false);
c.info("socket connected");
//
// Register the ServerSocketChannel with the selector, so we can asynchronously receive data
//
socketChannel.register(selector, SelectionKey.OP_READ);
//
// Create a selector thread that will be notified on asynchronous data arrival
//
new Thread(() -> {
while(true) {
try {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
SelectionKey k = i.next();
if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//
// New data available, read it
//
//
// Remove the key from the set
//
i.remove();
SocketChannel sc = (SocketChannel)k.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = sc.read(buffer);
if (bytesRead == -1) {
//
// TCP connection was closed
//
c.info("TCP connection closed");
//
// Unregister the channel, by canceling the key. If we don't do this, data availability
// events for zero-length data will keep popping up.
//
k.cancel();
}
else {
//
// Read data
//
buffer.flip();
byte[] content = new byte[bytesRead];
buffer.get(content, 0, bytesRead);
c.info("received : " + new String(content));
}
}
}
}
catch(IOException e) {
e.printStackTrace();
}
}
}, "Selector Thread").start();
//
// Enter the command line loop, this is where we write data on the socket
//
while(true) {
String line = c.readLine();
if (line.startsWith("exit")) {
break;
}
//
// Send data
//
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
buffer.put(line.getBytes());
buffer.flip();
while(buffer.hasRemaining()) {
socketChannel.write(buffer);
}
}
socketChannel.close();
c.info("socket channel closed");