Java NIO and TCP Connections: Difference between revisions

From NovaOrdis Knowledge Base
Jump to navigation Jump to search
 
(11 intermediate revisions by the same user not shown)
Line 6: Line 6:
=Overview=
=Overview=


This article describes the programming model involved in establishing a simple TCP connection and interacting with it with non-blocking I/O, from Java. We use [[Java_Non-Blocking_I/O_Concepts#Overview|Java NIO APIs]] [[Java_Non-Blocking_I/O_Concepts#Primitives|primitives]] introduced in Java 4.
This article describes the programming model involved in establishing a simple TCP connection using Java NIO API. The API provides non-blocking and block-oriented I/O access to the TCP connection. We use [[Java_Non-Blocking_I/O_Concepts#Overview|Java NIO APIs]] [[Java_Non-Blocking_I/O_Concepts#Primitives|primitives]] introduced in Java 4.
 
=Example=
 
{{External|[https://github.com/NovaOrdis/playground/tree/master/java/nio/tcp Playground Java NIO and TCP Connections]}}


=Programming Model=
=Programming Model=
[[Image:JavaNIOAndTCPConnections.png]]


==Server==
==Server==
Line 47: Line 53:
while(true) {
while(true) {


    //
  //
    // This call blocks until at least one I/O event occurs
  // This call blocks until at least one I/O event occurs
    //
  //
 
  selector.select();
 
  Set<SelectionKey> selectedKeys = selector.selectedKeys();
 
  for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
 
        //
        // Figure out what kind of I/O event was selected
        //
 
        SelectionKey k = i.next();
 
        if ((k.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
 
          //
          // New connection
          //
 
          info("new TCP connection");
 
          //
          // Remove the key from the set
          //
 
          i.remove();


    selector.select();
          //
          // Retrieve the SocketChannel for the new connection, make it non-blocking, and register
          // it with the same selector so now we can handle incoming data events on the same event
          // loop
          //


    Set<SelectionKey> selectedKeys = selector.selectedKeys();
          ServerSocketChannel ssc = (ServerSocketChannel)k.channel();
          SocketChannel sc = ssc.accept();
          sc.configureBlocking(false);
          sc.register(selector, SelectionKey.OP_READ);
      }
      else if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {


    for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
            //
            // New data available, read it
            //


             //
             //
             // Figure out what kind of I/O event was selected
             // Remove the key from the set
             //
             //


             SelectionKey k = i.next();
             i.remove();
 
            SocketChannel sc = (SocketChannel)k.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int bytesRead = sc.read(buffer);
 
            if (bytesRead == -1) {
 
                //
                // TCP connection was closed
                //
 
                info("TCP connection closed");
 
                //
                // Unregister the channel, by canceling the key. If we don't do this, data availability
                // events for zero-length data will keep popping up.
                //
 
                k.cancel();
 
            }
            else {
 
                  //
                  // Read data
                  //
 
                  buffer.flip();
                  byte[] content = new byte[bytesRead];
                  buffer.get(content, 0, bytesRead);
 
                  info("received: " + new String(content));
          }
      }
  }
}
</syntaxhighlight>
 
==Client==
 
The client creates the SocketChannel explicitly but it also registers it with a Selector, so it can asynchronously read the data that comes from the server.
 
<syntaxhighlight lang='java'>
//
// Use a selector to receive data asynchronously
//
 
final Selector selector = Selector.open();
 
//
// Create the SocketChannel
//
 
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", ServerMain.PORT));
socketChannel.configureBlocking(false);


            if ((k.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
c.info("socket connected");


                    //
//
                    // New connection
// Register the ServerSocketChannel with the selector, so we can asynchronously receive data
                    //
//
 
socketChannel.register(selector, SelectionKey.OP_READ);
 
//
// Create a selector thread that will be notified on asynchronous data arrival
//
 
new Thread(() -> {


                    c.info(TIMESTAMP_FORMAT.format(new Date()) + ": new connection");
    while(true) {


                    //
        try {
                    // Remove the key from the set
                    //


                    i.remove();
            selector.select();


                    //
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    // Retrieve the SocketChannel for the new connection, make it non-blocking, and register
                    // it with the same selector so now we can handle incoming data events on the same event
                    // loop
                    //


                    ServerSocketChannel ssc = (ServerSocketChannel)k.channel();
            for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    sc.register(selector, SelectionKey.OP_READ);


                    //
                SelectionKey k = i.next();
                    // TODO this is where we pass the channel to a console subsystem to send data back on it
                    //


                 }
                 if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                else if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {


                     //
                     //
Line 102: Line 197:
                     // Remove the key from the set
                     // Remove the key from the set
                     //
                     //
 
                     i.remove();
                     i.remove();


Line 115: Line 210:
                         //
                         //


                         c.info(TIMESTAMP_FORMAT.format(new Date()) + ": TCP connection closed");
                         c.info("TCP connection closed");


                         //
                         //
Line 135: Line 230:
                         buffer.get(content, 0, bytesRead);
                         buffer.get(content, 0, bytesRead);


                         c.info(TIMESTAMP_FORMAT.format(new Date()) + ": " + new String(content));
                         c.info("received : " + new String(content));
                     }
                     }
                 }
                 }
             }
             }
         }
         }
</syntaxhighlight>
        catch(IOException e) {
 
          e.printStackTrace();
           
      }
    }
}, "Selector Thread").start();
 
//
// Enter the command line loop, this is where we write data on the socket
//
 
while(true) {
 
    String line = c.readLine();
 
    if (line.startsWith("exit")) {
 
        break;
    }
 
    //
    // Send data
    //
 
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.clear();
    buffer.put(line.getBytes());
    buffer.flip();


==Client==
    while(buffer.hasRemaining()) {


[[Image:JavaNIOAndTCPConnections.png]]
        socketChannel.write(buffer);
    }
}


=Example=
socketChannel.close();


{{External|[https://github.com/NovaOrdis/playground/tree/master/java/nio/tcp Playground Java NIO and TCP Connections]}}
c.info("socket channel closed");
</syntaxhighlight>

Latest revision as of 22:34, 25 July 2018

Internal

Overview

This article describes the programming model involved in establishing a simple TCP connection using Java NIO API. The API provides non-blocking and block-oriented I/O access to the TCP connection. We use Java NIO APIs primitives introduced in Java 4.

Example

Playground Java NIO and TCP Connections

Programming Model

JavaNIOAndTCPConnections.png

Server

The server code uses a Selector to multiplex over selectable channels: selectable channels: a ServerSocketChannel that listens for incoming network connections and creates new SocketChannels for each new TCP connection, and subsequently registered SocketChannels.

//
// Main selector multiplexor. We use the main thread as selector thread.
//

Selector selector = Selector.open();

//
// The ServerSocketChannel used to accept new TCP connections
//

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
InetSocketAddress address = new InetSocketAddress(PORT);
ServerSocket ss = serverSocketChannel.socket();
ss.bind(address);

//
// Register the ServerSocketChannel with the selector
//

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

The main event loop handles two types of events: new connections and data availability on the existing connections. Once a new connection is detected, the selector thread retrieves the corresponding SocketChannel and registers it with the same selector. If data becomes available on any of the registered SocketChannels, we use a Buffer to read it.

//
// The main event loop
//

while(true) {

  //
  // This call blocks until at least one I/O event occurs
  //

  selector.select();

  Set<SelectionKey> selectedKeys = selector.selectedKeys();

  for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {

        //
        // Figure out what kind of I/O event was selected
        //

        SelectionKey k = i.next();

        if ((k.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {

           //
           // New connection
           //

           info("new TCP connection");

           //
           // Remove the key from the set
           //

           i.remove();

           //
           // Retrieve the SocketChannel for the new connection, make it non-blocking, and register
           // it with the same selector so now we can handle incoming data events on the same event
           // loop
           //

           ServerSocketChannel ssc = (ServerSocketChannel)k.channel();
           SocketChannel sc = ssc.accept();
           sc.configureBlocking(false);
           sc.register(selector, SelectionKey.OP_READ);
       }
       else if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {

            //
            // New data available, read it
            //

            //
            // Remove the key from the set
            //

            i.remove();

            SocketChannel sc = (SocketChannel)k.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int bytesRead = sc.read(buffer);

            if (bytesRead == -1) {

                 //
                 // TCP connection was closed
                 //

                 info("TCP connection closed");

                 //
                 // Unregister the channel, by canceling the key. If we don't do this, data availability
                 // events for zero-length data will keep popping up.
                 //

                 k.cancel();

            }
            else {

                  //
                  // Read data
                  //

                  buffer.flip();
                  byte[] content = new byte[bytesRead];
                  buffer.get(content, 0, bytesRead);

                  info("received: " + new String(content));
          }
      }
   }
}

Client

The client creates the SocketChannel explicitly but it also registers it with a Selector, so it can asynchronously read the data that comes from the server.

//
// Use a selector to receive data asynchronously
//

final Selector selector = Selector.open();

//
// Create the SocketChannel
//

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", ServerMain.PORT));
socketChannel.configureBlocking(false);

c.info("socket connected");

//
// Register the ServerSocketChannel with the selector, so we can asynchronously receive data
//

socketChannel.register(selector, SelectionKey.OP_READ);

//
// Create a selector thread that will be notified on asynchronous data arrival
//

new Thread(() -> {

    while(true) {

        try {

            selector.select();

            Set<SelectionKey> selectedKeys = selector.selectedKeys();

            for(Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext(); ) {

                SelectionKey k = i.next();

                if ((k.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {

                    //
                    // New data available, read it
                    //

                    //
                    // Remove the key from the set
                    //
 
                    i.remove();

                    SocketChannel sc = (SocketChannel)k.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = sc.read(buffer);

                    if (bytesRead == -1) {

                        //
                        // TCP connection was closed
                        //

                        c.info("TCP connection closed");

                        //
                        // Unregister the channel, by canceling the key. If we don't do this, data availability
                        // events for zero-length data will keep popping up.
                        //

                        k.cancel();

                    }
                    else {

                        //
                        // Read data
                        //

                        buffer.flip();
                        byte[] content = new byte[bytesRead];
                        buffer.get(content, 0, bytesRead);

                        c.info("received : " + new String(content));
                    }
                }
            }
        }
        catch(IOException e) {

           e.printStackTrace();
            
       }
    }
}, "Selector Thread").start();

//
// Enter the command line loop, this is where we write data on the socket
//

while(true) {

    String line = c.readLine();

    if (line.startsWith("exit")) {

        break;
    }

    //
    // Send data
    //

    ByteBuffer buffer = ByteBuffer.allocate(1024);
    buffer.clear();
    buffer.put(line.getBytes());
    buffer.flip();

    while(buffer.hasRemaining()) {

        socketChannel.write(buffer);
    }
}

socketChannel.close();

c.info("socket channel closed");