Friday, April 15, 2011

Java NIO Complete Scala Server

The capstone to this series of posts: a complete multi-client stateful application server in Scala using Java NIO non-blocking IO for both reading and writing, and delimited continuations as coroutines for both IO and application processing.

Background

In the initial post of this series on Java NIO in Scala I mentioned a set of Limitations of the first example server. In the next three posts after that initial post I addressed some of those limitations. In this post I address the remaining limitation in that original list: the application code (an echo loop in the example) is buried in the NioConnection class, which makes that application code more difficult to maintain and makes the server code not directly reusable as a library.

With the changes described in the next section, all of the application-specific behavior will be encapsulated in an instance of an application-specific subclass of a new class, NioApplication. Since the remainder of the classes presented so far will now be independent of the application and reusable without any modifications for multiple applications, they will be moved into a separate package, net.jimmc.nio.

Other than the addition of package net.jimmc.nio, there were no changes to LineDecoder or NioSelector, and no changes to the coroutine package net.jimmc.scoroutine for this latest set of changes. For each file that did change, the listing below shows the complete new version of the file, with the differences from the previous version highlighted in bold.

The complete source for this series of posts is available on github in my nioserver project, with the specific version after the changes specified in this post tagged as blog-complete.

NioApplication

Extracting the application-specific code out of NioConnection is pretty simple: in NioConnection.startApp, rather than starting up a built-in echo loop, we add a hook that allows us to call back to an application-specific method that implements whatever behavior the application wants for dealing with a connection. To do this, we define a new abstract class NioApplication that includes a runConnection method that we can call from NioConnection.startApp.

We will also use the NioApplication class as a convenience class where we can bundle up some of the arguments that get passed around a lot, in particular the coroutine scheduler and the read and write selectors. This gives us the opportunity to override the coroutine scheduler with one more appropriate for the application, although we will not do so in this example.

package net.jimmc.nio

import net.jimmc.scoroutine.DefaultCoScheduler

import scala.util.continuations._

abstract class NioApplication {
    val readSelector = new NioSelector()
    val writeSelector = new NioSelector()
    val sched = new DefaultCoScheduler

    def runConnection(conn:NioConnection):Unit @suspendable
}

NioServer

We simplify the NioServer class by removing the companion object NioServer, whose functionality moves into the application's main object. We replace three parameters in the constructor with the single app parameter, and likewise replace three arguments in the call to NioListener with the single app argument.
package net.jimmc.nio

import net.jimmc.scoroutine.DefaultCoScheduler

import java.net.InetAddress

class NioServer(app:NioApplication, hostAddr:InetAddress, port:Int) {
    val listener = new NioListener(app, hostAddr, port)

    def start() {
        listener.start(true)
        //run the NIO read and write selectors each on its own thread
        (new Thread(app.writeSelector,"WriteSelector")).start
        (new Thread(app.readSelector,"ReadSelector")).start
        Thread.currentThread.setName("CoScheduler")
        app.sched.run    //run the coroutine scheduler on our thread, renamed
    }
}

NioListener

Three parameters in the constructor have been replaced by the single app parameter.
package net.jimmc.nio

import net.jimmc.scoroutine.CoScheduler

import java.net.{InetAddress,InetSocketAddress}
import java.nio.channels.{ServerSocketChannel,SocketChannel}
import java.nio.channels.SelectionKey
import scala.util.continuations._

class NioListener(app:NioApplication, hostAddr:InetAddress, port:Int) {

    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    val isa = new InetSocketAddress(hostAddr,port)
    serverChannel.socket.bind(isa)

    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()
                NioConnection.newConnection(app, socket)
            }
        }
    }

    private def accept():SocketChannel @suspendable = {
        shift { k =>
            app.readSelector.register(serverChannel,SelectionKey.OP_ACCEPT, {
                val conn = serverChannel.accept()
                conn.configureBlocking(false)
                k(conn)
            })
        }
    }
}
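
One detail worth noting: the continueListening argument to start is a by-name parameter, re-evaluated before each pass through the accept loop, so a caller could supply a mutable flag rather than the constant true that NioServer passes. A hypothetical sketch (ServerControl and its flag are not part of the server code):
//Hypothetical shutdown flag; NioServer itself just passes the constant true
object ServerControl {
    @volatile var acceptingConnections = true
}
listener.start(ServerControl.acceptingConnections)
//Later, from another thread; the accept loop then exits after at most one more connection
ServerControl.acceptingConnections = false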

NioConnection

We modify the constructor and the companion object to replace three parameters with the single app parameter, and we replace the echo loop in startApp with a call to the application's runConnection method, followed by a call to our close method to ensure the socket is closed when the application is done with it.
package net.jimmc.nio

import net.jimmc.scoroutine.{CoQueue,CoScheduler}

import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import scala.util.continuations._

object NioConnection {
    def newConnection(app:NioApplication, socket:SocketChannel) {
        val conn = new NioConnection(app, socket)
        conn.start()
    }
}

class NioConnection(app:NioApplication, socket:SocketChannel) {

    private val buffer = ByteBuffer.allocateDirect(2000)
    private val lineDecoder = new LineDecoder
    private val inQ = new CoQueue[String](app.sched, 10)
    private val outQ = new CoQueue[String](app.sched, 10)

    def start():Unit = {
        startReader
        startWriter
        startApp
    }

    private def startApp() {
        reset {
            app.runConnection(this)
            close()
        }
    }

    private def startReader() {
        reset {
            while (socket.isOpen)
                readWait
        }
    }

    private def readWait:Unit @suspendable = {
        buffer.clear()
        val count = read(buffer)
        if (count<1) {
            socket.close()
            shiftUnit[Unit,Unit,Unit]()
        } else {
            buffer.flip()
            lineDecoder.processBytes(buffer, inQ.blockingEnqueue(_))
        }
    }

    private def read(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            app.readSelector.register(socket, SelectionKey.OP_READ, {
                val n = socket.read(b)
                k(n)
            })
        }
    }

    def readLine():String @suspendable = inQ.blockingDequeue

    private def startWriter() {
        reset {
            while (socket.isOpen)
                writeWait
        }
    }

    private def write(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            app.writeSelector.register(socket, SelectionKey.OP_WRITE, {
                val n = socket.write(b)
                k(n)
            })
        }
    }

    private def writeBuffer(b:ByteBuffer):Unit @suspendable = {
        write(b)
        if (b.remaining>0 && socket.isOpen)
            writeBuffer(b)
        else
            shiftUnit[Unit,Unit,Unit]()
    }

    private def writeWait():Unit @suspendable = {
        val str = outQ.blockingDequeue
        if (str eq closeMarker) {
            socket.close
            shiftUnit[Unit,Unit,Unit]()
        } else
            writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
    }

    def writeLine(s:String):Unit @suspendable = write(s+"\n")
    def write(s:String):Unit @suspendable = outQ.blockingEnqueue(s)

    def isOpen = socket.isOpen
    private val closeMarker = new String("")
    def close():Unit @suspendable = write(closeMarker)
}

EchoServer

We move the application-specific main object out of NioServer and place it into our sample application class, which we call EchoServer, along with a subclassed NioApplication that provides our application behavior.

Highlighted differences are as compared to the previous version of NioServer.
import net.jimmc.nio.{NioApplication,NioConnection,NioServer}
import net.jimmc.scoroutine.DefaultCoScheduler

import java.net.InetAddress
import scala.util.continuations._

object EchoServer {
    def main(args:Array[String]) {
        val app = new EchoApplication
        val hostAddr:InetAddress = null //listen on local connection
        val port = 1234
        val server = new NioServer(app,hostAddr,port)
        server.start()
    }
}

class EchoApplication extends NioApplication {
    def runConnection(conn:NioConnection):Unit @suspendable = {
        while (conn.isOpen) {
            conn.writeLine(conn.readLine)
        }
    }
}
The above code is the complete application definition for our echo server when built on top of our generic nio package. After compiling, run it with this command:
$ scala EchoServer
With all the above changes we have once again transformed our application internally, but aside from starting it up under a different name, its external behavior is unchanged. However, we have reached the point where defining a new server-based application is easy.
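
With the server running, we can exercise it from another terminal, for example with telnet (using the port 1234 configured in the listing above); each line we type comes back to us:
$ telnet localhost 1234
hello
hello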

ThreeQuestionsServer

The example in this section shows a slightly more complex application that maintains some local per-client state as it progresses through a short series of steps interacting with the client. In this simple application, the server asks the client up to three questions and collects the responses, with later questions sometimes depending on earlier answers. The per-client state is contained both in local variables and in the location of execution within the application. Each time the processing for a client is suspended, the state for that client is captured in a continuation to be restored when the next piece of input is available. The continuation includes all of the above per-client state information, so we don't have to write any application-specific code to save and restore that data.

By defining the ReaderWriter trait as an interface, the application can run either in server mode using an instance of ConnReader, in which case it accepts connections from clients, or in standalone mode using an instance of SysReader, in which case it interacts only with the console.

When our application running in server mode finishes handling a client and exits from the run method, control returns to NioConnection, which closes the connection.
import net.jimmc.nio.{NioApplication,NioServer,NioConnection}

import java.io.{BufferedReader,InputStreamReader,PrintWriter}
import java.net.InetAddress

import scala.util.continuations._

object ThreeQuestionsConsole {
    def main(args:Array[String]) {
        val in = new BufferedReader(new InputStreamReader(System.in))
        val out = new PrintWriter(System.out)
        val io = new SysReader(in,out)
        reset {
            (new ThreeQuestions(io)).run
        }
    }
}

object ThreeQuestionsServer {
    def main(args:Array[String]) {
        val app = new ThreeQuestionsApp
        val hostAddr:InetAddress = null //localhost
        val port = 1234
        val server = new NioServer(app,hostAddr,port)
        server.start()
    }
}

class ThreeQuestionsApp extends NioApplication {
    def runConnection(conn:NioConnection):Unit @suspendable = {
        val io = new ConnReader(conn)
        (new ThreeQuestions(io)).run
    }
}

trait ReaderWriter {
    def readLine():String @suspendable
    def writeLine(s:String):Unit @suspendable
}

class SysReader(in:BufferedReader,out:PrintWriter) extends ReaderWriter {
    def readLine() = in.readLine
    def writeLine(s:String) = { out.println(s); out.flush() }
}

class ConnReader(conn:NioConnection) extends ReaderWriter {
    def readLine():String @suspendable = conn.readLine
    def writeLine(s:String):Unit @suspendable = conn.writeLine(s)
}

class ThreeQuestions(io:ReaderWriter) {
    def run():Unit @suspendable = {
        val RxArthur = ".*arthur.*".r
        val RxGalahad = ".*galahad.*".r
        val RxLauncelot = ".*(launcelot|lancelot).*".r
        val RxRobin = ".*robin.*".r
        val RxHolyGrail = ".*seek the holy grail.*".r
        val RxSwallow = ".*african or european.*".r
        val RxAssyriaCapital =
            ".*(assur|shubat.enlil|kalhu|calah|nineveh|dur.sharrukin).*".r
        val name = ask("What is your name?").toLowerCase
        val quest = ask("What is your quest?").toLowerCase
        val holy = quest match {
            case RxHolyGrail() => true
            case _ => false
        }
        if (holy) {
            val q3Type = name match {
                case RxRobin() => 'capital
                case RxArthur() => 'swallow
                case _ => 'color
            }
            val a3 = (q3Type match {
                case 'capital => ask("What is the capital of Assyria?")
                case 'swallow => ask("What is the air-speed velocity of an unladen swallow?")
                case 'color => ask("What is your favorite color?")
            }).toLowerCase
            (q3Type,a3,name) match {
                //Need to use an underscore in regex patterns with alternates
                case ('capital,RxAssyriaCapital(_),_) => accept
                case ('capital,_,_) => reject
                case ('swallow,RxSwallow(),_) => rejectMe
                case ('swallow,_,_) => reject
                case ('color,"blue",RxLauncelot(_)) => accept
                case ('color,_,RxLauncelot(_)) => reject
                case ('color,"yellow",RxGalahad()) => accept
                case ('color,_,RxGalahad()) => reject
                case ('color,_,_) => accept
            }
        } else {
            reject
        }
    }

    def ask(s:String):String @suspendable = { io.writeLine(s); io.readLine }
    def accept:Unit @suspendable = io.writeLine("You may pass")
    def reject:Unit @suspendable = io.writeLine("you: Auuuuuuuugh!")
    def rejectMe:Unit @suspendable = io.writeLine("me: Auuuuuuuugh!")
}
To run in console or server mode, use one of the following two commands:
$ scala ThreeQuestionsConsole
$ scala ThreeQuestionsServer
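
For example, a console-mode session might run as follows (server mode behaves the same over a telnet connection); these answers satisfy the patterns in the listing above:
$ scala ThreeQuestionsConsole
What is your name?
Lancelot
What is your quest?
To seek the Holy Grail
What is your favorite color?
Blue
You may pass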

Limitations

I am calling this version complete because it addresses all of the issues in the Limitations section of my original post, but it is far from production-ready. Before putting this code into production I would address the following issues.
  • Although the application now uses more than one thread, it still runs all of the application code on a single thread. The scheduler should be replaced by one that can choose how many threads to use and distribute the execution of the coroutines among those threads; a sketch of this idea appears after this list.
  • This version still has not addressed all of the issues raised in the Limitations section of the second post in this series, on character decoding. In particular:
    • Error handling should be improved.
    • It only supports UTF-8 encoding.
    For an example of this problem, type a Control-C into your telnet window when connected to the EchoServer application.
  • The application should parse its command line arguments so that it has the flexibility to, for example, use a different port number without requiring a code change.
  • The application should read a configuration file.
  • Error handling in general needs to be improved.
  • Logging should be added.
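
As a sketch of the first item above: a multi-threaded scheduler might hand each runnable continuation to a fixed thread pool. The sketch below only illustrates the idea and is not code from the nioserver project; its names are hypothetical, and a real version would have to implement the full CoScheduler API and make the coroutine bookkeeping thread-safe.
//Hypothetical sketch only; not part of the nioserver project
import java.util.concurrent.Executors

class PooledResumer(nThreads:Int) {
    private val pool = Executors.newFixedThreadPool(nThreads)

    //Resume a stored continuation on one of the pool's threads
    def resume(k:(Unit=>Unit)) {
        pool.execute(new Runnable {
            def run() { k(()) }
        })
    }
}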

Friday, April 8, 2011

Java NIO for Writing

Using Java NIO non-blocking IO for writing as well as reading is almost - but not quite - straightforward.

Background

One of the limitations pointed out in the Limitations section of the original post in this series was that we were still directly writing our output data to the socket rather than using non-blocking IO and continuations as we were doing when reading our input data. If a client stops reading its input (or if there is sufficient network congestion that it looks that way from our end), then our socket output buffer may fill up. If that happens, then one of two things will happen when we try to write our data to that socket: either the call will block, or the data will not all be written. If the call blocks, then we have a blocked thread that we cannot use for processing other clients until it is unblocked. If there are many clients that are not reading their input, we could have many blocked threads. Since one of the goals of this exercise is to be able to run many clients on a relatively small number of threads, having blocked threads is bad. To avoid this problem, we use non-blocking output and continuations for writing the output, just as we did for reading the input.

The complete source for this series of posts is available on github in my nioserver project, with the specific version after the changes specified in this post tagged as blog-write.

Implementation

We model the output code on the input code by making these changes:
  • We write a suspending write method that registers our interest in writing to the output socket connection.
  • We add an output queue to receive data from the application.
  • We modify the writeLine method to add a line to the output queue rather than writing directly to the output socket.
  • We run a separate control loop that reads from the output queue and writes to the output socket.

//In class NioConnection
    private val outQ = new CoQueue[String](sched, 10)

    def start():Unit = {
        startReader
        startWriter
        startApp
    }

    private def startWriter() {
        reset {
            while (socket.isOpen)
                writeWait
        }
    }

    private def write(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            selector.register(socket, SelectionKey.OP_WRITE, {
                val n = socket.write(b)
                k(n)
            })
        }
    }

    private def writeBuffer(b:ByteBuffer):Unit @suspendable = {
        write(b)
        if (b.remaining>0 && socket.isOpen)
            writeBuffer(b)
        else
            shiftUnit[Unit,Unit,Unit]()
    }

    private def writeWait:Unit @suspendable = {
        val str = outQ.blockingDequeue
        writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
    }

    def writeLine(s:String):Unit @suspendable = write(s+"\n")
    def write(s:String):Unit @suspendable = outQ.blockingEnqueue(s)
This seems pretty straightforward, but unfortunately it doesn't work. The problem is that we have attempted to register our channel twice (once for read and once for write) with the same selector. The documentation for SelectableChannel says, "A channel may be registered at most once with any particular selector." If we call register for our channel for write when it is already registered for read, the read registration is overwritten by the write registration and is lost.

In his Rox Java NIO Tutorial, James Greenfield explicitly recommends that you "Use a single selecting thread" and "Modify the selector from the selecting thread only." We could take this approach, adding some code to combine the read and write interest flags when both are wanted at once, but unlike in James' case we would also need to add some code to demultiplex the separate callbacks for read and write. Instead, we use a different approach: we use separate selectors for reading and writing, and we give each of them its own thread.
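
For illustration, the combining approach might have looked something like the fragment below (we do not take this path, and the demultiplexing of the read and write callbacks is omitted; channel, selector and callback stand in for the values used in NioSelector):
//Sketch of the single-selector alternative we did NOT adopt
val key = channel.keyFor(selector)  //null if the channel is not yet registered
if (key == null)
    channel.register(selector, SelectionKey.OP_WRITE, callback)
else
    key.interestOps(key.interestOps | SelectionKey.OP_WRITE)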

Two Selectors

Depending on the implementation, using two selectors and two threads this way could cause problems. However, based on my understanding of the documentation, the code in the Sun implementation and the operation of the POSIX select operation, I believe this approach should work (at least on POSIX systems). This would need to be tested on all supported platforms for a production system.

To use separate read and write selectors, we replace the current selector parameter in NioConnection with two parameters readSelector and writeSelector of the same type.
//In object NioConnection:
    def newConnection(sched:CoScheduler, readSelector:NioSelector,
            writeSelector:NioSelector, socket:SocketChannel) {
        val conn = new NioConnection(sched,readSelector,
            writeSelector,socket)
        conn.start()
    }

class NioConnection(sched:CoScheduler, readSelector:NioSelector, 
        writeSelector:NioSelector, socket:SocketChannel) {
    ...
    private def read(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            readSelector.register(socket, SelectionKey.OP_READ, {
                val n = socket.read(b)
                k(n)
            })
        }
    }

    private def write(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            writeSelector.register(socket, SelectionKey.OP_WRITE, {
                val n = socket.write(b)
                k(n)
            })
        }
    }

    ...
}
We also change NioListener to pass through those two arguments, and we choose to use the readSelector to handle our accept calls.
//In NioListener
class NioListener(sched:CoScheduler, readSelector:NioSelector,
        writeSelector:NioSelector, hostAddr:InetAddress, port:Int) {
    ...
    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()
                NioConnection.newConnection(sched,
                    readSelector,writeSelector,socket)
            }
        }
    }

    private def accept():SocketChannel @suspendable = {
        shift { k =>
            readSelector.register(serverChannel,SelectionKey.OP_ACCEPT, {
                val conn = serverChannel.accept()
                conn.configureBlocking(false)
                k(conn)
            })
        }
    }
}
Finally, we instantiate the new write selector in NioServer, pass it in to NioListener, and start it running in a new thread.
//In NioServer
class NioServer(hostAddr:InetAddress, port:Int) {
    val readSelector = new NioSelector()
    val writeSelector = new NioSelector()
    val sched = new DefaultCoScheduler
    val listener = new NioListener(sched, 
        readSelector, writeSelector, hostAddr, port)

    def start() {
        listener.start(true)
        //run the NIO read and write selectors each on its own thread
        (new Thread(writeSelector,"WriteSelector")).start
        (new Thread(readSelector,"ReadSelector")).start
        Thread.currentThread.setName("CoScheduler")
        sched.run    //run the coroutine scheduler on our thread, renamed
    }
}

Close

Our current example has no terminating condition, so it never attempts to close the connection. Looking ahead, we expect to have applications that will want to do that, so we add a close method to NioConnection, and an isOpen method that lets us see when the connection has been closed.

We can't just add a close method that directly closes the socket, because there may still be output data waiting to be written. Thus we need an implementation that somehow waits until all of the queued output data has been written to the output before closing the socket.

One easy way to do this is to have a special marker string that we put into the output queue when the application requests to close the socket. When our socket output code sees that marker, we know it has already written out all of the data that came before that marker in the output queue, so we can close the socket. By doing the socket close in the same method that does the writes to the socket, and by ensuring that that method is called on the (write) selection thread, we also ensure that the close happens on the selection thread.

The compiler shares (interns) constant strings, so to make sure we have a unique marker string that cannot be passed in by any code outside of our close method, we create it with new String(). In writeWait, where we check for that marker, we use the identity comparison eq, and we add a call to shiftUnit to make both branches of the if statement CPS-typed.

A call to our close method will return right away, but the socket will not get closed until after all of the data in the output queue has been written to the output socket. The application can tell when the socket has actually been closed by calling the isOpen method.
//In NioConnection
    private def writeWait():Unit @suspendable = {
        val str = outQ.blockingDequeue
        if (str eq closeMarker) {
            socket.close
            shiftUnit[Unit,Unit,Unit]()
        } else
            writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
    }

    def isOpen = socket.isOpen
    private val closeMarker = new String("")
    def close():Unit @suspendable = write(closeMarker)

Summary

As in the previous two posts, we have modified the program to make an internal improvement that has not changed its basic external behavior. We have, however, changed its behavior for one of the corner cases - in this case what happens when an output socket fills up, such as might happen when there is excessive network latency - which is a necessary improvement for a production application, particularly if one expects the kind of high volume that would make those corner cases more likely.

Saturday, April 2, 2011

Java NIO and Scala Coroutines

I present a multi-client server in Scala that uses coroutines to allow modularization of stateful client processing in a way that is independent of threads.

Background

In my previous two posts I presented a server in Scala that uses Java NIO non-blocking IO and continuations to allow scaling to a large number of clients. As I pointed out in the Limitations section of that first post, that example used one thread for all execution. On a multi-core machine, as is common today, we would prefer to have multiple threads running to allow us to take advantage of all of the processing power available to us, yet we don't want to allocate a thread to every client.

It would be nice if we could add our own SelectableChannel types to the set of NIO channel types that we can use with the select call so that we could have one place where we do all our scheduling, but that feature is not available. We thus have to come up with another mechanism for handling all of the other potentially blocking tasks we will want to do. Fortunately, we already have such a mechanism: coroutines.

Coroutines

Coroutines provide a separation of the maintenance of task state from the execution of code for that task, allowing us to bind execution of the task to different threads as we desire. When one of our task coroutines becomes blocked waiting for an unavailable resource, we suspend it by storing its continuation, allowing us to use that thread for another purpose, such as to restore and run a different previously stored continuation that is now runnable.
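
As a minimal, self-contained illustration of this idea (a sketch, separate from the server code), the following program suspends a task by capturing its continuation in shift, leaves the thread free for other work, and later resumes the stored continuation:
import scala.util.continuations._

object CoroutineSketch {
    //Holds the continuation of the suspended task
    var resume:Option[Unit=>Unit] = None

    def suspend():Unit @suspendable =
        shift { k:(Unit=>Unit) =>
            resume = Some(k)    //capture the task state; the thread is now free
        }

    def main(args:Array[String]) {
        reset {
            println("task: part 1")
            suspend()               //task suspends here
            println("task: part 2") //runs only when the continuation is resumed
        }
        println("scheduler: task is suspended; do other work here")
        resume.foreach(k => k(())) //restore and run the stored continuation
    }
}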

In my earlier post on coroutines I presented an implementation of a coroutine package that included a scheduler (CoScheduler) and a blocking queue (CoQueue). We will modify the server implementation of my previous two "Java NIO" posts to make use of those classes.

As pointed out in that earlier coroutines post, the default scheduler implementation in the example can easily be replaced by another implementation with no other changes to the code. In particular, that new implementation could use a thread pool or a group of actors to execute the coroutines that are ready to run, assuming the coroutine code itself is multi-thread safe. We will not write that multi-thread scheduler for this post, but will assume that it can be written later.

Architecture

At a high level, we want to modify our server so that we have a queue between our socket reader and the application that will eventually consume the data. We can then set up a small processing loop that reads the socket data, converts it to a string and writes it to that queue. The application will read the contents of the queue, process it, and write back its results to the connection. We will let the socket reader continue to run on the select thread, but we will run the application on a separate thread (or threads), ensuring that the select loop can quickly get to all connections and preventing the application processing of any one connection from delaying the IO of other connections.

With this architecture we have two processing loops:
  1. Read data from socket, write to queue.
  2. Read data from queue, process it, write data (to socket, for now).
Given that for now we are writing directly to the connection socket on output (and ignoring the possibility that the output socket might be blocked), the second loop only has one potential blocking point: if there is no data in the queue, it will block when trying to read from the queue. The first loop has two potential blocking points: when it reads data from the socket (if there is no data available), and when it writes data to the queue (if the queue is full). The difficulty here is that the potentially blocking socket read must be handled by the NIO select call, but the potentially blocking write to the queue can't be handled by the NIO select call and thus must be handled by our own scheduler.

Having one processing loop that when blocked is sometimes managed by one scheduler (NIO select) and sometimes by another (our coroutine scheduler) is not necessarily a problem. Each scheduler just sees a blocking resource that has a continuation associated with it; when the blocking resource becomes available, the continuation is called and the process continues. The new issue that arises when trying to combine two schedulers like this is that an action by one scheduler can potentially unblock a task that is currently controlled by (i.e. in a wait state on) the other scheduler. Every time we perform an action that might unblock a task we need to ensure that the appropriate scheduler is not stuck waiting on the other tasks. In other words, we need to wake up or notify the schedulers at appropriate points in our code.

In this post, code which has changed is highlighted in bold (when not using Syntax Highlighting). Changes for CoScheduler and CoQueue are as compared to the code in my post on coroutines; changes to NioSelector, NioConnection, LineDecoder, NioListener and NioServer are as compared to the code in my previous two posts.

The complete source for this post is available on github in my nioserver project, with the specific version used in this post tagged as blog-coroutines. There are also tags for the previous two posts, so you can compare using those tags to see the changes between the versions as used in each post.

NioSelector

As mentioned above, we have to cooperate with the coroutine scheduler. In particular, we have to handle the situation in which we have no active connections, so we are blocked in a select call, and another thread then registers interest in an operation. The documentation for the select call states:
Changes made to the interest sets of a selector's keys while a selection operation is in progress have no effect upon that operation; they will be seen by the next selection operation.
To terminate the select operation early so that it retries with the newly registered channel, we add a call to wakeup just after registering our interest.

Unfortunately, this is not enough. The documentation for the select call is not very precise about whether it is actually possible to call the register call from another thread while the select call is blocked waiting for a previously registered channel to become active. The documentation for SelectableChannel does explicitly say "Selectable channels are safe for use by multiple concurrent threads", but the documentation for the register method says "This method will then synchronize on the selector's key set and therefore may block if invoked concurrently with another registration or selection operation involving the same selector." In fact, the standard Sun implementation does quite a bit of synchronization, and it can quite easily deadlock when used from multiple threads. In particular, the OS-level select call in the Java select method is inside a pair of synchronized blocks that lock the set of SelectionKeys associated with that selector. If, while the first thread is blocked on the select, a second thread calls SelectableChannel.register, it locks the channel, then attempts to lock the key set to which that channel is being added, so it blocks. If a third thread then tries to register that channel with a second selector, which the documentation implies is allowed, the third thread will attempt to lock the channel, which will block until the second thread unblocks and releases its lock on the channel.

In his Rox Java NIO Tutorial, James Greenfield explicitly recommends that you "Use a single selecting thread" and "Modify the selector from the selecting thread only." From the description above of how register works, you can see why.

To get around this problem and ensure that all changes to the selection keys happen on the thread that is calling select, we modify NioSelect.register so that, rather than calling SelectableChannel.register directly, it packages the arguments up and puts them into a queue which is processed by the selection thread in order to make all of the calls to SelectableChannel.register just before it calls select.

Fortunately, the semantics of the wakeup call ensure that we won't get ourselves into a position where we have put our registration request into the queue but the select call doesn't see it and blocks on all the other channels. This is because wakeup is defined such that a call to it that happens while the selector is not currently in a select operation will cause the next select to wake up immediately.

With this change, all of the key set operations happen on the selection thread and, since the socket read operation is in a callback that gets executed by the selection thread in NioSelector.executeCallbacks, all socket reads (and likewise accepts) will happen on the selection thread.

//In class NioSelector
import scala.collection.mutable.SynchronizedQueue

    private case class RegistrationRequest(
        channel:SelectableChannel,op:Int,callback:Function0[Unit])
    private val regQ = new SynchronizedQueue[RegistrationRequest]

    def register(channel:SelectableChannel, op:Int, body: => Unit) {
        val callback:Function0[Unit] = { () => { body }}
        regQ.enqueue(RegistrationRequest(channel,op,callback))
        selector.wakeup()
    }

    def selectOnce(timeout:Long) {
        while (regQ.size>0) {
            val req = regQ.dequeue()
            req.channel.register(selector,req.op,req.callback)
        }
        ...
    }

CoScheduler

For our coroutine scheduler, we have to be able to deal with the situation that we have no coroutines that are currently runnable, then at some point one of those coroutines becomes runnable by the actions of another thread. In the architecture described above, this can happen when new data that has been read from a connection is placed into the input queue. To allow us to wait for this kind of event and to be awakened when it happens, we use Java's wait/notify model. We can't override those methods, since notify is final, so we define our own versions, which we call coWait and coNotify. Given those methods, we also extend Runnable and replace the old run method with one that runs coroutines until none are available to run, then waits until we are notified and continues the loop.
trait CoScheduler extends Runnable { cosched =>
    //we add the following items
    private val defaultLock = new java.lang.Object
    def coWait():Unit = { defaultLock.synchronized { defaultLock.wait() } }
    def coNotify():Unit = { defaultLock.synchronized { defaultLock.notify } }

    def run {
        while (true) {
            runUntilBlockedOrDone
            coWait
        }
    }
}
A coNotify method that accepts as an argument the coroutine or blocker that has potentially changed state would allow for a more efficient implementation, but for now we choose the simple implementation given above that does not attempt that optimization.
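
To make that idea concrete, a targeted scheme might keep a map from blockers to the continuations waiting on them, so a notification wakes only the affected coroutines. The sketch below is hypothetical; these names are not part of the scoroutine package:
//Hypothetical sketch; not part of the scoroutine package
import scala.collection.mutable

class TargetedNotify {
    private val waiters = mutable.Map[AnyRef,mutable.Queue[Unit=>Unit]]()
    private val lock = new AnyRef

    //Record a continuation as waiting on the given blocker
    def coWait(blocker:AnyRef, k:(Unit=>Unit)) {
        lock.synchronized {
            waiters.getOrElseUpdate(blocker, mutable.Queue()).enqueue(k)
        }
    }

    //Wake only the coroutines waiting on this blocker
    def coNotify(blocker:AnyRef) {
        val ready = lock.synchronized {
            waiters.remove(blocker).getOrElse(mutable.Queue())
        }
        ready.foreach(k => k(()))
    }
}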

CoQueue

We use an instance of CoQueue as the queue between the socket read loop and the application processing loop. The socket read loop calls blockingEnqueue to place an item into the queue, and the application processing loop calls blockingDequeue to take an element out of the queue. The result of either of these actions could be to unblock another coroutine, so we modify those methods to add a call to coNotify in case they are being called from a coroutine that is not currently being managed by our coroutine scheduler. Since we are calling the enqueue and dequeue methods from different threads, we use a SynchronizedQueue rather than a plain Queue. Those two methods now look like this:
import scala.collection.mutable.SynchronizedQueue

class CoQueue ... extends SynchronizedQueue[A] { ...

    def blockingEnqueue(x:A):Unit @suspendable = {
        enqueueResource.waitUntilNotBlocked
        enqueue(x)
        dequeueResource.coNotify
    }

    def blockingDequeue():A @suspendable = {
        dequeueResource.waitUntilNotBlocked
        val x = dequeue
        enqueueResource.coNotify
        x
    }

NioConnection

We add a CoQueue which we use as our input queue between the socket reader loop and the application loop. For this example, we pick an arbitrary limit of 10; if our application gets behind by more than 10 items, the socket reader code will suspend when attempting to write to the queue. If more data arrives while that code is thus suspended, it will back up in the system's input buffer for that connection, and eventually the client will get an error when trying to write to its output connection.

In order to initialize the CoQueue we need to pass in a CoScheduler, so we add that parameter to our constructor and to the convenience method in our companion object.
import net.jimmc.scoroutine.{CoQueue,CoScheduler}

//In object NioConnection
    def newConnection(sched:CoScheduler, selector:NioSelector, socket:SocketChannel) {
        val conn = new NioConnection(sched,selector,socket)
        conn.start()
    }

class NioConnection(sched:CoScheduler, selector:NioSelector, socket:SocketChannel) {
    //Add CoQueue
    private val inQ = new CoQueue[String](sched, 10)
}
Now that we have a queue, we modify our socket reader code to place our input data (after conversion to a Java string) into our queue rather than writing it straight to the output socket. We want to block when the queue is full, so we call the blockingEnqueue method. Since we now know that's the only action we will be taking, we fold the readAction method back into readWait. Because blockingEnqueue is suspendable, the else branch of the if (count<1) code block is suspendable, so we need to make the if branch suspendable as well. We do this by adding a shiftUnit call as the final value in the if branch. The readWait method now looks like this:
    private def readWait = {
        buffer.clear()
        val count = read(buffer)
        if (count<1) {
            socket.close()
            shiftUnit[Unit,Unit,Unit]()
        } else {
            //Moved here from readAction
            buffer.flip()
            lineDecoder.processBytes(buffer, inQ.blockingEnqueue(_))
        }
    }
We now have input data going into our queue, but nobody is reading it. For this example, we implement a simple echo loop that reads from the input queue using a new readLine method and writes to the output using our existing writeLine method. We do this inside a reset block so that it becomes another coroutine that can be managed by our coroutine scheduler. Our previous start method started up the socket reader loop. We rename that one to startReader, add a startApp method that starts up our echo loop, and call both of those from a new start method. Our start method now looks like this:
//In class NioConnection
    def start():Unit = {
        startReader
        startApp
    }   
            
    private def startApp() {
        reset {
            while (socket.isOpen)
                writeLine(readLine())
        }
    }

    private def startReader() {
        reset {
            while (socket.isOpen)
                readWait
        }
    }

    def readLine():String @suspendable = inQ.blockingDequeue

LineDecoder

Our processBytes method is now passed a callback that is suspendable, so we need to modify its signature to accept that. It passes that callback to processChars, so that signature needs the same change. Since processChars now calls a suspendable method, it too is suspendable, and its return type needs to note that; and since processBytes calls processChars, its return type needs the same annotation.
//In class LineDecoder
import scala.util.continuations._

    def processBytes(b:ByteBuffer,
        lineHandler:(String)=>Unit @suspendable):Unit @suspendable = ...

    private def processChars(cb:CharBuffer,
        lineHandler:(String)=>Unit @suspendable):Unit @suspendable = { ... }

NioListener

NioListener calls NioConnection.newConnection, and that call now requires a CoScheduler argument, so we add that to our constructor and pass it through when we call newConnection.
import net.jimmc.scoroutine.CoScheduler

class NioListener(sched:CoScheduler, selector:NioSelector, hostAddr:InetAddress, port:Int) {

    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()
                NioConnection.newConnection(sched,selector,socket)
            }
        }
    }
}

NioServer

NioServer instantiates the NioListener, so we need to pass it an instance of CoScheduler. We create an instance of DefaultCoScheduler and pass that in. We now need two threads, one for our coroutine scheduler and one for the NIO selector. In our start method, we create and start a second Thread for the NIO selector, then rename our own thread and run the coroutine scheduler on it.
import net.jimmc.scoroutine.DefaultCoScheduler

class NioServer(hostAddr:InetAddress, port:Int) {
    val selector = new NioSelector()
    val sched = new DefaultCoScheduler
    val listener = new NioListener(sched, selector, hostAddr, port)

    def start() {
        listener.start(true)
        //run the NIO selector on its own thread
        (new Thread(selector,"NioSelector")).start
        Thread.currentThread.setName("CoScheduler")
        sched.run    //run the coroutine scheduler on our thread, renamed
    }
}

Summary

As in the previous post, we have once again transformed our example application in a way which provides an internal improvement - in this case the ability to use multiple threads - but which has not changed its basic external behavior: we still have a simple echo server. We also have not yet addressed all of the Limitations from the first post in this series. Stay tuned for more.

Caveats

  • Although I have asserted that it is possible to write a multi-threading scheduler to the CoScheduler API, I have not yet actually done this. It is possible that this may be more difficult than I expect.
  • Multi-threaded code is generally tricky stuff. I have not spent a lot of time running this example code, so it is certainly possible that there are race conditions or other concurrency problems.