Tuesday, March 22, 2011

Java NIO and Scala Continuations

Combining Java's NIO non-blocking input/output with Scala's delimited continuations allows making a multi-client stateful server that does not require a dedicated thread for every active client.

Motivation

A high-volume server must respond quickly to each request even when it is serving a large number of clients. In particular, clients for whom you are storing state but who are not currently making a request should have a negligible effect on response time, and a client whose request takes a long time to execute should not significantly delay responses to other clients.

From the above, we thus have these requirements:
  • The size of the state stored for each active client should be minimized.
  • The time to notice that a client has made a request should not depend significantly on the number of active clients not making a request.
  • A client request should never wait on a request from another client.

A simple Java implementation of a server might store its per-client state in a thread for each active client, and block waiting for a request on each thread. Java's thread scheduler then takes care of noticing when input is available from one of those blocking reads and continuing execution on the associated thread. However, threads are relatively expensive: each one reserves its own stack, so a thread per client consumes more memory than the per-client state requires.

An implementation that stores the state for each client manually could store that state much more compactly and thus handle many more clients, but in addition to being more complicated to implement, the application must now include a mechanism to multiplex multiple clients onto fewer threads than there are active clients without having any clients blocked by other clients.

Scala's delimited continuations give us another option for storing the state of a client, one that is simpler to implement than manual state storage and that uses far less memory than a thread for each client (see slide 19 of Philipp Haller's presentation on Actors, where he says an app can handle up to 5,000 threads, but up to 1,200,000 actors using closures).

That leaves us to solve the issue of how to multiplex many clients without having them block each other. If using the standard java.io package, the only option for reading from multiple streams without blocking is to poll using the available method. Besides the wasted CPU time, as the number of active clients grows the polling loop takes longer to run, meaning additional latency before the application notices that any particular stream is ready for reading and thus increased response time. The Java NIO package provides an efficient solution for this problem.
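To make the cost of polling concrete, here is a minimal sketch of one pass of such a polling loop. The PollingDemo object and its readyIndices method are hypothetical names made up for this illustration; the only library call involved is InputStream.available.

```scala
import java.io.InputStream

object PollingDemo {
  // One polling pass: ask every stream whether it has data, and return
  // the indices of the streams that do. Idle streams are still visited.
  def readyIndices(streams: Seq[InputStream]): Seq[Int] =
    streams.zipWithIndex.collect {
      case (stream, index) if stream.available() > 0 => index
    }
}
```

Every pass touches every stream, ready or not, so the work per pass grows linearly with the number of active clients.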

In the "Nio*" sections below I present a few Scala classes that break up the NIO work into chunks that I hope are easy to understand. The last piece is a simple main class to test it all.

Java Nonblocking IO

Starting with version 1.4, Java has a package called NIO (new input/output) that includes the ability to do non-blocking input and output.

The non-blocking IO model in Java NIO is based on Channels and Selectors, similar to file descriptors and the select call in POSIX operating systems. A Channel represents a connection to a file, socket, or some other data source or sink. A Selector allows collecting a set of Channels together and waiting until any of them are ready for input or output.

In order to use a Channel with a Selector, you must register the Channel with the Selector (and the Channel must be a SelectableChannel). When you register a channel you also specify which operations are of interest, such as OP_READ or OP_WRITE. These operations are represented by bit constants in SelectionKey that can be combined with a bitwise OR to create a value that indicates interest in multiple operations.

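Registering a channel with a selector creates a SelectionKey. When you later execute the select method on the selector, it waits until at least one registered channel is ready (or until a timeout expires). The selector's selectedKeys method then returns the set of SelectionKeys for the ready channels, from which you can extract the information passed to the register call when each SelectionKey was created.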
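The following sketch walks through those steps with a java.nio.channels.Pipe, so that it runs without any network setup. SelectorDemo and the "pipe-source" attachment are illustrative names, not part of the server developed below.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, SelectionKey, Selector}

object SelectorDemo {
  // Interest in several operations is expressed by ORing the bit constants.
  // (A SocketChannel supports both; the pipe source below supports only OP_READ.)
  val readAndWrite: Int = SelectionKey.OP_READ | SelectionKey.OP_WRITE

  // Registers the read end of a pipe, makes it readable, and returns the
  // attachment of the selected key (None if select times out).
  def run(): Option[AnyRef] = {
    val selector = Selector.open()
    val pipe = Pipe.open()
    val source = pipe.source()
    val sink = pipe.sink()
    try {
      source.configureBlocking(false) // a channel must be non-blocking to register
      source.register(selector, SelectionKey.OP_READ, "pipe-source")
      sink.write(ByteBuffer.wrap(Array[Byte](1, 2, 3))) // make the source readable
      if (selector.select(1000L) == 0) None
      else {
        val it = selector.selectedKeys().iterator()
        val key = it.next()
        it.remove() // the selector never removes selected keys itself
        Some(key.attachment())
      }
    } finally {
      selector.close()
      sink.close()
      source.close()
    }
  }
}
```

Calling SelectorDemo.run() should hand back the attachment that was passed to register, which is the same mechanism the server below uses to carry its callbacks.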

NioSelector

The register call to java.nio.channels.SelectableChannel includes an "attachment" parameter that allows the application to attach an arbitrary object to that registration and retrieve it later when the associated channel becomes selected. In our server, we attach a callback function to be executed when the channel becomes selected. We package this up in our own register method in our NioSelector class.

import java.nio.channels.SelectableChannel
import java.nio.channels.spi.SelectorProvider

/** Handle nio channel selection. */
class NioSelector {

    val selector = SelectorProvider.provider.openSelector()

    def register(channel:SelectableChannel, op:Int, body: => Unit) {
        val callback:Function0[Unit] = { () => { body }}
        channel.register(selector, op, callback)
    }
}

We add a selectOnce method that waits until one or more of the registered channels are selected, or until the specified timeout (in milliseconds) has elapsed if no channels are selected during that time. Note that java.nio.channels.Selector treats a timeout of 0 as "block indefinitely" rather than "return immediately". The underlying call to selectedKeys returns a java.util.Set containing the keys of the selected channels. Before we process each channel, we need to clear the current state so that we are able to re-register interest in that channel during that processing. We make a copy of the keys for all selected channels, clear the selected-key set, and reset each key's interest set to 0 before processing any channels.

import scala.collection.JavaConversions
import java.nio.channels.SelectionKey

//In class NioSelector

    def selectOnce(timeout:Long) {
        selector.select(timeout)
        val jKeys:java.util.Set[SelectionKey] = selector.selectedKeys
        val keys = JavaConversions.asScalaSet(jKeys).toList
        selector.selectedKeys.clear()
        keys foreach { _.interestOps(0) }
        val callbacks = keys map { _.attachment.asInstanceOf[()=>Unit] }
        executeCallbacks(callbacks) //Execute callbacks for all selected keys
    }

The executeCallbacks method is responsible for executing the callbacks for the selected channels. If we assume that at least some of these callbacks may take a relatively long time to complete, then in a real server we might want to throw all of these callbacks into a thread pool or farm them out to actors so that we can avoid having one long-running callback block out the rest of the channels. For now, though, we will ignore that issue and use the simple implementation of just executing each callback sequentially in the same thread.

//In class NioSelector

    def executeCallbacks(callbacks:List[()=>Unit]) {
        callbacks foreach { _() }
    }
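As a rough sketch of the thread-pool alternative mentioned above (it is not used in the rest of this post), the callbacks could be handed to a java.util.concurrent.ExecutorService. The PooledCallbacks object and its pool parameter are assumptions made for this illustration.

```scala
import java.util.concurrent.ExecutorService

object PooledCallbacks {
  // Submit each callback to the pool instead of running it on the
  // select thread, so one slow callback does not delay the others.
  def executeCallbacks(pool: ExecutorService, callbacks: List[() => Unit]): Unit =
    callbacks foreach { callback =>
      pool.execute(new Runnable {
        def run(): Unit = callback()
      })
    }
}
```

With this approach the callbacks, and any continuations they resume, run on pool threads, so calls back into the selector such as register would happen concurrently with select and would need to be coordinated.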
For convenience, we add a selectLoop method that will continue to call selectOnce until we are ready to stop processing, and we make our class implement the Runnable interface.

//In NioSelector class

    def run() {
        selectLoop(true)
    }

    def selectLoop(continueProcessing: => Boolean) {
        while (continueProcessing) {
            selectOnce(0)
        }
    }

Here is the complete NioSelector class:

import scala.collection.JavaConversions
import java.nio.channels.SelectableChannel
import java.nio.channels.SelectionKey
import java.nio.channels.spi.SelectorProvider

/** Handle nio channel selection. */
class NioSelector extends Runnable {

    val selector = SelectorProvider.provider.openSelector()

    def register(channel:SelectableChannel, op:Int, body: => Unit) {
        val callback:Function0[Unit] = { () => { body }}
        channel.register(selector, op, callback)
    }

    def run() {
        selectLoop(true)
    }

    def selectLoop(continueProcessing: => Boolean) {
        while (continueProcessing) {
            selectOnce(0)
        }
    }

    def selectOnce(timeout:Long) {
        selector.select(timeout)
        val jKeys:java.util.Set[SelectionKey] = selector.selectedKeys
        val keys = JavaConversions.asScalaSet(jKeys).toList
        selector.selectedKeys.clear()
        keys foreach { _.interestOps(0) }
        val callbacks = keys map { _.attachment.asInstanceOf[()=>Unit] }
        executeCallbacks(callbacks) //Execute callbacks for all selected keys
    }

    def executeCallbacks(callbacks:List[()=>Unit]) {
        callbacks foreach { _() }
    }
}

Our main method will create an instance of NioSelector, do some other initialization, and eventually call NioSelector.run.
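Because selectLoop passes a timeout of 0 to selectOnce, and Selector.select treats 0 as "block indefinitely", a stop condition is only rechecked after some channel becomes ready. One possible way to stop such a loop promptly, sketched here with a hypothetical StoppableLoop class that is separate from NioSelector, is to pair a volatile flag with Selector.wakeup:

```scala
import java.nio.channels.Selector

class StoppableLoop extends Runnable {
  private val selector = Selector.open()
  @volatile private var running = true

  def run(): Unit = {
    while (running) {
      selector.select(0L)             // 0 means no timeout: block until ready or woken
      selector.selectedKeys().clear() // a real loop would run callbacks here
    }
    selector.close()
  }

  // Safe to call from another thread.
  def stop(): Unit = {
    running = false
    selector.wakeup() // forces the current (or next) select call to return
  }
}
```

A caller would start the loop with new Thread(loop).start() and later call loop.stop(). If wakeup is called while no select is in progress, the next select call returns immediately, so the flag change is not missed.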

NioListener

The above NioSelector class gives us our main event loop, but how do we get things hooked up to start generating events? The basic socket flow on the server side for reading data follows these steps:
  • Listen for new connections on a socket.
  • When a new connection is made, initialize that connection and wait for input data on that connection.
  • When input data is received, process that input data.
  • Eventually, close the connection.

Our NioListener class takes care of creating the socket on which we listen, accepting new connections, and passing them off to another class for per-connection initialization.

We start with the code that creates the listener socket. Since we will be passing this socket to our selector, we must create a ServerSocketChannel rather than a regular socket. We don't want to block waiting for this socket, so we call configureBlocking(false).

import java.net.{InetAddress,InetSocketAddress}
import java.nio.channels.ServerSocketChannel

class NioListener(selector:NioSelector, hostAddr:InetAddress, port:Int) {

    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    val isa = new InetSocketAddress(hostAddr,port)
    serverChannel.socket.bind(isa)
}

Now that we have created our socket, we must register it with our selector. In that call, we will tell the selector that we are interested in the ACCEPT operation on our socket channel. The third argument to our register method is the callback; this is where continuations come in.

Blocking code is generally simpler and thus easier to write and understand than asynchronous (non-blocking) code. By using continuations we can capture the current state of the code, explicitly save it for later execution, and continue running the thread at a different point. The code within our delimited continuation is blocked, but the thread does not get blocked.

Rather than explicitly creating and passing in a callback with application state, we arrange things such that the callback passed to our register method includes a call to our continuation, so that when that callback is called, it executes our continuation and makes our code continue from where we left off.

We implement our own accept method that looks to the calling code like the blocking ServerSocket.accept method, but that uses continuations as described above to do the blocking. We then call our accept method from within a reset block that delimits the continuation and determines how much code is saved in the continuation (everything within the reset) and the point at which execution continues (just past the reset) in the thread when the accept call blocks.

import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import scala.util.continuations._

//In NioListener class
    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()
                NioConnection.newConnection(selector,socket)
            }
        }
    }

    private def accept():SocketChannel @suspendable = {
        shift { k =>
            selector.register(serverChannel,SelectionKey.OP_ACCEPT, {
                val conn = serverChannel.accept()
                conn.configureBlocking(false)
                k(conn)
            })
        }
    }

When we call our start method, it enters the reset block and calls accept, which registers our interest in the ACCEPT operation, saves the continuation as part of our callback in the register method and immediately returns control to the end of the reset block, thus the start method returns quickly. When a new connection arrives, the select code in NioSelector calls our callback from its executeCallbacks method. Our callback accepts the new connection and sets it to non-blocking, then executes our continuation, which will "return" from our blocked accept method into the loop in our start method. The code processes the new connection, then loops and calls accept again, which reregisters interest in an ACCEPT operation on our socket and returns execution to the caller of the continuation, which returns control to the executeCallbacks method in NioSelector.
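To see what the continuation is saving us from writing, here is a hand-written continuation-passing version of the same control flow, with no reset or shift. It is only an illustration: FakeSelector is a hypothetical stand-in that queues callbacks instead of watching channels, and connections are represented by plain integers.

```scala
import scala.collection.mutable

// Hypothetical stand-in for NioSelector: it only queues callbacks.
class FakeSelector {
  private val pending = mutable.Queue[() => Unit]()

  def register(body: => Unit): Unit = pending.enqueue(() => body)

  // Run one queued callback, as selectOnce would when a channel is ready.
  def fireOne(): Boolean =
    if (pending.isEmpty) false
    else { pending.dequeue()(); true }
}

object ManualCps {
  val log = mutable.ArrayBuffer[String]()

  // accept in continuation-passing style: k is "the rest of the loop".
  def accept(sel: FakeSelector, nextId: Int)(k: Int => Unit): Unit =
    sel.register { k(nextId) }

  // The while loop from start, rewritten as explicit recursion.
  def start(sel: FakeSelector, nextId: Int = 1): Unit =
    accept(sel, nextId) { conn =>
      log += ("accepted " + conn)
      start(sel, conn + 1) // loop: register again for the next connection
    }
}
```

Here the while loop has to be turned into explicit recursion and the rest of the loop body has to be passed along as the function k; with reset and shift the compiler plugin performs that transformation for us.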

NioConnection

Our NioConnection class is responsible for creating our connection object, registering it with the selector, reading input data when available and processing that data. The processing that we do in this example is to echo the data back to the client. In order to keep this example simple, we are not using NIO and continuations on the writing side; instead, we use a simple write call to write our data directly to the socket.

As with NioListener, we create our own read call that uses continuations to block, but otherwise looks similar to the standard Java InputStream.read(byte[]) call, except that we read into a ByteBuffer rather than a byte array. As with NioListener, our start method that calls our read method (via our intermediate method readWait) contains a reset block that delimits the continuation.

import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import scala.util.continuations._

object NioConnection {
    def newConnection(selector:NioSelector, socket:SocketChannel) {
        val conn = new NioConnection(selector,socket)
        conn.start()
    }
}

class NioConnection(selector:NioSelector, socket:SocketChannel) {

    private val buffer = ByteBuffer.allocateDirect(2000)

    def start():Unit = {
        reset {
            while (socket.isOpen)
                readWait
        }
    }

    private def readWait = {
        buffer.clear()
        val count = read(buffer)
        if (count<1)
            socket.close()
        else
            readAction(buffer)
    }

    private def read(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            selector.register(socket, SelectionKey.OP_READ, {
                val n = socket.read(b)
                k(n)
            })
        }
    }

    private def readAction(b:ByteBuffer) {
        b.flip()
        socket.write(b)
        b.clear()
    }
}

The program flow is similar to the flow in NioListener. When we call our start method, it enters the reset block and (through the call to readWait) calls read, which registers our interest in the READ operation, saves the continuation as part of our callback in the register method and immediately returns control to the end of the reset block, thus the start method returns quickly. When data is available to be read from the socket, the select code in NioSelector calls our callback from its executeCallbacks method. Our callback reads the data from the connection then executes our continuation, which will "return" from our blocked read method into the readWait method. The code processes the input data in readAction (which just echoes the data back to the client), then (if the socket is still open) loops and ends up calling read again, which reregisters interest in a READ operation on our socket and returns execution to the caller of the continuation, which returns control to the executeCallbacks method in NioSelector.
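The flip and clear calls in readAction are easy to get wrong, so here is a small stand-alone sketch of the ByteBuffer state changes involved. FlipDemo is a hypothetical name, and an array copy stands in for the socket write.

```scala
import java.nio.ByteBuffer

object FlipDemo {
  def roundTrip(data: Array[Byte]): Array[Byte] = {
    val b = ByteBuffer.allocate(16)
    b.put(data)  // filling: position advances, limit stays at capacity
    b.flip()     // limit = old position, position = 0: ready to drain
    val out = new Array[Byte](b.remaining())
    b.get(out)   // draining: copies the bytes between position and limit
    b.clear()    // position = 0, limit = capacity: ready to fill again
    out
  }
}
```

Note that a non-blocking SocketChannel.write is allowed to write fewer bytes than remain in the buffer, so a fuller version of readAction would check b.hasRemaining before clearing.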

NioServer

Here is a short main program to test the above modules.

import java.net.InetAddress

object NioServer {
    def main(args:Array[String]) {
        val hostAddr:InetAddress = null //null binds to the wildcard address (all local interfaces)
        val port = 1234
        val server = new NioServer(hostAddr,port)
        server.start()
    }
}

class NioServer(hostAddr:InetAddress, port:Int) {
    val selector = new NioSelector()
    val listener = new NioListener(selector, hostAddr, port)

    def start() {
        listener.start(true)
        selector.run()
    }
}

Create a directory and place into that directory the above four files (NioSelector.scala, NioListener.scala, NioConnection.scala and NioServer.scala), cd into that directory, then execute the following commands to compile the files and execute the test program:

scalac -P:continuations:enable *.scala
scala NioServer

If you get an "Address already in use" error and traceback, edit NioServer.scala and pick another port number.

After starting the server, connect to it using telnet:

telnet localhost 1234

In the telnet client, each time you enter a line of text the server should echo that line of text back to you.

Limitations

The above example demonstrates a very basic echo server in a bit over 100 lines of code that uses NIO and continuations for reading data, but it has some limitations:
  • The processing of the input data is buried in the code that reads the data. A well-factored architecture would allow that processing code to be separately maintained.
  • We are using one thread for all execution. In the above example we are just echoing clients' input back to them, but if we want to do some heavier processing we will need to move that work off of the select thread onto one or more worker threads so as not to block everyone while handling a request for one client.
  • Communication between the select thread and the worker threads will require some kind of buffering, which introduces more points at which execution can be blocked, thus more opportunities to use continuations.
  • If we wish to treat the data as characters rather than bytes then we need to do that conversion.
  • We are not using NIO and continuations when writing data to the client, so if for some reason our socket is not ready for writing the application will not work properly (either it will block waiting for output, or we will lose some output data).
  • We are not storing any significant amount of state in our continuations. If all we are doing is a simple echo server, we don't really need continuations. A more sophisticated example would better illustrate the convenience of using continuations to store program state.

I hope to address these points in future posts.
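As one possible direction for the character-conversion point, the sketch below uses a java.nio.charset.CharsetDecoder to decode data that arrives in chunks, keeping any incomplete multi-byte sequence for the next read. Utf8Chunks and decodeAll are hypothetical names, UTF-8 is an assumed encoding, and the fixed 64-element buffers assume small chunks.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.Charset

object Utf8Chunks {
  // Decode chunks that may split a multi-byte character across reads.
  // Assumes each chunk plus any leftover bytes fits in 64 bytes.
  def decodeAll(chunks: Seq[Array[Byte]]): String = {
    val decoder = Charset.forName("UTF-8").newDecoder()
    val in = ByteBuffer.allocate(64)
    val out = CharBuffer.allocate(64)
    val sb = new StringBuilder
    for (chunk <- chunks) {
      in.put(chunk)                  // append the newly "read" bytes
      in.flip()                      // switch to draining
      decoder.decode(in, out, false) // false: more input may follow
      in.compact()                   // keep undecoded trailing bytes for next time
      out.flip()
      sb.append(out.toString)        // the characters decoded in this round
      out.clear()
    }
    sb.toString
  }
}
```

Decoding each chunk independently would instead garble any character whose bytes straddle two reads, which is why the decoder and the input buffer are kept across chunks here.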

Resources

  • My explanation of delimited continuations.
  • Wikipedia article on Java NIO.
  • Rox Java NIO Tutorial, in which James Greenfield builds a server in Java using NIO and gives a number of tips and warnings.
  • Jenkov's Java NIO Tutorial goes over each part of the entire NIO package in detail.
  • MINA, Apache's network application framework that uses NIO.
  • Loft, a single-threaded web server in Scala using NIO and continuations (under development).
  • A post from July 2009 on the Scala mailing list with an (uncompiled) example of a simple single-threaded server using NIO. The post demonstrates the approach of creating versions of the standard calls (accept, read, write) that block using continuations.

7 comments:

Anonymous said...

Very interesting stuff. I wonder how that could be used for MMO game server, for instance -- I've worked on an Erlang server for Worldforge some years ago..

steve said...

Nice examples of how things can be solved in a high-level approach!

But how do you actually work with files, considering that real support for files and filesystems will only ship with Java 7?

Do you employ native libraries or some other trick?

monisa said...

We are planning to use continuation support in the Scala language through shift and reset to track the users who are logged in (we are trying to use this instead of using a session or cookie to track the users logged in). So please provide us any sample program to implement the continuation support through shift and reset to track the logged-in users in Scala.

Jim McBeath said...

Thierry: I plan on making a few more posts in which I continue to develop this server. Perhaps at a later point it will be easier to see how it could be used for that type of server.

steve: I have not yet had a need for non-blocking file I/O in Java, so I have not attempted a solution. I don't see a way to do this in Java 6 without using a thread.

monisa: I do not have any sample code that tracks users as you suggest, but I could do some contract work for you if you'd like. If you are creating a web app, I suggest you ask on the Lift mailing list to see if it can already do what you want.

Sandeep said...

This comment has been removed by a blog administrator.

Jim McBeath said...

Sandeep: When you post a comment with a link to your blog and no commentary about this blog post, I consider it spam, so I have removed it.

Edward Garson said...

Nice post