Java NIO and TCP Connections: Difference between revisions
(→Server) |
|||
(18 intermediate revisions by the same user not shown) | |||
Line 6: | Line 6: | ||
=Overview= | =Overview= | ||
This article describes the programming model involved in establishing a simple TCP connection | This article describes the programming model involved in establishing a simple TCP connection using Java NIO API. The API provides non-blocking and block-oriented I/O access to the TCP connection. We use [[Java_Non-Blocking_I/O_Concepts#Overview|Java NIO APIs]] [[Java_Non-Blocking_I/O_Concepts#Primitives|primitives]] introduced in Java 4. | ||
=Example= | |||
{{External|[https://github.com/NovaOrdis/playground/tree/master/java/nio/tcp Playground Java NIO and TCP Connections]}} | |||
=Programming Model= | =Programming Model= | ||
[[Image:JavaNIOAndTCPConnections.png]] | |||
==Server== | ==Server== | ||
The server code uses a [[Java_Non-Blocking_I/O_Concepts#Selector|Selector]] to multiplex over | The server code uses a [[Java_Non-Blocking_I/O_Concepts#Selector|Selector]] to multiplex over selectable channels: [[Java_Non-Blocking_I/O_Concepts#Selectable_Channel|selectable channels]]: a [[Java_Non-Blocking_I/O_Concepts#ServerSocketChannel|ServerSocketChannel]] that listens for incoming network connections and creates new [[Java_Non-Blocking_I/O_Concepts#SocketChannel|SocketChannels]] for each new TCP connection, and subsequently registered SocketChannels. | ||
<syntaxhighlight lang='java'> | |||
// | |||
// Main selector multiplexor. We use the main thread as selector thread. | |||
// | |||
Selector selector = Selector.open(); | |||
// | |||
// The ServerSocketChannel used to accept new TCP connections | |||
// | |||
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); | |||
serverSocketChannel.configureBlocking(false); | |||
InetSocketAddress address = new InetSocketAddress(PORT); | |||
ServerSocket ss = serverSocketChannel.socket(); | |||
ss.bind(address); | |||
// | |||
// Register the ServerSocketChannel with the selector | |||
// | |||
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); | |||
</syntaxhighlight> | |||
The main event loop handles two types of events: new connections and data availability on the existing connections. Once a new connection is detected, the selector thread retrieves the corresponding SocketChannel and registers it with the same selector. If data becomes available on any of the registered SocketChannels, we use a [[Java_Non-Blocking_I/O_Concepts#Buffer|Buffer]] to read it. | |||
<syntaxhighlight lang='java'> | |||
// | |||
// The main event loop | |||
// | |||
while(true) { | |||
// | |||
// This call blocks until at least one I/O event occurs | |||
// | |||
selector.select(); | |||
Set<SelectionKey> selectedKeys = selector.selectedKeys(); | |||
for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) { | |||
// | |||
// Figure out what kind of I/O event was selected | |||
// | |||
SelectionKey k = i.next(); | |||
if ((k.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { | |||
// | |||
// New connection | |||
// | |||
info("new TCP connection"); | |||
// | |||
// Remove the key from the set | |||
// | |||
i.remove(); | |||
// | |||
// Retrieve the SocketChannel for the new connection, make it non-blocking, and register | |||
// it with the same selector so now we can handle incoming data events on the same event | |||
// loop | |||
// | |||
ServerSocketChannel ssc = (ServerSocketChannel)k.channel(); | |||
SocketChannel sc = ssc.accept(); | |||
sc.configureBlocking(false); | |||
sc.register(selector, SelectionKey.OP_READ); | |||
} | |||
else if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { | |||
// | |||
// New data available, read it | |||
// | |||
// | |||
// Remove the key from the set | |||
// | |||
i.remove(); | |||
SocketChannel sc = (SocketChannel)k.channel(); | |||
ByteBuffer buffer = ByteBuffer.allocate(1024); | |||
int bytesRead = sc.read(buffer); | |||
if (bytesRead == -1) { | |||
// | |||
// TCP connection was closed | |||
// | |||
info("TCP connection closed"); | |||
// | |||
// Unregister the channel, by canceling the key. If we don't do this, data availability | |||
// events for zero-length data will keep popping up. | |||
// | |||
k.cancel(); | |||
} | |||
else { | |||
// | |||
// Read data | |||
// | |||
buffer.flip(); | |||
byte[] content = new byte[bytesRead]; | |||
buffer.get(content, 0, bytesRead); | |||
info("received: " + new String(content)); | |||
} | |||
} | |||
} | |||
} | |||
</syntaxhighlight> | |||
==Client== | ==Client== | ||
[[ | The client creates the SocketChannel explicitly but it also registers it with a Selector, so it can asynchronously read the data that comes from the server. | ||
<syntaxhighlight lang='java'> | |||
// | |||
// Use a selector to receive data asynchronously | |||
// | |||
final Selector selector = Selector.open(); | |||
// | |||
// Create the SocketChannel | |||
// | |||
SocketChannel socketChannel = SocketChannel.open(); | |||
socketChannel.connect(new InetSocketAddress("localhost", ServerMain.PORT)); | |||
socketChannel.configureBlocking(false); | |||
c.info("socket connected"); | |||
// | |||
// Register the ServerSocketChannel with the selector, so we can asynchronously receive data | |||
// | |||
socketChannel.register(selector, SelectionKey.OP_READ); | |||
// | |||
// Create a selector thread that will be notified on asynchronous data arrival | |||
// | |||
new Thread(() -> { | |||
while(true) { | |||
try { | |||
selector.select(); | |||
Set<SelectionKey> selectedKeys = selector.selectedKeys(); | |||
for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) { | |||
SelectionKey k = i.next(); | |||
if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { | |||
// | |||
// New data available, read it | |||
// | |||
// | |||
// Remove the key from the set | |||
// | |||
i.remove(); | |||
SocketChannel sc = (SocketChannel)k.channel(); | |||
ByteBuffer buffer = ByteBuffer.allocate(1024); | |||
int bytesRead = sc.read(buffer); | |||
if (bytesRead == -1) { | |||
// | |||
// TCP connection was closed | |||
// | |||
c.info("TCP connection closed"); | |||
// | |||
// Unregister the channel, by canceling the key. If we don't do this, data availability | |||
// events for zero-length data will keep popping up. | |||
// | |||
k.cancel(); | |||
} | |||
else { | |||
// | |||
// Read data | |||
// | |||
buffer.flip(); | |||
byte[] content = new byte[bytesRead]; | |||
buffer.get(content, 0, bytesRead); | |||
c.info("received : " + new String(content)); | |||
} | |||
} | |||
} | |||
} | |||
catch(IOException e) { | |||
e.printStackTrace(); | |||
} | |||
} | |||
}, "Selector Thread").start(); | |||
// | |||
// Enter the command line loop, this is where we write data on the socket | |||
// | |||
while(true) { | |||
String line = c.readLine(); | |||
if (line.startsWith("exit")) { | |||
break; | |||
} | |||
// | |||
// Send data | |||
// | |||
ByteBuffer buffer = ByteBuffer.allocate(1024); | |||
buffer.clear(); | |||
buffer.put(line.getBytes()); | |||
buffer.flip(); | |||
while(buffer.hasRemaining()) { | |||
socketChannel.write(buffer); | |||
} | |||
} | |||
socketChannel.close(); | |||
c.info("socket channel closed"); | |||
</syntaxhighlight> |
Latest revision as of 22:34, 25 July 2018
Internal
Overview
This article describes the programming model involved in establishing a simple TCP connection using Java NIO API. The API provides non-blocking and block-oriented I/O access to the TCP connection. We use Java NIO APIs primitives introduced in Java 4.
Example
Programming Model
Server
The server code uses a Selector to multiplex over selectable channels: selectable channels: a ServerSocketChannel that listens for incoming network connections and creates new SocketChannels for each new TCP connection, and subsequently registered SocketChannels.
//
// Main selector multiplexor. We use the main thread as selector thread.
//
Selector selector = Selector.open();
//
// The ServerSocketChannel used to accept new TCP connections
//
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
InetSocketAddress address = new InetSocketAddress(PORT);
ServerSocket ss = serverSocketChannel.socket();
ss.bind(address);
//
// Register the ServerSocketChannel with the selector
//
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
The main event loop handles two types of events: new connections and data availability on the existing connections. Once a new connection is detected, the selector thread retrieves the corresponding SocketChannel and registers it with the same selector. If data becomes available on any of the registered SocketChannels, we use a Buffer to read it.
//
// The main event loop
//
while(true) {
//
// This call blocks until at least one I/O event occurs
//
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
//
// Figure out what kind of I/O event was selected
//
SelectionKey k = i.next();
if ((k.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
//
// New connection
//
info("new TCP connection");
//
// Remove the key from the set
//
i.remove();
//
// Retrieve the SocketChannel for the new connection, make it non-blocking, and register
// it with the same selector so now we can handle incoming data events on the same event
// loop
//
ServerSocketChannel ssc = (ServerSocketChannel)k.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
else if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//
// New data available, read it
//
//
// Remove the key from the set
//
i.remove();
SocketChannel sc = (SocketChannel)k.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = sc.read(buffer);
if (bytesRead == -1) {
//
// TCP connection was closed
//
info("TCP connection closed");
//
// Unregister the channel, by canceling the key. If we don't do this, data availability
// events for zero-length data will keep popping up.
//
k.cancel();
}
else {
//
// Read data
//
buffer.flip();
byte[] content = new byte[bytesRead];
buffer.get(content, 0, bytesRead);
info("received: " + new String(content));
}
}
}
}
Client
The client creates the SocketChannel explicitly but it also registers it with a Selector, so it can asynchronously read the data that comes from the server.
//
// Use a selector to receive data asynchronously
//
final Selector selector = Selector.open();
//
// Create the SocketChannel
//
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", ServerMain.PORT));
socketChannel.configureBlocking(false);
c.info("socket connected");
//
// Register the ServerSocketChannel with the selector, so we can asynchronously receive data
//
socketChannel.register(selector, SelectionKey.OP_READ);
//
// Create a selector thread that will be notified on asynchronous data arrival
//
new Thread(() -> {
while(true) {
try {
selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
SelectionKey k = i.next();
if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//
// New data available, read it
//
//
// Remove the key from the set
//
i.remove();
SocketChannel sc = (SocketChannel)k.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = sc.read(buffer);
if (bytesRead == -1) {
//
// TCP connection was closed
//
c.info("TCP connection closed");
//
// Unregister the channel, by canceling the key. If we don't do this, data availability
// events for zero-length data will keep popping up.
//
k.cancel();
}
else {
//
// Read data
//
buffer.flip();
byte[] content = new byte[bytesRead];
buffer.get(content, 0, bytesRead);
c.info("received : " + new String(content));
}
}
}
}
catch(IOException e) {
e.printStackTrace();
}
}
}, "Selector Thread").start();
//
// Enter the command line loop, this is where we write data on the socket
//
while(true) {
String line = c.readLine();
if (line.startsWith("exit")) {
break;
}
//
// Send data
//
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
buffer.put(line.getBytes());
buffer.flip();
while(buffer.hasRemaining()) {
socketChannel.write(buffer);
}
}
socketChannel.close();
c.info("socket channel closed");