Monday, March 28, 2011

Java NIO for Character Decoding in Scala

The Java NIO package includes some handy character encoding and decoding methods that can be used from Scala.

Background

In my previous post I described a simple Scala server using NIO and continuations, and mentioned in the Limitations section that the example did not convert the data bytes to characters. In this post I show how that can easily be added by using another feature of the Java NIO package: character-set encoders and decoders.

Java NIO Character Coders

The java.nio.charset package includes a Charset class that represents a mapping between the 16-bit Unicode code units that Java uses internally to represent characters and strings, and sequences of bytes such as those stored in a file or transmitted over a socket connection. Each such mapping is represented by a separate instance of the Charset class. Standard mappings such as "UTF-8" and "ISO-8859-1" can be retrieved using the static forName method.

Given an instance of Charset, a CharsetEncoder for that character mapping can be retrieved by calling the newEncoder method on that instance. That encoder can then be used to convert a Java string into a sequence of bytes suitable for writing to a file or connection.

Similarly, the newDecoder method on Charset retrieves a CharsetDecoder that can be used for the complementary task of converting bytes from a file or connection into a Java string.

The encoding and decoding methods convert data between a CharBuffer and a ByteBuffer. Since the java.nio socket I/O calls we are using read and write their data to and from ByteBuffers, it is convenient for the encoding and decoding to use those objects.
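As a minimal sketch of those calls, the hypothetical CharsetRoundTrip object below encodes a string into a ByteBuffer with a CharsetEncoder and decodes it back with a CharsetDecoder. It is written for a current Scala (2.13 or 3) rather than the 2.8-era compiler used for the rest of this post. The Charset, CharsetEncoder and CharsetDecoder calls are the standard java.nio.charset API; the object and method names are only for illustration.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.{Charset, CharsetDecoder, CharsetEncoder}

// Round trip through one Charset: String -> CharBuffer -> ByteBuffer -> CharBuffer.
object CharsetRoundTrip {
  val utf8: Charset = Charset.forName("UTF-8")

  def encode(s: String): ByteBuffer = {
    // A fresh encoder per call; encoders are not safe to share between threads.
    val encoder: CharsetEncoder = utf8.newEncoder()
    // The returned buffer has position 0 and its limit at the last byte written.
    encoder.encode(CharBuffer.wrap(s))
  }

  def decode(bytes: ByteBuffer): String = {
    val decoder: CharsetDecoder = utf8.newDecoder()
    decoder.decode(bytes).toString
  }
}

val encoded = CharsetRoundTrip.encode("na\u00efve caf\u00e9")
println(encoded.remaining()) // 12: the two accented letters take two bytes each in UTF-8
println(CharsetRoundTrip.decode(encoded))
```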

LineDecoder

Using the java.nio.charset classes described above, we write a LineDecoder class containing a processBytes method that takes as input a ByteBuffer (which is what we have to read into when using a SocketChannel) and converts that byte data to Java characters. For this example, we also break up that character data into separate lines when we see line break characters, converting each line of characters to a Java String. One buffer of data might contain multiple lines of character data, so rather than returning a set of lines, our method accepts a callback to which we pass each line as we decode it.

import java.nio.{ByteBuffer,CharBuffer}
import java.nio.charset.{Charset,CharsetDecoder,CharsetEncoder,CoderResult}
import scala.annotation.tailrec

class LineDecoder {

    //Encoders and decoders are not multi-thread safe, so create one
    //for each connection in case we are using multiple threads.
    val utf8Charset = Charset.forName("UTF-8")
    val utf8Encoder = utf8Charset.newEncoder
    val utf8Decoder = utf8Charset.newDecoder

    def processBytes(b:ByteBuffer, lineHandler:(String)=>Unit):Unit =
        processChars(utf8Decoder.decode(b),lineHandler)

    @tailrec
    private def processChars(cb:CharBuffer, lineHandler:(String)=>Unit) {
        val len = lengthOfFirstLine(cb)
        if (len>=0) {
            val ca = new Array[Char](len)
            cb.get(ca,0,len)
            eatLineEnding(cb)
            val line = new String(ca)
            lineHandler(line)
            processChars(cb, lineHandler)       //handle multiple lines
        }
    }

    //Assuming the first character in the buffer is an eol char,
    //consume it and a possible matching CR or LF in case the EOL is 2 chars.
    private def eatLineEnding(cb:CharBuffer) {
        //Eat the first character and see what it is
        cb.get match {
            case '\n' => if (cb.remaining>0 && cb.charAt(0)=='\r') cb.get
            case '\r' => if (cb.remaining>0 && cb.charAt(0)=='\n') cb.get
            case _ => //ignore everything else
        }
    }

    private def lengthOfFirstLine(cb:CharBuffer):Int = {
        (0 until cb.remaining) find { i =>
            List('\n','\r').indexOf(cb.charAt(i))>=0 } getOrElse -1
    }
}
Here is an imperative version of lengthOfFirstLine that does the same thing as the functional version above.
    private def lengthOfFirstLine(cb:CharBuffer):Int = {
        val cbLen = cb.remaining
        for (i <- 0 until cbLen) {
            val ch = cb.charAt(i)
            if (ch == '\n' || ch == '\r')
                return i
        }
        return -1
    }

NioConnection

One of the classes shown in my previous post was the NioConnection class, whose responsibilities include processing input data from the client. It does this in the method readAction, which initially looks like this:
//The old version
    private def readAction(b:ByteBuffer) {
        b.flip()
        socket.write(b)
        b.clear()
    }
We replace the direct call to socket.write with a call to LineDecoder.processBytes, which is responsible for decoding the input data, and we pass it our new writeLine method, which accepts a line of characters and writes it back to the client. We also drop the call to b.clear: readAction runs at the bottom of each pass through the while loop in our start method, and readWait already calls buffer.clear() at the top of the next pass.
    private val lineDecoder = new LineDecoder

    private def readAction(b:ByteBuffer) {
        b.flip()
        lineDecoder.processBytes(b, writeLine)
    }

    def writeLine(line:String) {
        socket.write(ByteBuffer.wrap((line+"\n").getBytes("UTF-8")))
    }
Now when we receive some input data, it gets passed to LineDecoder.processBytes, which converts it to characters, breaks it up into separate lines, and calls our writeLine method for each line. The writeLine method uses String.getBytes to convert the characters in the line back to bytes, wraps those bytes into a ByteBuffer and writes them directly to the output channel.
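For comparison, here is a sketch of a writeLine that does the conversion with a CharsetEncoder instead of String.getBytes. LineWriter is a hypothetical name, and it writes to any WritableByteChannel so that the example can run against an in-memory channel rather than a SocketChannel.

```scala
import java.io.ByteArrayOutputStream
import java.nio.CharBuffer
import java.nio.channels.{Channels, WritableByteChannel}
import java.nio.charset.{Charset, CharsetEncoder}

// Hypothetical LineWriter: encodes each line with a per-connection CharsetEncoder
// instead of String.getBytes plus ByteBuffer.wrap.
class LineWriter(channel: WritableByteChannel) {
  private val encoder: CharsetEncoder = Charset.forName("UTF-8").newEncoder()

  def writeLine(line: String): Unit = {
    // encode() resets the encoder and returns a buffer ready to be written out.
    val bytes = encoder.encode(CharBuffer.wrap(line + "\n"))
    // Assumes a channel that eventually accepts every byte; on a non-blocking
    // SocketChannel this loop would spin while the socket's send buffer is full.
    while (bytes.hasRemaining()) channel.write(bytes)
  }
}

// Usage: write two lines into an in-memory channel.
val sink = new ByteArrayOutputStream()
val lineWriter = new LineWriter(Channels.newChannel(sink))
lineWriter.writeLine("hello")
lineWriter.writeLine("caf\u00e9")
```

Because the convenience encode method resets the encoder on each call, a single encoder per connection can be reused for every line.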

Externally, this example should behave the same as the one in the previous post, but we are now passing around Java strings rather than NIO buffers. Assuming we want to deal with string data rather than binary data, that will make the rest of a real application simpler to write.

Limitations

  • As with the example in the previous post, the current example only shows how to use the NIO calls on the read side of the connection. We could use a CharsetEncoder on the write side rather than using String.getBytes and ByteBuffer.wrap.
  • Partial input lines (characters not terminated by an EOL character) are ignored by this implementation.
  • The example uses the convenience method version of decode, which assumes that the input ByteBuffer contains complete character sequences. It is possible that a multi-byte character sequence will be split such that only the first part of that sequence appears at the end of the input buffer, with the remainder of the sequence appearing at the start of the next buffer of input data. The above implementation will not properly handle this situation. The underlying decode method does handle this situation properly, but the remaining code in this example is not set up for this situation.
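  • The decode convenience method throws a CharacterCodingException on malformed or unmappable input rather than returning a status code as the full decode method does. Since these exceptions are not caught anywhere in the code, such an exception would propagate out of the callback; with the single-threaded NioSelector from the previous post, that would end the select loop and stop the server. A more robust solution would have a mechanism to catch exceptions or restart an aborted task.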
  • The example assumes UTF-8 encoding.
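The split-sequence and exception limitations point at the same fix: calling the full three-argument decode method, which returns a CoderResult instead of throwing. The sketch below keeps any incomplete trailing byte sequence in a holding buffer with compact, and replaces malformed input with U+FFFD rather than reporting it. ChunkDecoder is a hypothetical name, and it assumes each chunk, plus up to three leftover bytes, fits in the holding buffer.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.{Charset, CharsetDecoder, CodingErrorAction}

// Hypothetical ChunkDecoder: decodes a stream of UTF-8 chunks, carrying an
// incomplete multi-byte sequence at the end of one chunk over to the next.
class ChunkDecoder(capacity: Int = 4096) {
  private val decoder: CharsetDecoder = Charset.forName("UTF-8").newDecoder()
    .onMalformedInput(CodingErrorAction.REPLACE)       // emit U+FFFD instead of throwing
    .onUnmappableCharacter(CodingErrorAction.REPLACE)
  // Undecoded bytes between calls; kept in "fill" mode (ready for put).
  private val pending = ByteBuffer.allocate(capacity)

  /** Decodes every complete character available so far; `chunk` must be ready to read. */
  def decode(chunk: ByteBuffer): String = {
    pending.put(chunk)  // append the new bytes after any leftover bytes
    pending.flip()      // switch to "drain" mode for the decoder
    // UTF-8 never yields more chars than input bytes, so this cannot overflow.
    val out = CharBuffer.allocate(pending.remaining())
    // endOfInput = false: a trailing partial sequence is left unread in `pending`
    // and the result is UNDERFLOW rather than a malformed-input error.
    val result = decoder.decode(pending, out, false)
    require(result.isUnderflow())
    pending.compact()   // move leftover bytes to the front, back to fill mode
    out.flip()
    out.toString
  }
}
```

Swapping this in for utf8Decoder.decode would also require LineDecoder to keep its own partial line between calls, since a line can be split across chunks just as a character can.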

Tuesday, March 22, 2011

Java NIO and Scala Continuations

Combining Java's NIO non-blocking input/output with Scala's delimited continuations allows making a multi-client stateful server that does not require a dedicated thread for every active client.

Motivation

When writing a high-volume server, you want it to be able to handle a large number of clients in a timely manner. You want the server to be able to respond quickly to each request even when there are a lot of clients. In particular, when you have a large number of clients for whom you are storing state but are not at this moment making a request, you want those clients to have a negligible effect on your response time, and when you have other clients currently making requests that may take some time to execute, you don't want those requests significantly increasing your response time to other requests.

From the above, we thus have these requirements:
  • The size of the state stored for each active client should be minimized.
  • The time to notice that a client has made a request should not depend significantly on the number of active clients not making a request.
  • A client request should never wait on a request from another client.
A simple Java implementation of a server might store its per-client state in a thread for each active client, and block waiting for a request on each thread. Java's thread scheduler then takes care of noticing when input is available from one of those blocking reads and continuing execution on the associated thread. However, threads are relatively expensive and consume more memory than necessary.
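To make the thread-per-client baseline concrete, here is a minimal sketch of it in Scala using blocking java.net sockets; the object name ThreadPerClientEcho is hypothetical. Every connected client costs one thread that spends most of its life blocked in readLine, which is the per-client overhead the rest of this post tries to avoid.

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.{ServerSocket, Socket, SocketException}
import java.nio.charset.StandardCharsets

// Sketch of the thread-per-client design: blocking sockets, one thread per
// connected client, with each client's state living on its thread's stack.
object ThreadPerClientEcho {
  def serve(server: ServerSocket): Thread = {
    val acceptor = new Thread(() => {
      try {
        while (!server.isClosed) {
          val client = server.accept()             // blocks only the acceptor thread
          val worker = new Thread(() => echo(client))
          worker.setDaemon(true)
          worker.start()                           // one thread per client
        }
      } catch { case _: SocketException => () }   // thrown when the server socket is closed
    })
    acceptor.setDaemon(true)
    acceptor.start()
    acceptor
  }

  private def echo(client: Socket): Unit = {
    val in = new BufferedReader(new InputStreamReader(client.getInputStream, StandardCharsets.UTF_8))
    val out = new PrintWriter(new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8), true)
    try {
      var line = in.readLine()                     // blocks this client's thread
      while (line != null) { out.println(line); line = in.readLine() }
    } catch { case _: java.io.IOException => () } // client went away
    finally client.close()
  }
}
```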

An implementation that stores the state for each client manually could store that state much more compactly and thus handle many more clients, but in addition to being more complicated to implement, the application must now include a mechanism to multiplex multiple clients onto fewer threads than there are active clients without having any clients blocked by other clients.

Scala's delimited continuations give us another option for storing the state of a client, one that is simpler to implement than manual state storage and that uses far less memory than a thread per client (see slide 19 of Philipp Haller's presentation on Actors, where he says an app can handle up to 5,000 threads, but up to 1,200,000 actors using closures).

That leaves us to solve the issue of how to multiplex many clients without having them block each other. If using the standard java.io package, the only option for reading from multiple streams without blocking is to poll using the available method. Besides the wasted CPU time, as the number of active clients grows the polling loop takes longer to run, meaning additional latency before the application notices that any particular stream is ready for reading and thus increased response time. The Java NIO package provides an efficient solution for this problem.

In the "Nio*" sections below I present a few Scala classes that break up the NIO work into chunks that I hope are easy to understand. The last piece is a simple main class to test it all.

Java Nonblocking IO

Starting with version 1.4, Java has a package called NIO (new input/output) that includes the ability to do non-blocking input and output.

The non-blocking IO model in Java NIO is based on Channels and Selectors, similar to file descriptors and the select call in POSIX operating systems. A Channel represents a connection to a file, socket, or some other data source or sink. A Selector allows collecting a set of Channels together and waiting until any of them are ready for input or output.

In order to use a Channel with a Selector, you must register the Channel with the Selector (and the Channel must be a SelectableChannel). When you register a channel you also specify which operations are of interest, such as READ or WRITE. These operations are represented by bits that can be combined by ORing them together to create a value that indicates interest in multiple operations.

Registering a channel with a selector creates a SelectionKey. When you later execute the select method on the selector, it returns a set of SelectionKeys, from which you can extract the information passed to the register call when the SelectionKey was created.
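The following sketch runs register, select, selectedKeys and attachment end to end. It uses an in-process java.nio.channels.Pipe in place of a socket so that it needs no network access, and a String as the attachment. SelectDemo is a hypothetical name, and the code uses scala.jdk.CollectionConverters from current Scala rather than the JavaConversions object used below.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, SelectionKey, Selector}
import scala.jdk.CollectionConverters._

// Demonstrates register / select / selectedKeys / attachment on a Pipe.
object SelectDemo {
  // Interest operations are bit flags; OR them to ask for several at once.
  val readOrWrite: Int = SelectionKey.OP_READ | SelectionKey.OP_WRITE

  def wantsWrite(ops: Int): Boolean = (ops & SelectionKey.OP_WRITE) != 0

  def run(): List[String] = {
    val selector = Selector.open()
    val pipe = Pipe.open()
    val source = pipe.source()
    source.configureBlocking(false)  // only non-blocking channels can be registered
    // Register for READ (the only op a Pipe.SourceChannel supports), attaching a String.
    source.register(selector, SelectionKey.OP_READ, "pipe-reader")

    pipe.sink().write(ByteBuffer.wrap("x".getBytes("UTF-8")))  // make the source readable

    selector.select(1000)  // wait up to one second for a registered channel to be ready
    val keys = selector.selectedKeys().asScala.toList
    selector.selectedKeys().clear()  // the selector never removes selected keys itself
    val attachments = keys.map(_.attachment().asInstanceOf[String])

    selector.close(); pipe.sink().close(); source.close()
    attachments
  }
}
```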

NioSelector

The register method of java.nio.channels.SelectableChannel includes an "attachment" parameter that allows the application to attach an arbitrary object to that registration and retrieve it later when the associated channel becomes selected. In our server, we attach a callback function to be executed when the channel becomes selected. We package this up in our own register method in our NioSelector class.

import java.nio.channels.SelectableChannel
import java.nio.channels.spi.SelectorProvider

/** Handle nio channel selection. */
class NioSelector {

    val selector = SelectorProvider.provider.openSelector()

    def register(channel:SelectableChannel, op:Int, body: => Unit) {
        val callback:Function0[Unit] = { () => { body }}
        channel.register(selector, op, callback)
    }
}
We add a selectOnce method that waits until one or more of the registered channels are selected, or until the specified timeout has elapsed if no channels are selected during that time. The underlying call to selectedKeys in java.nio.channels.Selector returns a java.util.Set containing the keys of the selected channels. Before we process any channel, we need to clear the current selection state: the selector never removes keys from that set itself, and setting each key's interest set to zero keeps a channel from being selected again until its callback re-registers interest. We copy the selected keys into a Scala list, clear the Java set and the interest sets, and only then run the callbacks.
import scala.collection.JavaConversions
import java.nio.channels.SelectionKey

//In class NioSelector

    def selectOnce(timeout:Long) {
        selector.select(timeout)
        val jKeys:java.util.Set[SelectionKey] = selector.selectedKeys
        val keys = JavaConversions.asScalaSet(jKeys).toList
        selector.selectedKeys.clear()
        keys foreach { _.interestOps(0) }
        val callbacks = keys map { _.attachment.asInstanceOf[()=>Unit] }
        executeCallbacks(callbacks) //Execute callbacks for all selected keys
    }
The executeCallbacks method is responsible for executing the callbacks for the selected channels. If we assume that at least some of these callbacks may take a relatively long time to complete, then in a real server we might want to throw all of these callbacks into a thread pool or farm them out to actors so that we can avoid having one long-running callback block out the rest of the channels. For now, though, we will ignore that issue and use the simple implementation of just executing each callback sequentially in the same thread.
//In class NioSelector

    def executeCallbacks(callbacks:List[()=>Unit]) {
        callbacks foreach { _() }
    }
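If we did hand the callbacks to a thread pool as suggested above, executeCallbacks might look like the sketch below. PooledCallbacks and the fixed pool size are assumptions for illustration; the executor calls are standard java.util.concurrent.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical thread-pool variant of executeCallbacks; the pool size is an assumption.
class PooledCallbacks(threads: Int) {
  private val pool: ExecutorService = Executors.newFixedThreadPool(threads)

  // Hand each callback to a worker so a slow callback cannot delay the others.
  def executeCallbacks(callbacks: List[() => Unit]): Unit =
    callbacks.foreach(cb => pool.execute(() => cb()))

  // Stop accepting work and wait (up to 10 seconds) for queued callbacks to finish.
  def shutdown(): Boolean = {
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }
}

// Usage: ten callbacks that each bump a shared counter.
val counter = new AtomicInteger(0)
val pooled = new PooledCallbacks(4)
pooled.executeCallbacks(List.fill(10)(() => { counter.incrementAndGet(); () }))
pooled.shutdown()
println(counter.get()) // 10
```

This is not a drop-in replacement for the NioSelector version. Its callbacks re-register interest from a worker thread while the selector thread may be blocked in select(0), so a worker would also need to call selector.wakeup() after registering, and on older JDKs the register call itself may block until that select returns.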
For convenience, we add a selectLoop method that will continue to call selectOnce until we are ready to stop processing, and we make our class implement the Runnable interface.
//In NioSelector class

    def run() {
        selectLoop(true)
    }

    def selectLoop(continueProcessing: => Boolean) {
        while (continueProcessing) {
            selectOnce(0)
        }
    }
Here is the complete NioSelector class:
import scala.collection.JavaConversions
import java.nio.channels.SelectableChannel
import java.nio.channels.SelectionKey
import java.nio.channels.spi.SelectorProvider

/** Handle nio channel selection. */
class NioSelector extends Runnable {

    val selector = SelectorProvider.provider.openSelector()

    def register(channel:SelectableChannel, op:Int, body: => Unit) {
        val callback:Function0[Unit] = { () => { body }}
        channel.register(selector, op, callback)
    }

    def run() {
        selectLoop(true)
    }

    def selectLoop(continueProcessing: => Boolean) {
        while (continueProcessing) {
            selectOnce(0)
        }
    }

    def selectOnce(timeout:Long) {
        selector.select(timeout)
        val jKeys:java.util.Set[SelectionKey] = selector.selectedKeys
        val keys = JavaConversions.asScalaSet(jKeys).toList
        selector.selectedKeys.clear()
        keys foreach { _.interestOps(0) }
        val callbacks = keys map { _.attachment.asInstanceOf[()=>Unit] }
        executeCallbacks(callbacks) //Execute callbacks for all selected keys
    }

    def executeCallbacks(callbacks:List[()=>Unit]) {
        callbacks foreach { _() }
    }
}
Our main method will create an instance of NioSelector, do some other initialization, and eventually call NioSelector.run.

NioListener

The above NioSelector class gives us our main event loop, but how do we get things hooked up to start generating events? The basic socket flow on the server side for reading data follows these steps:
  • Listen for new connections on a socket.
  • When a new connection is made, initialize that connection and wait for input data on that connection.
  • When input data is received, process that input data.
  • Eventually, close the connection.
Our NioListener class takes care of creating the socket on which we listen, accepting new connections, and passing them off to another class for per-connection initialization.

We start with the code that creates the listener socket. Since we will be passing this socket to our selector, we must create a ServerSocketChannel rather than a regular socket. We don't want to block waiting for this socket, so we call configureBlocking(false).
import java.net.{InetAddress,InetSocketAddress}
import java.nio.channels.ServerSocketChannel

class NioListener(selector:NioSelector, hostAddr:InetAddress, port:Int) {

    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    val isa = new InetSocketAddress(hostAddr,port)
    serverChannel.socket.bind(isa)
}
Now that we have created our socket, we must register it with our selector. In that call, we will tell the selector that we are interested in the ACCEPT operation on our socket channel. The third argument to our register method is the callback; this is where continuations come in.

Blocking code is generally simpler and thus easier to write and understand than asynchronous (non-blocking) code. By using continuations we can capture the current state of the code, explicitly save it for later execution, and continue running the thread at a different point. The code within our delimited continuation is blocked, but the thread does not get blocked.

Rather than explicitly creating and passing in a callback with application state, we arrange things such that the callback passed to our register method includes a call to our continuation, so that when that callback is called, it executes our continuation and makes our code continue from where we left off.

We implement our own accept method that looks to the calling code like the blocking ServerSocket.accept method, but that uses continuations as described above to do the blocking. We then call our accept method from within a reset block that delimits the continuation and determines how much code is saved in the continuation (everything within the reset) and the point at which execution continues (just past the reset) in the thread when the accept call blocks.
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import scala.util.continuations._

//In NioListener class
    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()
                NioConnection.newConnection(selector,socket)
            }
        }
    }

    private def accept():SocketChannel @suspendable = {
        shift { k =>
            selector.register(serverChannel,SelectionKey.OP_ACCEPT, {
                val conn = serverChannel.accept()
                conn.configureBlocking(false)
                k(conn)
            })
        }
    }
When we call our start method, it enters the reset block and calls accept, which registers our interest in the ACCEPT operation, saves the continuation as part of our callback in the register method and immediately returns control to the end of the reset block, thus the start method returns quickly. When a new connection arrives, the select code in NioSelector calls our callback from its executeCallbacks method. Our callback accepts the new connection and sets it to non-blocking, then executes our continuation, which will "return" from our blocked accept method into the loop in our start method. The code processes the new connection, then loops and calls accept again, which reregisters interest in an ACCEPT operation on our socket and returns execution to the caller of the continuation, which returns control to the executeCallbacks method in NioSelector.
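Because the continuations compiler plugin does not support current Scala versions such as Scala 3, it may help to see the same flow written by hand. In the sketch below, the continuation k that shift would capture is passed explicitly, and the while loop becomes a recursive function. FakeSelector and CpsListener are hypothetical stand-ins for NioSelector and NioListener that queue callbacks instead of using real channels.

```scala
import scala.collection.mutable

// Hypothetical stand-in for NioSelector: it queues callbacks instead of
// registering them with a real Selector, and fireOne() plays "a channel is ready".
class FakeSelector {
  private val pending = mutable.Queue[() => Unit]()
  def register(body: => Unit): Unit = pending.enqueue(() => body)
  def fireOne(): Unit = pending.dequeue()()
  def waiting: Int = pending.size
}

// Hypothetical hand-written CPS version of NioListener.start/accept.
class CpsListener(selector: FakeSelector, nextConnection: () => String) {
  val accepted = mutable.ListBuffer[String]()

  // k is "the rest of the loop", the value that shift { k => ... } would capture for us.
  private def accept(k: String => Unit): Unit =
    selector.register { k(nextConnection()) }

  // The while loop becomes a recursive function: each continuation handles one
  // connection and then calls loop() again, which re-registers and returns.
  def start(continueListening: => Boolean): Unit = {
    def loop(): Unit =
      if (continueListening) accept { conn => accepted += conn; loop() }
    loop()
  }
}
```

As with the plugin version, start returns as soon as the first callback is registered, and each fired callback runs one pass of the loop before returning control to its caller.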

NioConnection

Our NioConnection class is responsible for creating our connection object, registering it with the selector, reading input data when available and processing that data. The processing in this example is simply to echo the data back to the client. To keep the example simple, we do not use NIO and continuations on the writing side; instead we use a simple write call to write our data directly to the socket.

As with NioListener, we create our own read call that uses continuations to block, but otherwise looks similar to the standard Java InputStream.read(byte[]) call, except that we read into a ByteBuffer rather than a byte array. As with NioListener, our start method that calls our read method (via our intermediate method readWait) contains a reset block that delimits the continuation.
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import scala.util.continuations._

object NioConnection {
    def newConnection(selector:NioSelector, socket:SocketChannel) {
        val conn = new NioConnection(selector,socket)
        conn.start()
    }
}

class NioConnection(selector:NioSelector, socket:SocketChannel) {

    private val buffer = ByteBuffer.allocateDirect(2000)

    def start():Unit = {
        reset {
            while (socket.isOpen)
                readWait
        }
    }

    private def readWait = {
        buffer.clear()
        val count = read(buffer)
        if (count<1)
            socket.close()
        else
            readAction(buffer)
    }

    private def read(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            selector.register(socket, SelectionKey.OP_READ, {
                val n = socket.read(b)
                k(n)
            })
        }
    }

    private def readAction(b:ByteBuffer) {
        b.flip()
        socket.write(b)
        b.clear()
    }
}
The program flow is similar to the flow in NioListener. When we call our start method, it enters the reset block and (through the call to readWait) calls read, which registers our interest in the READ operation, saves the continuation as part of our callback in the register method and immediately returns control to the end of the reset block, thus the start method returns quickly. When data is available to be read from the socket, the select code in NioSelector calls our callback from its executeCallbacks method. Our callback reads the data from the connection, then executes our continuation, which will "return" from our blocked read method into the readWait method. The code processes the input data in readAction (which just echoes the data back to the client), then (if the socket is still open) loops and ends up calling read again, which reregisters interest in a READ operation on our socket and returns execution to the caller of the continuation, which returns control to the executeCallbacks method in NioSelector.

NioServer

Here is a short main program to test the above modules.
import java.net.InetAddress

object NioServer {
    def main(args:Array[String]) {
        val hostAddr:InetAddress = null //null means the wildcard address: listen on all local interfaces
        val port = 1234
        val server = new NioServer(hostAddr,port)
        server.start()
    }
}

class NioServer(hostAddr:InetAddress, port:Int) {
    val selector = new NioSelector()
    val listener = new NioListener(selector, hostAddr, port)

    def start() {
        listener.start(true)
        selector.run()
    }
}
Create a directory and place into that directory the above four files (NioSelector.scala, NioListener.scala, NioConnection.scala and NioServer.scala), cd into that directory, then execute the following commands to compile the files and execute the test program:
scalac -P:continuations:enable *.scala
scala NioServer
If you get an "Address already in use" error and traceback, edit NioServer.scala and pick another port number.

After starting the server, connect to it using telnet:
telnet localhost 1234
In the telnet client, each time you enter a line of text the server should echo that line of text back to you.

Limitations

The above example demonstrates a very basic echo server in a bit over 100 lines of code that uses NIO and continuations for reading data, but it has some limitations:
  • The processing of the input data is buried in the code that reads the data. A well-factored architecture would allow that processing code to be separately maintained.
  • We are using one thread for all execution. In the above example we are just echoing clients' input back to them, but if we want to do some heavier processing we will need to move that work off of the select thread onto one or more worker threads so as not to block everyone while handling a request for one client.
  • Communication between the select thread and the worker threads will require some kind of buffering, which introduces more points at which execution can be blocked, thus more opportunities to use continuations.
  • If we wish to treat the data as characters rather than bytes then we need to do that conversion.
  • We are not using NIO and continuations when writing data to the client, so if for some reason our socket is not ready for writing, the application will not work properly (either it will block waiting for output, or we will lose some output data).
  • We are not storing any significant amount of state in our continuations. If all we are doing is a simple echo server, we don't really need continuations. A more sophisticated example would better illustrate the convenience of using continuations to store program state.
I hope to address these points in future posts.
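For the write-side limitation, one common approach is sketched below: queue outgoing buffers and write only what the non-blocking channel accepts right now, keeping the rest until the channel is writable again. PendingWrites is a hypothetical name, and the OP_WRITE registration (or a continuation-based write built on it) is left out.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, WritableByteChannel}
import scala.collection.mutable

// Hypothetical PendingWrites: queues outgoing buffers and writes only what a
// non-blocking channel accepts right now, keeping the rest for later.
// A real server would register OP_WRITE and call flush() again when selected.
class PendingWrites(channel: WritableByteChannel) {
  private val queue = mutable.Queue[ByteBuffer]()

  def enqueue(b: ByteBuffer): Unit = queue.enqueue(b)

  /** Writes as much as the channel accepts; returns true once nothing is left. */
  def flush(): Boolean = {
    var channelFull = false
    while (queue.nonEmpty && !channelFull) {
      val head = queue.head
      channel.write(head)                          // may write part of head, or nothing
      if (head.hasRemaining()) channelFull = true  // try again when writable
      else queue.dequeue()
    }
    queue.isEmpty
  }
}
```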

Resources

  • My explanation of delimited continuations.
  • Wikipedia article on Java NIO.
  • Rox Java NIO Tutorial, in which James Greenfield builds a server in Java using NIO and gives a number of tips and warnings.
  • Jenkov's Java NIO Tutorial goes over each part of the entire NIO package in detail.
  • MINA, Apache's network application framework that uses NIO.
  • Loft, a single-threaded web server in Scala using NIO and continuations (under development).
  • A post from July 2009 on the Scala mailing list with an (uncompiled) example of a simple single-threaded server using NIO. The post demonstrates the approach of creating versions of the standard calls (accept, read, write) that block using continuations.