Java NIO and TCP Connections: Difference between revisions

From NovaOrdis Knowledge Base
Jump to navigation Jump to search
 
(14 intermediate revisions by the same user not shown)
Line 6: Line 6:
=Overview=
=Overview=


This article describes the programming model involved in establishing a simple TCP connection and interacting with it with non-blocking I/O, from Java. We use [[Java_Non-Blocking_I/O_Concepts#Overview|Java NIO APIs]] [[Java_Non-Blocking_I/O_Concepts#Primitives|primitives]] introduced in Java 4.
This article describes the programming model involved in establishing a simple TCP connection using Java NIO API. The API provides non-blocking and block-oriented I/O access to the TCP connection. We use [[Java_Non-Blocking_I/O_Concepts#Overview|Java NIO APIs]] [[Java_Non-Blocking_I/O_Concepts#Primitives|primitives]] introduced in Java 4.
 
=Example=
 
{{External|[https://github.com/NovaOrdis/playground/tree/master/java/nio/tcp Playground Java NIO and TCP Connections]}}


=Programming Model=
=Programming Model=
[[Image:JavaNIOAndTCPConnections.png]]


==Server==
==Server==


The server code uses a [[Java_Non-Blocking_I/O_Concepts#Selector|Selector]] to multiplex over two [[Java_Non-Blocking_I/O_Concepts#Selectable_Channel|selectable channels]]: a [[Java_Non-Blocking_I/O_Concepts#ServerSocketChannel|ServerSocketChannel]] that listens for incoming network connections and creates new [[Java_Non-Blocking_I/O_Concepts#SocketChannel|SocketChannels]] for each new TCP connection. Once a new connection is detected, the selector thread retrieves the corresponding SocketChannel and registers it with the same selector. The selector loop continues, and reacts to new connection attempts and new data coming over the existing SocketChannels.
The server code uses a [[Java_Non-Blocking_I/O_Concepts#Selector|Selector]] to multiplex over selectable channels: [[Java_Non-Blocking_I/O_Concepts#Selectable_Channel|selectable channels]]: a [[Java_Non-Blocking_I/O_Concepts#ServerSocketChannel|ServerSocketChannel]] that listens for incoming network connections and creates new [[Java_Non-Blocking_I/O_Concepts#SocketChannel|SocketChannels]] for each new TCP connection, and subsequently registered SocketChannels.
 
<syntaxhighlight lang='java'>
//
// Main selector multiplexor. We use the main thread as selector thread.
//
 
Selector selector = Selector.open();
 
//
// The ServerSocketChannel used to accept new TCP connections
//
 
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
InetSocketAddress address = new InetSocketAddress(PORT);
ServerSocket ss = serverSocketChannel.socket();
ss.bind(address);
 
//
// Register the ServerSocketChannel with the selector
//
 
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
</syntaxhighlight>
 
The main event loop handles two types of events: new connections and data availability on the existing connections. Once a new connection is detected, the selector thread retrieves the corresponding SocketChannel and registers it with the same selector. If data becomes available on any of the registered SocketChannels, we use a [[Java_Non-Blocking_I/O_Concepts#Buffer|Buffer]] to read it.
 
<syntaxhighlight lang='java'>
//
// The main event loop
//
 
while(true) {
 
  //
  // This call blocks until at least one I/O event occurs
  //
 
  selector.select();
 
  Set<SelectionKey> selectedKeys = selector.selectedKeys();
 
  for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
 
        //
        // Figure out what kind of I/O event was selected
        //
 
        SelectionKey k = i.next();
 
        if ((k.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
 
          //
          // New connection
          //
 
          info("new TCP connection");
 
          //
          // Remove the key from the set
          //
 
          i.remove();
 
          //
          // Retrieve the SocketChannel for the new connection, make it non-blocking, and register
          // it with the same selector so now we can handle incoming data events on the same event
          // loop
          //
 
          ServerSocketChannel ssc = (ServerSocketChannel)k.channel();
          SocketChannel sc = ssc.accept();
          sc.configureBlocking(false);
          sc.register(selector, SelectionKey.OP_READ);
      }
      else if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
 
            //
            // New data available, read it
            //
 
            //
            // Remove the key from the set
            //
 
            i.remove();
 
            SocketChannel sc = (SocketChannel)k.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int bytesRead = sc.read(buffer);
 
            if (bytesRead == -1) {
 
                //
                // TCP connection was closed
                //
 
                info("TCP connection closed");
 
                //
                // Unregister the channel, by canceling the key. If we don't do this, data availability
                // events for zero-length data will keep popping up.
                //
 
                k.cancel();
 
            }
            else {
 
                  //
                  // Read data
                  //
 
                  buffer.flip();
                  byte[] content = new byte[bytesRead];
                  buffer.get(content, 0, bytesRead);
 
                  info("received: " + new String(content));
          }
      }
  }
}
</syntaxhighlight>


==Client==
==Client==


[[Image:JavaNIOAndTCPConnections.png]]
The client creates the SocketChannel explicitly but it also registers it with a Selector, so it can asynchronously read the data that comes from the server.
 
<syntaxhighlight lang='java'>
//
// Use a selector to receive data asynchronously
//
 
final Selector selector = Selector.open();
 
//
// Create the SocketChannel
//
 
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", ServerMain.PORT));
socketChannel.configureBlocking(false);
 
c.info("socket connected");
 
//
// Register the ServerSocketChannel with the selector, so we can asynchronously receive data
//
 
socketChannel.register(selector, SelectionKey.OP_READ);
 
//
// Create a selector thread that will be notified on asynchronous data arrival
//
 
new Thread(() -> {
 
    while(true) {
 
        try {
 
            selector.select();
 
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
 
            for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
 
                SelectionKey k = i.next();
 
                if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
 
                    //
                    // New data available, read it
                    //
 
                    //
                    // Remove the key from the set
                    //
                    i.remove();
 
                    SocketChannel sc = (SocketChannel)k.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = sc.read(buffer);
 
                    if (bytesRead == -1) {
 
                        //
                        // TCP connection was closed
                        //
 
                        c.info("TCP connection closed");
 
                        //
                        // Unregister the channel, by canceling the key. If we don't do this, data availability
                        // events for zero-length data will keep popping up.
                        //
 
                        k.cancel();
 
                    }
                    else {
 
                        //
                        // Read data
                        //
 
                        buffer.flip();
                        byte[] content = new byte[bytesRead];
                        buffer.get(content, 0, bytesRead);
 
                        c.info("received : " + new String(content));
                    }
                }
            }
        }
        catch(IOException e) {
 
          e.printStackTrace();
           
      }
    }
}, "Selector Thread").start();
 
//
// Enter the command line loop, this is where we write data on the socket
//
 
while(true) {
 
    String line = c.readLine();
 
    if (line.startsWith("exit")) {
 
        break;
    }
 
    //
    // Send data
    //
 
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.clear();
    buffer.put(line.getBytes());
    buffer.flip();
 
    while(buffer.hasRemaining()) {
 
        socketChannel.write(buffer);
    }
}


=Example=
socketChannel.close();


{{External|[https://github.com/NovaOrdis/playground/tree/master/java/nio/tcp Playground Java NIO and TCP Connections]}}
c.info("socket channel closed");
</syntaxhighlight>

Latest revision as of 22:34, 25 July 2018

Internal

Overview

This article describes the programming model involved in establishing a simple TCP connection using Java NIO API. The API provides non-blocking and block-oriented I/O access to the TCP connection. We use Java NIO APIs primitives introduced in Java 4.

Example

Playground Java NIO and TCP Connections

Programming Model

JavaNIOAndTCPConnections.png

Server

The server code uses a Selector to multiplex over selectable channels: selectable channels: a ServerSocketChannel that listens for incoming network connections and creates new SocketChannels for each new TCP connection, and subsequently registered SocketChannels.

//
// Main selector multiplexor. We use the main thread as selector thread.
//

Selector selector = Selector.open();

//
// The ServerSocketChannel used to accept new TCP connections
//

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
InetSocketAddress address = new InetSocketAddress(PORT);
ServerSocket ss = serverSocketChannel.socket();
ss.bind(address);

//
// Register the ServerSocketChannel with the selector
//

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

The main event loop handles two types of events: new connections and data availability on the existing connections. Once a new connection is detected, the selector thread retrieves the corresponding SocketChannel and registers it with the same selector. If data becomes available on any of the registered SocketChannels, we use a Buffer to read it.

//
// The main event loop
//

while(true) {

  //
  // This call blocks until at least one I/O event occurs
  //

  selector.select();

  Set<SelectionKey> selectedKeys = selector.selectedKeys();

  for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {

        //
        // Figure out what kind of I/O event was selected
        //

        SelectionKey k = i.next();

        if ((k.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {

           //
           // New connection
           //

           info("new TCP connection");

           //
           // Remove the key from the set
           //

           i.remove();

           //
           // Retrieve the SocketChannel for the new connection, make it non-blocking, and register
           // it with the same selector so now we can handle incoming data events on the same event
           // loop
           //

           ServerSocketChannel ssc = (ServerSocketChannel)k.channel();
           SocketChannel sc = ssc.accept();
           sc.configureBlocking(false);
           sc.register(selector, SelectionKey.OP_READ);
       }
       else if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {

            //
            // New data available, read it
            //

            //
            // Remove the key from the set
            //

            i.remove();

            SocketChannel sc = (SocketChannel)k.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int bytesRead = sc.read(buffer);

            if (bytesRead == -1) {

                 //
                 // TCP connection was closed
                 //

                 info("TCP connection closed");

                 //
                 // Unregister the channel, by canceling the key. If we don't do this, data availability
                 // events for zero-length data will keep popping up.
                 //

                 k.cancel();

            }
            else {

                  //
                  // Read data
                  //

                  buffer.flip();
                  byte[] content = new byte[bytesRead];
                  buffer.get(content, 0, bytesRead);

                  info("received: " + new String(content));
          }
      }
   }
}

Client

The client creates the SocketChannel explicitly but it also registers it with a Selector, so it can asynchronously read the data that comes from the server.

//
// Use a selector to receive data asynchronously
//

final Selector selector = Selector.open();

//
// Create the SocketChannel
//

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", ServerMain.PORT));
socketChannel.configureBlocking(false);

c.info("socket connected");

//
// Register the ServerSocketChannel with the selector, so we can asynchronously receive data
//

socketChannel.register(selector, SelectionKey.OP_READ);

//
// Create a selector thread that will be notified on asynchronous data arrival
//

new Thread(() -> {

    while(true) {

        try {

            selector.select();

            Set<SelectionKey> selectedKeys = selector.selectedKeys();

            for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {

                SelectionKey k = i.next();

                if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {

                    //
                    // New data available, read it
                    //

                    //
                    // Remove the key from the set
                    //
 
                    i.remove();

                    SocketChannel sc = (SocketChannel)k.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = sc.read(buffer);

                    if (bytesRead == -1) {

                        //
                        // TCP connection was closed
                        //

                        c.info("TCP connection closed");

                        //
                        // Unregister the channel, by canceling the key. If we don't do this, data availability
                        // events for zero-length data will keep popping up.
                        //

                        k.cancel();

                    }
                    else {

                        //
                        // Read data
                        //

                        buffer.flip();
                        byte[] content = new byte[bytesRead];
                        buffer.get(content, 0, bytesRead);

                        c.info("received : " + new String(content));
                    }
                }
            }
        }
        catch(IOException e) {

           e.printStackTrace();
            
       }
    }
}, "Selector Thread").start();

//
// Enter the command line loop, this is where we write data on the socket
//

while(true) {

    String line = c.readLine();

    if (line.startsWith("exit")) {

        break;
    }

    //
    // Send data
    //

    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.clear();
    buffer.put(line.getBytes());
    buffer.flip();

    while(buffer.hasRemaining()) {

        socketChannel.write(buffer);
    }
}

socketChannel.close();

c.info("socket channel closed");