Contents
- Background
- Coroutines
- Architecture
- NioSelector
- CoScheduler
- CoQueue
- NioConnection
- LineDecoder
- NioListener
- NioServer
- Summary
- Caveats
Background
In my previous two posts I presented a server in Scala that uses Java NIO non-blocking IO and continuations to allow scaling to a large number of clients. As I pointed out in the Limitations section of the first of those posts, that example used one thread for all execution. On a multi-core machine, as is common today, we would prefer to have multiple threads running so that we can take advantage of all of the processing power available to us, yet we don't want to allocate a thread to every client.
It would be nice if we could add our own SelectableChannel types to the set of NIO channel types that we can use with the select call so that we could have one place where we do all our scheduling, but that feature is not available. We thus have to come up with another mechanism for handling all of the other potentially blocking tasks we will want to do. Fortunately, we already have such a mechanism: coroutines.
Coroutines
Coroutines provide a separation of the maintenance of task state from the execution of code for that task, allowing us to bind execution of the task to different threads as we desire. When one of our task coroutines becomes blocked waiting for an unavailable resource, we suspend it by storing its continuation, allowing us to use that thread for another purpose, such as to restore and run a different previously stored continuation that is now runnable.
In my earlier post on coroutines I presented an implementation of a coroutine package that included a scheduler (CoScheduler) and a blocking queue (CoQueue). We will modify the server implementation of my previous two "Java NIO" posts to make use of those classes.
As pointed out in that earlier coroutines post, the default scheduler implementation in the example can easily be replaced by another implementation with no other changes to the code. In particular, that new implementation could use a thread pool or a group of actors to execute the coroutines that are ready to run, assuming the coroutine code itself is multi-thread safe. We will not write that multi-thread scheduler for this post, but will assume that it can be written later.
Architecture
At a high level, we want to modify our server so that we have a queue between our socket reader and the application that will eventually consume the data. We can then set up a small processing loop that reads the socket data, converts it to a string and writes it to that queue. The application will read the contents of the queue, process it, and write back its results to the connection. We will let the socket reader continue to run on the select thread, but we will run the application on a separate thread (or threads), ensuring that the select loop can quickly get to all connections and preventing the application processing of any one connection from delaying the IO of other connections.
With this architecture we have two processing loops:
- Read data from socket, write to queue.
- Read data from queue, process it, write data (to socket, for now).
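The two loops can be pictured with ordinary threads and a bounded `java.util.concurrent.ArrayBlockingQueue` standing in for the coroutine queue. This is only an analogy under that assumption: the names `TwoLoopSketch`, `runEcho` and the end-of-input sentinel are made up for illustration, and the actual server suspends continuations rather than blocking threads.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

object TwoLoopSketch {
  // Sentinel marking end of input; a hypothetical convention for this sketch only.
  // It is compared by reference, so no real input line can collide with it.
  private val EndOfInput = new String("<eof>")

  def runEcho(input: Seq[String]): List[String] = {
    val queue = new ArrayBlockingQueue[String](10)
    val output = ArrayBuffer.empty[String]

    // Loop 1: "read from the socket" (here, a Seq) and write to the queue.
    val reader = new Thread(new Runnable {
      def run(): Unit = {
        input.foreach(line => queue.put(line))   // blocks while the queue is full
        queue.put(EndOfInput)
      }
    }, "reader")

    // Loop 2: read from the queue, process, and "write" the result.
    val app = new Thread(new Runnable {
      def run(): Unit = {
        var line = queue.take()                  // blocks while the queue is empty
        while (line ne EndOfInput) {
          output += line
          line = queue.take()
        }
      }
    }, "app")

    reader.start(); app.start()
    reader.join(); app.join()
    output.toList
  }
}
```

Calling `TwoLoopSketch.runEcho(Seq("a", "b"))` returns the lines in the order they were read, even when there are more lines than the queue can hold at once.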
Having one processing loop that when blocked is sometimes managed by one scheduler (NIO select) and sometimes by another (our coroutine scheduler) is not necessarily a problem. Each scheduler just sees a blocking resource that has a continuation associated with it; when the blocking resource becomes available, the continuation is called and the process continues. The new issue that arises when trying to combine two schedulers like this is that an action by one scheduler can potentially unblock a task that is currently controlled by (i.e. in a wait state on) the other scheduler. Every time we perform an action that might unblock a task we need to ensure that the appropriate scheduler is not stuck waiting on the other tasks. In other words, we need to wake up or notify the schedulers at appropriate points in our code.
In this post, code which has changed is highlighted in bold (when not using Syntax Highlighting). Changes for CoScheduler and CoQueue are as compared to the code in my post on coroutines; changes to NioSelector, NioConnection, LineDecoder, NioListener and NioServer are as compared to the code in my previous two posts.
The complete source for this post is available on github in my nioserver project, with the specific version used in this post tagged as blog-coroutines. There are also tags for the previous two posts, so you can compare using those tags to see the changes between the versions as used in each post.
NioSelector
As mentioned above, we have to cooperate with the coroutine scheduler. In particular, we have to be able to deal with the situation in which we have no active connections, so we are blocked in a select call, and then another thread registers interest in an operation. The documentation for the select call states:
"Changes made to the interest sets of a selector's keys while a selection operation is in progress have no effect upon that operation; they will be seen by the next selection operation."
To terminate the select operation early so that it retries with the newly registered channel, we add a call to wakeup just after registering our interest.
Unfortunately, this is not enough. The documentation for the select call is not very precise about whether it is actually possible to call the register call from another thread while the select call is blocked waiting for a previously registered channel to become active. The documentation for SelectableChannel does explicitly say "Selectable channels are safe for use by multiple concurrent threads", but the documentation for the register method says "This method will then synchronize on the selector's key set and therefore may block if invoked concurrently with another registration or selection operation involving the same selector." In fact, the standard Sun implementation does quite a bit of synchronization, so quite easily gets deadlocked when used by multiple threads.
In particular, the OS-level select call in the Java select method is inside a pair of synchronized blocks that lock the set of SelectionKeys associated with that selector. If, while the first thread is blocked on the select, a second thread calls SelectableChannel.register, it locks the channel, then attempts to lock on the key set to which that channel is being added, so it blocks. If a third thread then tries to register that channel with a second selector, which the documentation implies is allowed, the third thread will attempt to lock the channel, which will block until the second thread unblocks and releases its lock on the channel.
In his Rox Java NIO Tutorial James Greenfield explicitly recommends that you "Use a single selecting thread" and "Modify the selector from the selecting thread only." From the description of how register works above, you can see why.
To get around this problem and ensure that all changes to the selection keys happen on the thread that is calling select, we modify NioSelector.register so that, rather than calling SelectableChannel.register directly, it packages the arguments up and puts them into a queue. The selection thread processes that queue and makes all of the calls to SelectableChannel.register just before it calls select.
Fortunately, the semantics of the wakeup call ensure that we won't get ourselves into a position where we have put our registration request into the queue but the select call doesn't see it and blocks on all the other channels. This is because wakeup is defined such that a call to it that happens while the selector is not currently in a select operation will cause the next select to wake up immediately.
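That wakeup behavior can be checked in isolation. The following is a small sketch, separate from the server code; the names `WakeupSketch` and `selectAfterWakeupMillis` are made up for this illustration. It calls wakeup on a selector with no channels registered and no select in progress, then measures how long the next select blocks.

```scala
import java.nio.channels.Selector

object WakeupSketch {
  // Returns how long select() blocked, in milliseconds, after an early wakeup().
  def selectAfterWakeupMillis(): Long = {
    val selector = Selector.open()
    try {
      selector.wakeup()            // no select operation is in progress yet
      val start = System.nanoTime()
      selector.select(5000)        // without the wakeup this would wait up to 5 seconds
      (System.nanoTime() - start) / 1000000L
    } finally selector.close()
  }
}
```

The measured time should be far below the five-second timeout, because the earlier wakeup is remembered and consumed by the next select.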
With this change, all of the key set operations happen on the selection thread and, since the socket read operation is in a callback that gets executed by the selection thread in NioSelector.executeCallbacks, all socket reads (and likewise accepts) will happen on the selection thread.
    //In class NioSelector
    import scala.collection.mutable.SynchronizedQueue

    //One queued request to register a channel with the selector
    private case class RegistrationRequest(
        channel:SelectableChannel,op:Int,callback:Function0[Unit])
    private val regQ = new SynchronizedQueue[RegistrationRequest]

    //May be called from any thread: queue the request, then wake the selector
    def register(channel:SelectableChannel, op:Int, body: => Unit) {
        val callback:Function0[Unit] = { () => { body }}
        regQ.enqueue(RegistrationRequest(channel,op,callback))
        selector.wakeup()
    }

    //Called on the selection thread: register queued channels before selecting
    def selectOnce(timeout:Long) {
        while (regQ.size>0) {
            val req = regQ.dequeue()
            req.channel.register(selector,req.op,req.callback)
        }
        ...
    }
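For readers who want to try the registration queue described above outside the server, here is a self-contained sketch of the same idea. It is not the code from the repository: `QueuedSelector` and `Request` are hypothetical names, it uses `java.util.concurrent.ConcurrentLinkedQueue` in place of SynchronizedQueue, and it assumes one-shot callbacks (the key's interest set is cleared before the callback runs).

```scala
import java.nio.channels.{SelectableChannel, Selector}
import java.util.concurrent.ConcurrentLinkedQueue

object RegistrationQueueSketch {
  final case class Request(channel: SelectableChannel, op: Int, callback: () => Unit)

  class QueuedSelector {
    private val selector = Selector.open()
    private val pending = new ConcurrentLinkedQueue[Request]()

    // Safe to call from any thread: it only queues the request and wakes the selector.
    def register(channel: SelectableChannel, op: Int)(body: => Unit): Unit = {
      pending.add(Request(channel, op, () => body))
      selector.wakeup()
    }

    // Call only from the selecting thread. Returns the number of callbacks run.
    def selectOnce(timeoutMillis: Long): Int = {
      // Perform all queued registrations on this thread, just before selecting.
      var req = pending.poll()
      while (req != null) {
        req.channel.register(selector, req.op, req.callback)
        req = pending.poll()
      }
      selector.select(timeoutMillis)
      val keys = selector.selectedKeys().iterator()
      var ran = 0
      while (keys.hasNext) {
        val key = keys.next()
        keys.remove()
        val callback = key.attachment().asInstanceOf[() => Unit]
        key.interestOps(0)   // one-shot (an assumption of this sketch): stop watching
        callback()
        ran += 1
      }
      ran
    }

    def close(): Unit = selector.close()
  }
}
```

Because a pending wakeup makes the next select return at once, a caller may see a pass that runs no callbacks; looping on `selectOnce`, as a select loop normally does, handles that case.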
CoScheduler
For our coroutine scheduler, we have to be able to deal with the situation in which no coroutines are currently runnable and then, at some point, one of them becomes runnable through the actions of another thread. In the architecture described above, this can happen when new data that has been read from a connection is placed into the input queue. To allow us to wait for this kind of event and to be awakened when it happens, we use Java's wait/notify model. We can't override those methods, since notify is final, so we define our own versions, which we call coWait and coNotify. Given those methods, we also extend Runnable and replace the old run method with one that runs coroutines until none are available to run, then waits until we are notified and continues the loop.
    trait CoScheduler extends Runnable { cosched =>
        //we add the following items
        private val defaultLock = new java.lang.Object

        def coWait():Unit = {
            defaultLock.synchronized { defaultLock.wait() }
        }

        def coNotify():Unit = {
            defaultLock.synchronized { defaultLock.notify }
        }

        def run {
            while (true) {
                runUntilBlockedOrDone
                coWait
            }
        }
    }
A coNotify method that accepts as an argument the coroutine or blocker that has potentially changed state would allow for a more efficient implementation, but for now we choose the simple implementation given above that does not attempt that optimization.
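One property of a bare wait/notify pair worth keeping in mind is that a notify delivered while no thread is waiting is simply lost. A common guard, shown here as a sketch and not as the code in the repository, is to record the notification in a flag that the waiter checks under the same lock. The class name `WakeupFlag` is hypothetical.

```scala
object GuardedWaitSketch {
  class WakeupFlag {
    private val lock = new Object
    private var pending = false   // set by coNotify, cleared by coWait

    def coNotify(): Unit = lock.synchronized {
      pending = true
      lock.notifyAll()
    }

    def coWait(): Unit = lock.synchronized {
      while (!pending) lock.wait()   // the loop also guards against spurious wakeups
      pending = false
    }
  }
}
```

With this variant, a coNotify that arrives between the end of one scheduling pass and the following coWait is remembered, so the waiter returns immediately instead of sleeping until the next notification.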
CoQueue
We use an instance of CoQueue as the queue between the socket read loop and the application processing loop. The socket read loop calls blockingEnqueue to place an item into the queue, and the application processing loop calls blockingDequeue to take an element out of the queue. The result of either of these actions could be to unblock another coroutine, so we modify those methods to add a call to coNotify in case they are being called from a coroutine that is not currently being managed by our coroutine scheduler.
Since we are calling the enqueue and dequeue methods from different threads, we use a SynchronizedQueue rather than a plain Queue.
Those two methods now look like this:
    import scala.collection.mutable.SynchronizedQueue

    class CoQueue ... extends SynchronizedQueue[A] {
        ...
        def blockingEnqueue(x:A):Unit @suspendable = {
            enqueueResource.waitUntilNotBlocked
            enqueue(x)
            dequeueResource.coNotify    //a dequeuer may now be runnable
        }

        def blockingDequeue():A @suspendable = {
            dequeueResource.waitUntilNotBlocked
            val x = dequeue
            enqueueResource.coNotify    //an enqueuer may now be runnable
            x
        }
    }
NioConnection
We add a CoQueue which we use as our input queue between the socket reader loop and the application loop. For this example, we pick an arbitrary limit of 10; if our application gets behind by more than 10 items, the socket reader code will suspend when attempting to write to the queue. If more data arrives while that code is thus suspended, it will back up in the system's input buffer for that connection, and eventually the client will get an error when trying to write to its output connection.
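The effect of the limit can be seen with a thread-based stand-in. This sketch assumes a `java.util.concurrent.ArrayBlockingQueue` in place of CoQueue, and the names `BoundedQueueSketch` and `acceptedWithoutConsumer` are made up; where `offer` returns false here, the coroutine version would instead suspend the socket reader.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedQueueSketch {
  // Offers `count` items to a queue of the given capacity with no consumer running,
  // and returns how many of them were accepted.
  def acceptedWithoutConsumer(capacity: Int, count: Int): Int = {
    val queue = new ArrayBlockingQueue[String](capacity)
    (1 to count).count(i => queue.offer("line " + i))
  }
}
```

With a capacity of 10 and 15 lines offered, only the first 10 are accepted; the remaining 5 correspond to the point at which the reader would stop pulling data from the socket.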
In order to initialize the CoQueue we need to pass in a CoScheduler, so we add that parameter to our constructor and to the convenience method in our companion object.
    import net.jimmc.scoroutine.{CoQueue,CoScheduler}

    //In object NioConnection
    def newConnection(sched:CoScheduler, selector:NioSelector, socket:SocketChannel) {
        val conn = new NioConnection(sched,selector,socket)
    }

    class NioConnection(sched:CoScheduler, selector:NioSelector, socket:SocketChannel) {
        //Add CoQueue
        private val inQ = new CoQueue[String](sched, 10)
    }
Now that we have a queue, we modify our socket reader code to place our input data (after conversion to a Java string) into our queue rather than writing it straight to the output socket. We want to block when the queue is full, so we call the blockingEnqueue method. Since we now know that's the only action we will be taking, we fold the readAction method back into readWait. Because blockingEnqueue is suspendable, the else branch of the if (count<1) code block is suspendable, so we need to make the if branch suspendable as well. We do this by adding a shiftUnit call as the final value in the if branch. The readWait method now looks like this:
    private def readWait = {
        buffer.clear()
        val count = read(buffer)
        if (count<1) {
            socket.close()
            shiftUnit[Unit,Unit,Unit]()
        } else {
            //Moved here from readAction
            buffer.flip()
            lineDecoder.processBytes(buffer, inQ.blockingEnqueue(_))
        }
    }
We now have input data going into our queue, but nobody is reading it. For this example, we implement a simple echo loop that reads from the input queue using a new readLine method and writes to the output using our existing writeLine method. We do this inside a reset block so that it becomes another coroutine that can be managed by our coroutine scheduler. Our previous start method started up the socket reader loop. We rename that one to startReader, add a startApp method that starts up our echo loop, and call both of those from a new start method. Our start method now looks like this:
    //In class NioConnection
    def start():Unit = {
        startReader
        startApp
    }

    private def startApp() {
        reset {
            while (socket.isOpen)
                writeLine(readLine())
        }
    }

    private def startReader() {
        reset {
            while (socket.isOpen)
                readWait
        }
    }

    def readLine():String @suspendable = inQ.blockingDequeue
LineDecoder
Our processBytes method is now getting passed a callback that is suspendable, so we need to modify the signature of our method to accept that. It passes that callback to processChars, so that signature needs to be changed in the same way. Since processChars is now calling a suspendable method, it too is suspendable, so its return signature needs to be modified to note that, and since processBytes calls processChars, it too needs to be modified to have a suspendable return signature.
    //In class LineDecoder
    import scala.util.continuations._

    def processBytes(b:ByteBuffer,
            lineHandler:(String)=>Unit @suspendable):Unit @suspendable =
        ...

    private def processChars(cb:CharBuffer,
            lineHandler:(String)=>Unit @suspendable):Unit @suspendable = {
        ...
    }
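To make the role of the lineHandler callback concrete, here is a simplified, non-suspendable sketch of a decoder with the same general shape. It is not the LineDecoder from the repository: `SimpleLineDecoder` is a hypothetical name, and the sketch assumes single-byte ISO-8859-1 input and newline-terminated lines with any carriage returns dropped.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object LineSplitSketch {
  class SimpleLineDecoder {
    // Characters of a line whose terminating newline has not arrived yet.
    private val partial = new StringBuilder

    // Decodes the buffer and calls lineHandler once for each complete line.
    def processBytes(b: ByteBuffer, lineHandler: String => Unit): Unit = {
      val chars = StandardCharsets.ISO_8859_1.decode(b)
      while (chars.hasRemaining) {
        val c = chars.get()
        if (c == '\n') {
          lineHandler(partial.toString)
          partial.clear()
        } else if (c != '\r') {
          partial.append(c)
        }
      }
    }
  }
}
```

Feeding the decoder "hel", then "lo\r\nwor", then "ld\n" invokes the handler twice, with "hello" and then "world". In the server, the handler is the suspendable `inQ.blockingEnqueue(_)`, which is why the signatures above need the @suspendable annotations.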
NioListener
NioListener calls NioConnection.newConnection, and that call now requires a CoScheduler argument, so we add that to our constructor and pass it through when we call newConnection.
    import net.jimmc.scoroutine.CoScheduler

    class NioListener(sched:CoScheduler, selector:NioSelector,
            hostAddr:InetAddress, port:Int) {

        def start(continueListening: =>Boolean):Unit = {
            reset {
                while (continueListening) {
                    val socket = accept()
                    NioConnection.newConnection(sched,selector,socket)
                }
            }
        }
    }
NioServer
NioServer instantiates the NioListener, so we need to pass it an instance of CoScheduler. We create an instance of DefaultCoScheduler and pass that in. We now need two threads, one for our coroutine scheduler and one for the NIO scheduler. In our start method, we create and start a second Thread for the NIO scheduler, then rename our own thread and run the coroutine scheduler on it.
    import net.jimmc.scoroutine.DefaultCoScheduler

    class NioServer(hostAddr:InetAddress, port:Int) {
        val selector = new NioSelector()
        val sched = new DefaultCoScheduler
        val listener = new NioListener(sched, selector, hostAddr, port)

        def start() {
            listener.start(true)
            //run the NIO selector on its own thread
            (new Thread(selector,"NioSelector")).start
            Thread.currentThread.setName("CoScheduler")
            sched.run   //run the coroutine scheduler on our thread, renamed
        }
    }
Summary
As in the previous post, we have once again transformed our example application in a way which provides an internal improvement - in this case the ability to use multiple threads - but which has not changed its basic external behavior: we still have a simple echo server. We also have not yet addressed all of the Limitations from the first post in this series. Stay tuned for more.
Caveats
- Although I have asserted that it is possible to write a multi-threading scheduler to the CoScheduler API, I have not yet actually done this. It is possible that this may be more difficult than I expect.
- Multi-threaded code is generally tricky stuff. I have not spent a lot of time running this example code, so it is certainly possible that there are race conditions or other concurrency problems.