Friday, April 8, 2011

Java NIO for Writing

Using Java NIO's non-blocking IO for writing as well as for reading is almost, but not quite, straightforward.

Background

One of the limitations pointed out in the Limitations section of the original post in this series was that we were still writing our output data directly to the socket, rather than using non-blocking IO and continuations as we were already doing when reading our input data. If a client stops reading the data we send it (or if network congestion makes it look that way from our end), our socket's output buffer may fill up. When that happens, one of two things will happen when we try to write to that socket. In blocking mode the call will block. In non-blocking mode the call will return after writing only part of the data, possibly none of it.

If the call blocks, we have a blocked thread that we cannot use for processing other clients until it is unblocked, and if many clients are not reading their input, we could have many blocked threads. Since one of the goals of this exercise is to be able to run many clients on a relatively small number of threads, blocked threads are bad. To avoid this problem, we use non-blocking output and continuations for writing, just as we did for reading.
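The partial-write case can be made concrete with a small standalone Scala sketch. It is not part of nioserver, and it uses a java.nio Pipe as a stand-in for a socket whose peer has stopped reading. With the sink in non-blocking mode, write accepts only as many bytes as fit in the pipe's buffer and then returns 0. The 8 MiB size is an arbitrary assumption, chosen to exceed typical pipe buffer sizes.

```scala
import java.nio.ByteBuffer
import java.nio.channels.Pipe

object PartialWriteDemo {
  /** Writes `size` bytes into a pipe that nobody reads, in non-blocking mode.
    * Returns (bytes the channel accepted, bytes still left in the buffer). */
  def fillPipe(size: Int): (Int, Int) = {
    val pipe = Pipe.open()
    val sink = pipe.sink()
    sink.configureBlocking(false)
    val data = ByteBuffer.allocate(size)
    try {
      var total = 0
      var n = sink.write(data)
      // Keep writing until the channel refuses more (write returns 0) or we are done.
      while (n > 0) {
        total += n
        n = if (data.hasRemaining) sink.write(data) else 0
      }
      (total, data.remaining)
    } finally {
      sink.close()
      pipe.source().close()
    }
  }

  def main(args: Array[String]): Unit = {
    val (written, left) = fillPipe(8 << 20)
    println(s"accepted $written bytes, $left bytes still waiting to be written")
  }
}
```

A blocking channel would instead have stalled inside write, which is exactly the blocked-thread case described above.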

The complete source for this series of posts is available on GitHub in my nioserver project. The version that includes the changes described in this post is tagged blog-write.

Implementation

We model the output code on the input code by making these changes:
  • We write a suspending write method that registers our interest in writing to the output socket connection.
  • We add an output queue to receive data from the application.
  • We modify the writeLine method to add a line to the output queue rather than writing directly to the output socket.
  • We run a separate control loop that reads from the output queue and writes to the output socket.
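The same queue-plus-writer structure can be sketched without continuations, as a plain callback-style class. QueuedWriter and onWritable are hypothetical names, and a Pipe again stands in for the socket. In a real selector loop, onWritable would be called when OP_WRITE fires, and write interest would be re-registered only while it returns false.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, WritableByteChannel}
import java.nio.charset.StandardCharsets
import scala.collection.mutable

// Hypothetical callback-style analogue of outQ + writeBuffer.
final class QueuedWriter(ch: WritableByteChannel) {
  private val pending = mutable.Queue.empty[ByteBuffer]

  // Like writeLine in the post: only enqueue, never touch the channel here.
  def writeLine(s: String): Unit =
    pending.enqueue(ByteBuffer.wrap((s + "\n").getBytes(StandardCharsets.UTF_8)))

  /** Call when the channel is writable. Writes as much as the channel accepts,
    * keeping any partly written buffer at the head of the queue.
    * Returns true once the queue is fully drained. */
  def onWritable(): Boolean = {
    var full = false
    while (pending.nonEmpty && !full) {
      val b = pending.head
      ch.write(b)
      if (b.hasRemaining) full = true // output buffer is full: wait for OP_WRITE again
      else pending.dequeue()
    }
    pending.isEmpty
  }
}

object QueuedWriterDemo {
  def roundTrip(lines: Seq[String]): (Boolean, String) = {
    val pipe = Pipe.open()
    val sink = pipe.sink()
    sink.configureBlocking(false)
    val writer = new QueuedWriter(sink)
    lines.foreach(writer.writeLine)
    val drained = writer.onWritable() // a few short lines fit in the pipe buffer
    sink.close()
    val source = pipe.source() // blocking mode: read until EOF
    val in = ByteBuffer.allocate(4096)
    val out = new StringBuilder
    while (source.read(in) >= 0) {
      in.flip()
      out.append(StandardCharsets.UTF_8.decode(in).toString) // fine for this ASCII demo
      in.clear()
    }
    source.close()
    (drained, out.toString)
  }
}
```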

//In class NioConnection
    private val outQ = new CoQueue[String](sched, 10)

    def start():Unit = {
        startReader
        startWriter
        startApp
    }

    private def startWriter() {
        reset {
            while (socket.isOpen)
                writeWait
        }
    }

    private def write(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            selector.register(socket, SelectionKey.OP_WRITE, {
                val n = socket.write(b)
                k(n)
            })
        }
    }

    private def writeBuffer(b:ByteBuffer):Unit @suspendable = {
        write(b)
        if (b.remaining>0 && socket.isOpen)
            writeBuffer(b)
        else
            shiftUnit[Unit,Unit,Unit]()
    }

    private def writeWait:Unit @suspendable = {
        val str = outQ.blockingDequeue
        writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
    }

    def writeLine(s:String):Unit @suspendable = write(s+"\n")
    def write(s:String):Unit @suspendable = outQ.blockingEnqueue(s)

This seems pretty straightforward, but unfortunately it doesn't work. The problem is that we are now registering our channel twice (once for read and once for write) with the same selector. The documentation for SelectableChannel says, "A channel may be registered at most once with any particular selector." Registering again with the same selector returns the existing key with its interest set replaced rather than combined. So if we register our channel for write while it is already registered for read, the write registration overwrites the read registration, and the read registration is lost.
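The overwrite is easy to observe directly with the JDK classes. This standalone sketch connects a SocketChannel over loopback and registers it with one Selector, first for OP_READ and then for OP_WRITE. The second call returns the same key, with its interest set and attachment replaced. The String attachments are illustrative stand-ins for callbacks.

```scala
import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}

object ReRegisterDemo {
  /** Registers one channel twice with the same selector; returns
    * (same key object?, final interest set, final attachment). */
  def reRegister(): (Boolean, Int, AnyRef) = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress("127.0.0.1", 0))
    val client = SocketChannel.open(server.getLocalAddress)
    client.configureBlocking(false)
    val selector = Selector.open()
    try {
      val readKey = client.register(selector, SelectionKey.OP_READ, "read callback")
      val writeKey = client.register(selector, SelectionKey.OP_WRITE, "write callback")
      // Same key; the second call replaced the interest set and attachment.
      (readKey eq writeKey, writeKey.interestOps(), writeKey.attachment())
    } finally {
      selector.close(); client.close(); server.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val (sameKey, ops, att) = reRegister()
    // prints: same key: true, OP_READ still set: false, attachment: write callback
    println(s"same key: $sameKey, OP_READ still set: ${(ops & SelectionKey.OP_READ) != 0}, attachment: $att")
  }
}
```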

In his Rox Java NIO Tutorial, James Greenfield explicitly recommends that you "Use a single selecting thread" and "Modify the selector from the selecting thread only." We could take this approach, adding code to combine the read and write interest flags when a connection is waiting to both read and write. Unlike in James's case, however, we would also need code to demultiplex the separate read and write callbacks. Instead, we take a different approach: we use separate selectors for reading and writing, and we give each of them its own thread.
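For comparison, the single-selector alternative might look roughly like the sketch below. Callbacks, addInterest and dispatchOnce are hypothetical names invented for this illustration, not code from nioserver or the Rox tutorial. addInterest ORs a new operation into the key's existing interest set instead of replacing it. dispatchOnce uses readyOps to route each ready operation to its own one-shot callback, which is the demultiplexing the paragraph above refers to.

```scala
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import scala.collection.mutable.ListBuffer

// Hypothetical attachment: at most one pending callback per operation.
final class Callbacks {
  var onRead: Option[() => Unit] = None
  var onWrite: Option[() => Unit] = None
}

object SingleSelector {
  /** Adds `op` to the channel's interest set (OR, not replace) and records its callback. */
  def addInterest(sel: Selector, ch: SocketChannel, op: Int)(cb: () => Unit): Unit = {
    val key = Option(ch.keyFor(sel)).getOrElse(ch.register(sel, 0, new Callbacks))
    val cbs = key.attachment().asInstanceOf[Callbacks]
    if (op == SelectionKey.OP_READ) cbs.onRead = Some(cb) else cbs.onWrite = Some(cb)
    key.interestOps(key.interestOps() | op)
  }

  /** One select pass: each ready operation clears its interest bit and fires its own callback. */
  def dispatchOnce(sel: Selector): Unit = {
    sel.select()
    val it = sel.selectedKeys().iterator()
    while (it.hasNext) {
      val key = it.next()
      it.remove()
      val cbs = key.attachment().asInstanceOf[Callbacks]
      val ready = key.readyOps()
      if ((ready & SelectionKey.OP_READ) != 0) {
        val cb = cbs.onRead
        cbs.onRead = None
        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ)
        cb.foreach(f => f())
      }
      if (key.isValid && (ready & SelectionKey.OP_WRITE) != 0) {
        val cb = cbs.onWrite
        cbs.onWrite = None
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE)
        cb.foreach(f => f())
      }
    }
  }

  /** Loopback demo: the client is both readable (the peer sent data) and writable. */
  def demo(): List[String] = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress("127.0.0.1", 0))
    val client = SocketChannel.open(server.getLocalAddress)
    val peer = server.accept()
    peer.write(ByteBuffer.wrap("hi".getBytes("UTF-8")))
    client.configureBlocking(false)
    val sel = Selector.open()
    val events = ListBuffer.empty[String]
    try {
      addInterest(sel, client, SelectionKey.OP_READ)(() => events += "read")
      addInterest(sel, client, SelectionKey.OP_WRITE)(() => events += "write")
      while (events.size < 2) dispatchOnce(sel)
      events.toList.sorted
    } finally {
      sel.close(); client.close(); peer.close(); server.close()
    }
  }
}
```

Both callbacks share one key, so the attachment has to carry one slot per operation. That extra bookkeeping is what the two-selector design avoids.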

Two Selectors

Depending on the implementation, using two selectors and two threads this way could cause problems. However, based on my understanding of the documentation, the code in the Sun implementation and the operation of the POSIX select operation, I believe this approach should work (at least on POSIX systems). This would need to be tested on all supported platforms for a production system.
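The documentation limits a channel to one registration per selector, but registering the same channel with two different selectors is allowed. This standalone check is hypothetical and not from nioserver. It registers one loopback SocketChannel with a read selector and a write selector, and confirms that each selector gets its own independent key. It only shows that the API accepts this on the JVM you run it on. It does not address the threading concerns above, which still need testing per platform.

```scala
import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}

object TwoSelectorsCheck {
  /** Returns (read key ops, write key ops, ready count on readSel, ready count on writeSel). */
  def check(): (Int, Int, Int, Int) = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress("127.0.0.1", 0))
    val client = SocketChannel.open(server.getLocalAddress)
    client.configureBlocking(false)
    val readSel = Selector.open()
    val writeSel = Selector.open()
    try {
      val readKey = client.register(readSel, SelectionKey.OP_READ)
      val writeKey = client.register(writeSel, SelectionKey.OP_WRITE)
      assert(readKey ne writeKey) // one key per selector, so neither overwrites the other
      // Nothing has been sent to the client yet, but a fresh connection is writable.
      (readKey.interestOps(), writeKey.interestOps(), readSel.selectNow(), writeSel.selectNow())
    } finally {
      readSel.close(); writeSel.close(); client.close(); server.close()
    }
  }
}
```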

To use separate read and write selectors, we replace the current selector parameter in NioConnection with two parameters, readSelector and writeSelector, of the same type.

//In object NioConnection:
    def newConnection(sched:CoScheduler, readSelector:NioSelector,
            writeSelector:NioSelector, socket:SocketChannel) {
        val conn = new NioConnection(sched,readSelector,
            writeSelector,socket)
        conn.start()
    }

class NioConnection(sched:CoScheduler, readSelector:NioSelector, 
        writeSelector:NioSelector, socket:SocketChannel) {
    ...
    private def read(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            readSelector.register(socket, SelectionKey.OP_READ, {
                val n = socket.read(b)
                k(n)
            })
        }
    }

    private def write(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            writeSelector.register(socket, SelectionKey.OP_WRITE, {
                val n = socket.write(b)
                k(n)
            })
        }
    }

    ...
}

We also change NioListener to pass through the two selector arguments, and we choose to use readSelector to handle our accept calls.

//In NioListener
class NioListener(sched:CoScheduler, readSelector:NioSelector,
        writeSelector:NioSelector, hostAddr:InetAddress, port:Int) {
    ...
    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()
                NioConnection.newConnection(sched,
                    readSelector,writeSelector,socket)
            }
        }
    }

    private def accept():SocketChannel @suspendable = {
        shift { k =>
            readSelector.register(serverChannel,SelectionKey.OP_ACCEPT, {
                val conn = serverChannel.accept()
                conn.configureBlocking(false)
                k(conn)
            })
        }
    }
}

Finally, we instantiate the new write selector in NioServer, pass it to NioListener, and start it running on its own thread.

//In NioServer
class NioServer(hostAddr:InetAddress, port:Int) {
    val readSelector = new NioSelector()
    val writeSelector = new NioSelector()
    val sched = new DefaultCoScheduler
    val listener = new NioListener(sched, 
        readSelector, writeSelector, hostAddr, port)

    def start() {
        listener.start(true)
        //run the NIO read and write selectors each on its own thread
        (new Thread(writeSelector,"WriteSelector")).start
        (new Thread(readSelector,"ReadSelector")).start
        Thread.currentThread.setName("CoScheduler")
        sched.run    //run the coroutine scheduler on our thread, renamed
    }
}
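With two selector threads, NioConnection calls register from the scheduler thread, while each selector thread is usually blocked in select(). The post doesn't show NioSelector, so the following is only a sketch of one common way to handle this, not the project's code. Other threads queue a registration and call Selector.wakeup, and the selector thread applies queued registrations itself, in keeping with the "modify the selector from the selecting thread only" advice. On older JDKs, a direct SelectableChannel.register call could block while another thread was inside select(), which is one reason for this pattern. SelectorLoop and SelectorLoopDemo are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, SelectableChannel, SelectionKey, Selector}
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, TimeUnit}

/** Hypothetical stand-in for NioSelector: any thread may call register(),
  * but only the thread running run() touches the Selector's keys. */
final class SelectorLoop extends Runnable {
  private val selector = Selector.open()
  private val pending = new ConcurrentLinkedQueue[() => Unit]()
  @volatile private var running = true

  def register(ch: SelectableChannel, op: Int, callback: () => Unit): Unit = {
    pending.add(() => ch.register(selector, op, callback))
    selector.wakeup() // make select() return so the registration gets applied
  }

  def stop(): Unit = { running = false; selector.wakeup() }

  def run(): Unit = {
    while (running) {
      selector.select()
      var reg = pending.poll()
      while (reg != null) { reg(); reg = pending.poll() }
      val it = selector.selectedKeys().iterator()
      while (it.hasNext) {
        val key = it.next()
        it.remove()
        key.interestOps(0) // one-shot, like the per-call registrations in the post
        val cb = key.attachment().asInstanceOf[() => Unit]
        cb()
      }
    }
    selector.close()
  }
}

object SelectorLoopDemo {
  /** Registers a write callback from the main thread; returns true if it ran. */
  def demo(): Boolean = {
    val loop = new SelectorLoop
    val thread = new Thread(loop, "WriteSelector")
    thread.start()
    val pipe = Pipe.open()
    val sink = pipe.sink()
    sink.configureBlocking(false)
    val done = new CountDownLatch(1)
    loop.register(sink, SelectionKey.OP_WRITE, () => {
      sink.write(ByteBuffer.wrap("hello\n".getBytes("UTF-8")))
      done.countDown()
    })
    val fired = done.await(5, TimeUnit.SECONDS)
    loop.stop()
    thread.join()
    sink.close(); pipe.source().close()
    fired
  }
}
```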

Close

Our current example has no terminating condition, so it never attempts to close the connection. Looking ahead, we expect some applications to want to close it, so we add a close method to NioConnection, along with an isOpen method that lets the application see when the connection has been closed.

We can't just add a close method that directly closes the socket, because there may still be output data waiting to be written. Thus we need an implementation that somehow waits until all of the queued output data has been written to the output before closing the socket.

One easy way to do this is to have a special marker string that we put into the output queue when the application requests to close the socket. When our socket output code sees that marker, we know it has already written out all of the data that came before that marker in the output queue, so we can close the socket. By doing the socket close in the same method that does the writes to the socket, and by ensuring that that method is called on the (write) selection thread, we also ensure that the close happens on the selection thread.

String literals are interned, so every "" literal in the program refers to the same String object. To make sure our marker is unique and cannot be passed in by any code outside our close method, we create it with new String(""). In writeWait, where we check for that marker, we use the identity comparison eq rather than ==. We also add a call to shiftUnit so that both branches of the if statement have the same CPS (suspendable) type.

A call to our close method returns as soon as the marker has been queued. The socket is not closed, however, until all of the data ahead of the marker in the output queue has been written to the socket. The application can tell when the socket has actually been closed by calling the isOpen method.

//In NioConnection
    private def writeWait():Unit @suspendable = {
        val str = outQ.blockingDequeue
        if (str eq closeMarker) {
            socket.close
            shiftUnit[Unit,Unit,Unit]()
        } else
            writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
    }

    def isOpen = socket.isOpen
    private val closeMarker = new String("")
    def close():Unit @suspendable = write(closeMarker)
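The marker technique can be checked in isolation. In this sketch, a LinkedBlockingQueue stands in for CoQueue and the drain loop stands in for writeWait, which in the real code would close the socket at the marker. CloseMarkerDemo is a hypothetical name. An empty string sent by the application is still treated as data, because it is equal to the marker but is not the same object.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

object CloseMarkerDemo {
  // A distinct String object: never the same reference as the interned "" literal.
  private val closeMarker = new String("")

  /** Drains the queue in order until the marker; returns (data "written", closed?). */
  def drain(q: LinkedBlockingQueue[String]): (List[String], Boolean) = {
    val written = ListBuffer.empty[String]
    var closed = false
    while (!closed) {
      val str = q.take()
      if (str eq closeMarker) closed = true // identity test: this is where the socket would close
      else written += str                   // an application's "" is still ordinary data
    }
    (written.toList, closed)
  }

  def demo(): (List[String], Boolean) = {
    val q = new LinkedBlockingQueue[String]()
    Seq("first\n", "", closeMarker, "too late\n").foreach(q.put)
    drain(q)
  }
}
```

Anything queued after the marker (here "too late\n") is never written, which matches the behavior of writeWait once the socket is closed.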

Summary

As in the previous two posts, we have made an internal improvement that does not change the program's basic external behavior. We have, however, changed its behavior in one corner case: what happens when a socket's output buffer fills up, as might happen when a client stops reading or the network is congested. Handling that case correctly is a necessary improvement for a production application, particularly one that expects the kind of high volume that makes such corner cases more likely.
