Multithread Coroutine Scheduler
A scheduler that uses multiple worker threads for continuations-based Scala coroutines.
In my recent series of posts that ended with a complete Scala server that uses continuations-based coroutines to store per-client state, I asserted that the single-threaded scheduler implementation in that example could relatively easily be replaced by a scheduler that uses multiple threads. In this post I provide a simple working example of such a multithread scheduler.
Overview
We can use the standard thread-pool approach in which we have a pool of worker threads that independently pull from a common task queue. Java 1.5 introduced a set of classes and interfaces in the java.util.concurrent package to support various kinds of thread pools or potentially other task scheduling mechanisms. Rather than writing our own, we will use an Executor from that package.
We have an additional requirement that makes our situation a little bit more complex than the typical thread-pool: our collection of tasks includes both tasks that are ready to run and tasks that are currently blocked but will become ready to run at some point in the future.
We will implement a new scheduler class JavaExecutorCoScheduler that maintains a list of blocked tasks and uses a Java Executor to manage runnable tasks.
The updated complete source code for this post is available on github in my nioserver project under the tag blog-executor.
Managing Tasks
As mentioned above, we need to deal with two kinds of tasks: tasks that are ready to run and tasks that are blocked. The standard Executor interface allows us to submit a task for execution, but does not handle blocked tasks. Since we don't want to submit blocked tasks to the Executor, we have to queue them up ourselves.
We have two issues to attend to:
- When our scheduler is passed a task, we must put it into our own queue of blocked tasks if it is not currently ready to run.
- When a previously blocked task becomes ready to run, we must remove it from our queue of blocked tasks and pass it to the Executor.
We modify the CoScheduler class to add a method to notify it that a blocker has probably become unblocked:

def unblocked(b:Blocker):Unit

We call this method from CoQueue in the two places where we previously called scheduler.coNotify: in the blockingEnqueue method after we have enqueued an item, to notify the scheduler that the dequeue side is probably unblocked, and in the blockingDequeue method after we have dequeued an item, to notify the scheduler that the enqueue side is probably unblocked. Those two methods in CoQueue now look like this:

def blockingEnqueue(x:A):Unit @suspendable = {
    enqueueBlocker.waitUntilNotBlocked
    enqueue(x)
    scheduler.unblocked(dequeueBlocker)
}

def blockingDequeue():A @suspendable = {
    dequeueBlocker.waitUntilNotBlocked
    val x = dequeue
    scheduler.unblocked(enqueueBlocker)
    x
}

The implementation of unblocked in our default scheduler DefaultCoScheduler is just a call to coNotify, so the behavior of that system will remain the same as it was before we added the calls to unblocked.
Because we need to ensure that all of our NIO read and write operations are handled sequentially, we continue to manage those tasks separately with our NioSelector class, where all of the reads are executed on one thread and all of the writes are executed on another thread.
Scheduler
We already have a scheduler framework that defines a CoScheduler class as the parent class for our scheduler implementations, which requires that we implement the methods setRoutineContinuation, runNextUnblockedRoutine and the newly added unblocked.
In our JavaExecutorCoScheduler, our setRoutineContinuation method is responsible for storing or executing the task. It checks to see if the task is currently blocked, storing it in our list of blocked tasks if so. Otherwise, it passes it to the thread pool (which is managed by an ExecutorService), which takes care of managing the threads and running the task. We define a simple case class, RunnableCont, to turn our task into a Runnable that is usable by the pool.
Our unblocked method gets passed a blocker which is probably now unblocked. We test that, and if in fact it is still blocked we do nothing. If it is unblocked, then we remove its task from our list of blocked tasks and pass that task to the pool.
The runNextUnblockedRoutine method in this scheduler doesn't actually do anything, since the pool is taking care of running everything. We just return SomeRoutinesBlocked so that the caller goes into a wait state.
In addition to the above three methods, we will have our thread pool, a lock that we use when managing our blocked and runnable tasks, and a set of blocked tasks waiting to become unblocked. For this implementation we choose to use a thread pool of a fixed size, thus the call to Executors.newFixedThreadPool.
Here is our complete JavaExecutorCoScheduler class:

package net.jimmc.scoroutine

import java.lang.Runnable
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService

import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.SynchronizedMap

class JavaExecutorCoScheduler(numWorkers:Int) extends CoScheduler {
    type Task = Option[Unit=>Unit]

    case class RunnableCont(task:Task) extends Runnable {
        def run() = task foreach { _() }
    }

    private val pool = Executors.newFixedThreadPool(numWorkers)
    private val lock = new java.lang.Object
    private val blockedTasks = new LinkedHashMap[Blocker,Task] with SynchronizedMap[Blocker,Task]

    private[scoroutine] def setRoutineContinuation(b:Blocker,task:Task) {
        lock.synchronized {
            if (b.isBlocked) {
                blockedTasks(b) = task
            } else {
                pool.execute(RunnableCont(task))
                coNotify
            }
        }
    }

    def unblocked(b:Blocker):Unit = {
        lock.synchronized {
            if (!b.isBlocked)
                blockedTasks.remove(b) foreach { task =>
                    pool.execute(RunnableCont(task))
                }
        }
        coNotify
    }

    def runNextUnblockedRoutine():RunStatus = SomeRoutinesBlocked
}

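To see the park-or-execute pattern in isolation, here is a minimal sketch that runs without the continuations plugin or the rest of the scoroutine package. It is a simplified model under stated assumptions, not the class above: SimpleBlocker, MiniExecutorScheduler, submit, blockedCount and the demo object are hypothetical names, tasks are plain functions rather than continuations, and there is no coNotify.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Hypothetical stand-in for the library's Blocker; only isBlocked is modeled.
trait SimpleBlocker { def isBlocked: Boolean }

// Simplified model: blocked tasks are parked in a map, runnable tasks go to the pool.
class MiniExecutorScheduler(numWorkers: Int) {
  private val pool = Executors.newFixedThreadPool(numWorkers)
  private val lock = new Object
  private val blockedTasks = mutable.LinkedHashMap.empty[SimpleBlocker, () => Unit]

  private def execute(task: () => Unit): Unit =
    pool.execute(new Runnable { def run(): Unit = task() })

  // Park the task if its blocker is blocked, otherwise hand it to the pool.
  def submit(b: SimpleBlocker, task: () => Unit): Unit = lock.synchronized {
    if (b.isBlocked) blockedTasks(b) = task
    else execute(task)
  }

  // Re-check the blocker; if it really is unblocked, move its task to the pool.
  def unblocked(b: SimpleBlocker): Unit = lock.synchronized {
    if (!b.isBlocked) blockedTasks.remove(b).foreach(execute)
  }

  def blockedCount: Int = lock.synchronized(blockedTasks.size)

  def shutdown(): Unit = pool.shutdown()
}

object MiniExecutorSchedulerDemo {
  // Returns true if the task was parked while blocked and ran once unblocked.
  def run(): Boolean = {
    val sched = new MiniExecutorScheduler(2)
    val gate = new AtomicBoolean(true)
    val blocker = new SimpleBlocker { def isBlocked: Boolean = gate.get }
    val done = new CountDownLatch(1)
    sched.submit(blocker, () => done.countDown())
    val parked = sched.blockedCount == 1
    gate.set(false)          // the resource becomes available
    sched.unblocked(blocker) // tell the scheduler to re-check this blocker
    val ran = done.await(5, TimeUnit.SECONDS)
    sched.shutdown()
    parked && ran
  }
}
```

Both the check of isBlocked and the move between the map and the pool happen under the same lock, which mirrors the role of lock.synchronized in the class above.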
Synchronization
Although not necessitated by the above changes, I added one more change to CoScheduler to improve its synchronization behavior. While exploring various multi-threading mechanisms as alternatives to using Executor, I wrote a scheduler called MultiThreadCoScheduler in which I implemented my own thread pool and in which the master thread directly allocated tasks to the worker threads in the pool. Although that scheduler was quite a bit larger than the one presented above, it provided much more control over the threads, allowing me to change the number of worker threads on the fly and to be able to tell in my master thread whether there were any running worker threads.
In MultiThreadCoScheduler, the main thread would call coWait to wait until it needed to wake up and hand out another task, and the worker threads would call coNotify when they were done processing a task and were ready to be assigned the next task. Similarly, a call to coNotify would be issued whenever a new task was placed into the task queue.
Unfortunately, Java's wait and notify methods, which are the calls underlying our coWait and coNotify methods, do not quite behave the way we would like. If we compare those calls to the Java NIO select and wakeup calls, we note that if a call is made to wakeup before a call to select, the select call will return immediately. The wait/notify calls do not behave this way; if a call is made to notify when there is no thread waiting in a wait call on that monitor, the notify call does nothing, and the following call to wait will wait until the next call to notify.
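The lost notification is easy to observe in isolation. The following is a minimal sketch, not part of the nioserver code: the object LostNotifyDemo and its method are hypothetical names, and it uses a timed wait so that the demonstration terminates instead of hanging.

```scala
// Hypothetical demo, not from the nioserver project.
object LostNotifyDemo {
  // Calls notify while no thread is waiting, then does a timed wait on the
  // same monitor. Returns how many milliseconds the wait lasted.
  def waitAfterEarlyNotify(timeoutMs: Long): Long = {
    val monitor = new Object
    // No thread is waiting on the monitor, so this notify has no effect.
    monitor.synchronized { monitor.notify() }
    val start = System.nanoTime()
    // The earlier notify is not remembered, so (barring a spurious wakeup)
    // this wait returns only when the timeout expires.
    monitor.synchronized { monitor.wait(timeoutMs) }
    (System.nanoTime() - start) / 1000000L
  }
}
```

A call such as LostNotifyDemo.waitAfterEarlyNotify(200) should take roughly the full 200 milliseconds, whereas a select that follows a wakeup returns at once.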
This small difference in semantics actually makes a pretty big difference in behavior, because it means when using wait and notify you must be concerned with which happens first. Let's see how that works.
In a typical scenario we have a resource with a boolean state that indicates when a thread can access that resource, for example, a queue with a boolean state of "has some data" that indicates when a reader thread can pull an item from the queue (and perhaps another boolean state of "queue is full" that indicates when a writer thread can put an item into the queue). In the case of MultiThreadCoScheduler we have a task with a "ready" flag that tells us when we can assign that task to a worker, and a worker with an "idle" flag that tells us when we can assign a task to that worker. When a task becomes ready to run, we want a thread (other than the master, since it may be waiting) to add the task to our queue of tasks and then notify the master that a task is available. Meanwhile, when the master is looking for an available task to assign to an idle worker, it will query to see if a task is available, and if not it will then wait until one becomes available. The problem sequence occurs when the master checks for available tasks and finds none, and then, before the master executes its wait, the non-master thread puts a ready task into the queue and issues a notify to the master. The result of this sequence is a ready task in the queue, but a master waiting for a notify.
When all of the synchronization is done within a single class, you can ensure that the above problem sequencing of operations does not happen by arranging that the code that places a ready task into the queue and notifies the master happens within one synchronized block, and the code used by the master to query the queue for a ready task and then to wait happens within one synchronized block on the same monitor.
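As a minimal sketch of the single-class case, the following queue keeps both sides on one monitor. The class ReadyQueue and its members are hypothetical names chosen for illustration, not part of the scoroutine package.

```scala
import scala.collection.mutable

// Hypothetical illustration: both critical sections use the same monitor.
class ReadyQueue[A] {
  private val monitor = new Object
  private val items = mutable.Queue.empty[A]

  // Producer side: enqueue and notify inside one synchronized block.
  def put(x: A): Unit = monitor.synchronized {
    items.enqueue(x)
    monitor.notifyAll()
  }

  // Consumer side: check and wait inside one synchronized block on the same
  // monitor, so a put cannot slip in between the check and the wait.
  def take(): A = monitor.synchronized {
    while (items.isEmpty) monitor.wait()
    items.dequeue()
  }
}
```

Because wait releases the monitor only once the caller is registered as a waiter, a producer cannot enqueue and notify in the gap between the emptiness check and the wait.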
But when dealing with subclasses, we run into the "inheritance anomaly" (or "inheritance-synchronization anomaly"). The essence of this problem is that the base class provides a method that is synchronized, but the subclass would like to include more functionality within that synchronized block. If, as is often the case, the subclass does not have access to the monitor being used by the base class to control its synchronization, there is no way for it to do this.
In our case, we can implement something that is sufficient for our current needs by making a small change to our coWait and coNotify methods in CoScheduler so that they behave in the same manner as select and wakeup: if a call to coNotify is made before a call to coWait, the call to coWait will return immediately. We do this by changing the implementation of coWait and coNotify in CoScheduler from this:

def coWait():Unit = {
    defaultLock.synchronized {
        defaultLock.wait()
    }
}

def coNotify():Unit = {
    defaultLock.synchronized {
        defaultLock.notify
    }
}

to this:

private var notified = false

def coWait():Unit = {
    defaultLock.synchronized {
        if (!notified)
            defaultLock.wait()
        notified = false
    }
}

def coNotify():Unit = {
    defaultLock.synchronized {
        notified = true
        defaultLock.notify
    }
}

With the above change to our base class, our subclass no longer needs to be concerned about the problem sequence described above, because the call to coWait will return immediately if there was a call to coNotify since the most recent previous call to coWait.
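The same idea can be tried outside of CoScheduler. The following is a minimal standalone sketch, assuming a hypothetical class StickySignal whose await and signal methods play the roles of coWait and coNotify. Unlike the code above, this variant loops on the flag, so a spurious wakeup also does not cause an early return.

```scala
// Hypothetical standalone version of the "remembered notify" idea.
class StickySignal {
  private val lock = new Object
  private var notified = false

  // Returns immediately if signal() was called since the last await().
  def await(): Unit = lock.synchronized {
    while (!notified) lock.wait()
    notified = false // consume the notification
  }

  // Records the notification even if no thread is currently waiting.
  def signal(): Unit = lock.synchronized {
    notified = true
    lock.notify()
  }
}
```

With this class, calling signal() before await() leaves the flag set, so the later await() returns without blocking, which is the select/wakeup behavior described above.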
2 comments:
Because of the possibility of spurious wakeups you should always use while loops when using wait/notify:
defaultLock.synchronized {
    while(!notified) defaultLock.wait()
    notified = false
}
AndiHofi: Yes, in general that is a good rule. Since in this case coWait is a version of wait, I expected the caller to determine whether he considers the wakeup spurious and re-invoke coWait, so I had not bothered to make that check myself within coWait.