Saturday, April 2, 2011

Java NIO and Scala Coroutines

I present a multi-client server in Scala that uses coroutines to allow modularization of stateful client processing in a way that is independent of threads.



In my previous two posts I presented a server in Scala that uses Java NIO non-blocking IO and continuations to allow scaling to a large number of clients. As I pointed out in the Limitations section of that first post, that example used one thread for all execution. On a multi-core machine, as is common today, we would prefer to have multiple threads running to allow us to take advantage of all of the processing power available to us, yet we don't want to allocate a thread to every client.

It would be nice if we could add our own SelectableChannel types to the set of NIO channel types that we can use with the select call so that we could have one place where we do all our scheduling, but that feature is not available. We thus have to come up with another mechanism for handling all of the other potentially blocking tasks we will want to do. Fortunately, we already have such a mechanism: coroutines.


Coroutines provide a separation of the maintenance of task state from the execution of code for that task, allowing us to bind execution of the task to different threads as we desire. When one of our task coroutines becomes blocked waiting for an unavailable resource, we suspend it by storing its continuation, allowing us to use that thread for another purpose, such as to restore and run a different previously stored continuation that is now runnable.

In my earlier post on coroutines I presented an implementation of a coroutine package that included a scheduler (CoScheduler) and a blocking queue (CoQueue). We will modify the server implementation of my previous two "Java NIO" posts to make use of those classes.

As pointed out in that earlier coroutines post, the default scheduler implementation in the example can easily be replaced by another implementation with no other changes to the code. In particular, that new implementation could use a thread pool or a group of actors to execute the coroutines that are ready to run, assuming the coroutine code itself is multi-thread safe. We will not write that multi-thread scheduler for this post, but will assume that it can be written later.


At a high level, we want to modify our server so that we have a queue between our socket reader and the application that will eventually consume the data. We can then set up a small processing loop that reads the socket data, converts it to a string and writes it to that queue. The application will read the contents of the queue, process it, and write back its results to the connection. We will let the socket reader continue to run on the select thread, but we will run the application on a separate thread (or threads), ensuring that the select loop can quickly get to all connections and preventing the application processing of any one connection from delaying the IO of other connections.

With this architecture we have two processing loops:
  1. Read data from socket, write to queue.
  2. Read data from queue, process it, write data (to socket, for now).
Given that for now we are writing directly to the connection socket on output (and ignoring the possibility that the output socket might be blocked), the second loop only has one potential blocking point: if there is no data in the queue, it will block when trying to read from the queue. The first loop has two potential blocking points: when it reads data from the socket (if there is no data available), and when it writes data to the queue (if the queue is full). The difficulty here is that the potentially blocking socket read must be handled by the NIO select call, but the potentially blocking write to the queue can't be handled by the NIO select call and thus must be handled by our own scheduler.

Having one processing loop that when blocked is sometimes managed by one scheduler (NIO select) and sometimes by another (our coroutine scheduler) is not necessarily a problem. Each scheduler just sees a blocking resource that has a continuation associated with it; when the blocking resource becomes available, the continuation is called and the process continues. The new issue that arises when trying to combine two schedulers like this is that an action by one scheduler can potentially unblock a task that is currently controlled by (i.e. in a wait state on) the other scheduler. Every time we perform an action that might unblock a task we need to ensure that the appropriate scheduler is not stuck waiting on the other tasks. In other words, we need to wake up or notify the schedulers at appropriate points in our code.

In this post, code which has changed is highlighed in bold (when not using Syntax Highlighting). Changes for CoScheduler and CoQueue are as compared to the code in my post on coroutines; changes to NioSelector, NioConnection, LineDecoder, NioListener and NioServer are as compared to the code in my previous two posts.

The complete source for this post is available on github in my nioserver project, with the specific version used in this post tagged as blog-coroutines. There are also tags for the previous two posts, so you can compare using those tags to see the changes between the versions as used in each post.


As mentioned above, we have to cooperate with the coroutine scheduler. In particular, we have to be able to deal with the situation that we have no active connections, so we are in a select call, then another thread registers interest in an operation. The documentation for the select call states:
Changes made to the interest sets of a selector's keys while a selection operation is in progress have no effect upon that operation; they will be seen by the next selection operation.
To terminate the select operation early so that it retries with the newly registered channel, we add a call to wakeup just after registering our interest.

Unfortunately, this is not enough. The documentation for the select call is not very precise about whether it is actually possible to call the register call from another thread while the select call is blocked waiting for a previously registered channel to become active. The documentation for SelectableChannel does explicitly say "Selectable channels are safe for use by multiple concurrent threads", but the documentation for the register method says "This method will then synchronize on the selector's key set and therefore may block if invoked concurrently with another registration or selection operation involving the same selector." In fact, the standard Sun implementation does quite a bit of synchronization, so quite easily gets deadlocked when used by multiple threads. In particular, the OS-level select call in the Java select method is inside a pair of synchronized blocks that lock the set of SelectionKeys associated with that selector. If, while the first thread is blocked on the select, a second thread calls SelectableChannel.register, it locks the channel, then attempts to lock on the key set to which that channel is being added, so it blocks. If a third thread then tries to register that channel with a second selector, which the documentation implies is allowed, the third thread will attempt to lock the channel, which will block until the second thread unblocks and releases its lock on the channel.

In his Rox Java NIO Tutorial James Greenfield explicitly recommends that you "Use a single selecting thread" and "Modify the selector from the selecting thread only." From the description of how register works above, you can see why.

To get around this problem and ensure that all changes to the selection keys happen on the thread that is calling select, we modify NioSelect.register so that, rather than calling SelectableChannel.register directly, it packages the arguments up and puts them into a queue which is processed by the selection thread in order to make all of the calls to SelectableChannel.register just before it calls select.

Fortunately, the semantics of the wakeup call ensure that we won't get ourselves into a position where we have put our registration request into the queue but the select call doesn't see it and blocks on all the other channels. This is because wakeup is defined such that a call to it that happens while the selector is not currently in a select operation will cause the next select to wake up immediately.

With this change, all of the key set operations happen on the selection thread and, since the socket read operation is in a callback that gets executed by the selection thread in NioSelector.executeCallbacks, all socket reads (and likewise accepts) will happen on the selection thread.

//In class NioSelector
import scala.collection.mutable.SynchronizedQueue

    private case class RegistrationRequest(
    private val regQ = new SynchronizedQueue[RegistrationRequest]

    def register(channel:SelectableChannel, op:Int, body: => Unit) {
        val callback:Function0[Unit] = { () => { body }}

    def selectOnce(timeout:Long) {
        while (regQ.size>0) {
            val req = regQ.dequeue()


For our coroutine scheduler, we have to be able to deal with the situation that we have no coroutines that are currently runnable, then at some point one of those coroutines becomes runnable by the actions of another thread. In the architecture described above, this can happen when new data that has been read from a connection is placed into the input queue. To allow us to wait for this kind of event and to be awakened when it happens, we use Java's wait/notify model. We can't override those methods, since notify is final, so we define our own versions, which we call coWait and coNotify. Given those methods, we also extend Runnable and replace the old run method with one that runs coroutines until none are available to run, then waits until we are notified and continues the loop.
trait CoScheduler extends Runnable { cosched =>
    //we add the following items
    private val defaultLock = new java.lang.Object
    def coWait():Unit = { defaultLock.synchronized { defaultLock.wait() } }
    def coNotify():Unit = { defaultLock.synchronized { defaultLock.notify } }

    def run {
        while (true) {
A coNotify method that accepts as an argument the coroutine or blocker that has potentially changed state would allow for a more efficient implementation, but for now we choose the simple implementation given above that does not attempt that optimization.


We use an instance of CoQueue as the queue between the socket read loop and the application processing loop. The socket read loop calls blockingEnqueue to place an item into the queue, and the application processing loop calls blockingDequeue to take an element out of the queue. The result of either of these actions could be to unblock another coroutine, so we modify those methods to add a call to coNotify in case they are being called from a coroutine that is not currently being managed by our coroutine scheduler. Since we are calling the enqueue and dequeue methods from different threads, we use a SynchronizedQueue rather than a plain Queue. Those two methods now look like this:
import scala.collection.mutable.SynchronizedQueue

class CoQueue ... extends SynchronizedQueue[A] { ...

    def blockingEnqueue(x:A):Unit @suspendable = {

    def blockingDequeue():A @suspendable = {
        val x = dequeue


We add a CoQueue which we use as our input queue between the socket reader loop and the application loop. For this example, we pick an arbitrary limit of 10; if our application gets behind by more than 10 items, the socket reader code will suspend when attempting to write to the queue. If more data arrives while that code is thus suspended, it will back up in the system's input buffer for that connection, and eventually the client will get an error when trying to write to its output connection.

In order to initialize the CoQueue we need to pass in a CoScheduler, so we add that parameter to our constructor and to the convenience method in our companion object.
import net.jimmc.scoroutine.{CoQueue,CoScheduler}

//In object NioConnection
    def newConnection(sched:CoScheduler, selector:NioSelector, socket:SocketChannel) {
        val conn = new NioConnection(sched,selector,socket)

class NioConnection(sched:CoScheduler, selector:NioSelector, socket:SocketChannel) {
    //Add CoQueue
    private val inQ = new CoQueue[String](sched, 10)
Now that we have a queue, we modify our socket reader code to place our input data (after conversion to a Java string) into our queue rather than writing it straight to the output socket. We want to block when the queue is full, so we call the blockingEnqueue method. Since we now know that's the only action we will be taking, we fold the readAction method back into readWhile. Because blockingEnqueue is suspendable, the else branch of the if (count<1) code block is suspendable, so we need to make the if branch suspendable as well. We do this by adding a shiftUnit call as the final value in the if branch. The readWhile method now looks like this:
    private def readWait = {
        val count = read(buffer)
        if (count<1) {
        } else {
            //Moved here from readAction
            lineDecoder.processBytes(buffer, inQ.blockingEnqueue(_))
We now have input data going into our queue, but nobody is reading it. For this example, we implement a simple echo loop that reads from the input queue using a new readLine method and writes to the output using our existing writeLine method. We do this inside a reset block so that it becomes another coroutine that can be managed by our coroutine scheduler. Our previous start method started up the socket reader loop. We rename that one to startReader, add a startApp method that starts up our echo loop, and call both of those from a new start method. Our start method now looks like this:
//In class NioConnection
    def start():Unit = {
    private def startApp() {
        reset {
            while (socket.isOpen)

    private def startReader() {
        reset {
            while (socket.isOpen)

    def readLine():String @suspendable = inQ.blockingDequeue


Our processBytes method is now getting passed a callback that is suspendable, so we need to modify the signature of our method to accept that. It passes that callback to processChars, so that signature needs to be changed in the same way. Since processChars is now calling a suspendable method, it too is suspendable, so its return signature needs to be modified to note that, and since processBytes calls processChars, it too needs to be modified to have a suspendable return signature.
//In class LineDecoder
import scala.util.continuations._

    def processBytes(b:ByteBuffer,
        lineHandler:(String)=>Unit @suspendable):Unit @suspendable = ...

    private def processChars(cb:CharBuffer,
        lineHandler:(String)=>Unit @suspendable):Unit @suspendable = { ... }


NioListener calls NioConnection.newConnection, and that call now requires a CoScheduler argument, so we add that to our constructor and pass it through when we call newConnection.
import net.jimmc.scoroutine.CoScheduler

class NioListener(sched:CoScheduler, selector:NioSelector, hostAddr:InetAddress, port:Int) {

    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()


NioServer instantiates the NioListener, so we need to pass it an instance of CoScheduler. We create an instance of DefaultCoScheduler and pass that in. We now need two threads, one for our coroutine scheduler and one for the NIO scheduler. In our start method, we create and start a second Thread for the NIO scheduler, then rename our own thread and run the coroutine scheduler on it.
import net.jimmc.scoroutine.DefaultCoScheduler

class NioServer(hostAddr:InetAddress, port:Int) {
    val selector = new NioSelector()
    val sched = new DefaultCoScheduler
    val listener = new NioListener(sched, selector, hostAddr, port)

    def start() {
        //run the NIO selector on its own thread
        (new Thread(selector,"NioSelector")).start
        Thread.currentThread.setName("CoScheduler")    //run the coroutine scheduler on our thread, renamed


As in the previous post, we have once again transformed our example application in a way which provides an internal improvement - in this case the ability to use multiple threads - but which has not changed its basic external behavior: we still have a simple echo server. We also have not yet addressed all of the Limitations from the first post in this series. Stay tuned for more.


  • Although I have asserted that it is possible to write a multi-threading scheduler to the CoScheduler API, I have not yet actually done this. It is possible that this may be more difficult than I expect.
  • Multi-threaded code is generally tricky stuff. I have not spent a lot of time running this example code, so it is certainly possible that there are race conditions or other concurrency problems.

No comments: