Tuesday, July 19, 2011

Multithread Coroutine Scheduler

A scheduler that uses multiple worker threads for continuations-based Scala coroutines.

In my recent series of posts that ended with a complete Scala server that uses continuations-based coroutines to store per-client state, I asserted that the single-threaded scheduler implementation in that example could relatively easily be replaced by a scheduler that uses multiple threads. In this post I provide a simple working example of such a multithread scheduler.

Contents

  Overview
  Managing Tasks
  Scheduler
  Synchronization

Overview

We can use the standard thread-pool approach in which we have a pool of worker threads that independently pull from a common task queue. Java 1.5 introduced a set of classes and interfaces in the java.util.concurrent package to support various kinds of thread pools or potentially other task scheduling mechanisms. Rather than writing our own, we will use an Executor from that package.
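
For those who have not used that package before, here is a minimal sketch of the basic thread-pool pattern, assuming an arbitrary pool size of 4 and a throwaway task:

    import java.util.concurrent.{Executors, ExecutorService}

    // A fixed pool of worker threads; the executor keeps its own
    // internal queue of submitted tasks.
    val pool: ExecutorService = Executors.newFixedThreadPool(4)

    // Submit a task; an idle worker thread will eventually run it.
    pool.execute(new Runnable {
        def run() { println("running on "+Thread.currentThread.getName) }
    })

    pool.shutdown()    // finish queued tasks, then stop the workers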

We have an additional requirement that makes our situation a little more complex than a typical thread pool: our collection of tasks includes both tasks that are ready to run and tasks that are currently blocked but will become ready to run at some point in the future.

We will implement a new scheduler class JavaExecutorCoScheduler that maintains a list of blocked tasks and uses a Java Executor to manage runnable tasks.

The updated complete source code for this post is available on github in my nioserver project under the tag blog-executor.

Managing Tasks

As mentioned above, we need to deal with two kinds of tasks: tasks that are ready to run and tasks that are blocked. The standard Executor interface allows us to submit a task for execution, but does not handle blocked tasks. Since we don't want to submit blocked tasks to the Executor, we have to queue them up ourselves. We have two issues to attend to:
  1. When our scheduler is passed a task, we must put it into our own queue of blocked tasks if it is not currently ready to run.
  2. When a previously blocked task becomes ready to run, we must remove it from our queue of blocked tasks and pass it to the Executor.
The first issue is straightforward, as our framework already allows us to test the blocker for a task and see if the task is ready to run. In order to properly take care of the second issue, we will make a small change to our framework to allow us to notice when a blocker has probably stopped blocking so that we can run the corresponding task. We do this by modifying our CoScheduler class to add a method to notify it that a blocker has probably become unblocked:
    def unblocked(b:Blocker):Unit
We call this method from CoQueue in the two places where we previously called scheduler.coNotify: in the blockingEnqueue method after we have enqueued an item to notify the scheduler that the dequeue side is probably unblocked, and in the blockingDequeue method after we have dequeued an item to notify the scheduler that the enqueue side is probably unblocked. Those two methods in CoQueue now look like this:
    def blockingEnqueue(x:A):Unit @suspendable = {
        enqueueBlocker.waitUntilNotBlocked
        enqueue(x)
        scheduler.unblocked(dequeueBlocker)
    }

    def blockingDequeue():A @suspendable = {
        dequeueBlocker.waitUntilNotBlocked
        val x = dequeue
        scheduler.unblocked(enqueueBlocker)
        x
    }
The implementation of unblocked in our default scheduler DefaultCoScheduler is just a call to coNotify, so the behavior of that system will remain the same as it was before we added the calls to unblocked.
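
In code form, that is simply:

    def unblocked(b:Blocker):Unit = coNotify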

Because we need to ensure that all of our NIO read and write operations are handled sequentially, we continue to manage those tasks separately with our NioSelector class, where all of the reads are executed on one thread and all of the writes are executed on another thread.

Scheduler

We already have a scheduler framework that defines a CoScheduler class as the parent class for our scheduler implementations, which requires that we implement the methods setRoutineContinuation, runNextUnblockedRoutine and the newly added unblocked.

In our JavaExecutorCoScheduler, the setRoutineContinuation method is responsible for storing or executing the task. It checks whether the task is currently blocked, and if so stores it in our list of blocked tasks. Otherwise, it passes the task to the thread pool (managed by an ExecutorService), which takes care of managing the threads and running the task. We define a simple case class, RunnableCont, to wrap our task in a Runnable that the pool can use.

Our unblocked method gets passed a blocker which is probably now unblocked. We test that, and if in fact it is still blocked we do nothing. If it is unblocked, then we remove it from our list of blocked tasks and pass it to the pool.

The runNextUnblockedRoutine method in this scheduler doesn't actually do anything, since the pool is taking care of running everything. We just return SomeRoutinesBlocked so that the caller goes into a wait state.

In addition to the above three methods, we will have our thread pool, a lock that we use when managing our blocked and runnable tasks, and a set of blocked tasks waiting to become unblocked. For this implementation we choose to use a thread pool of a fixed size, thus the call to Executors.newFixedThreadPool.

Here is our complete JavaExecutorCoScheduler class:
package net.jimmc.scoroutine

import java.lang.Runnable
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService

import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.SynchronizedMap

class JavaExecutorCoScheduler(numWorkers:Int) extends CoScheduler {
    type Task = Option[Unit=>Unit]
    // Adapter so the Executor can run one of our continuation tasks.
    case class RunnableCont(task:Task) extends Runnable {
        def run() = task foreach { _() }
    }

    private val pool = Executors.newFixedThreadPool(numWorkers)
    private val lock = new java.lang.Object
    // Blocked tasks, keyed by the Blocker each is waiting on.
    private val blockedTasks = new LinkedHashMap[Blocker,Task] with
            SynchronizedMap[Blocker,Task]

    private[scoroutine] def setRoutineContinuation(b:Blocker,task:Task) {
        lock.synchronized {
            if (b.isBlocked) {
                blockedTasks(b) = task
            } else {
                pool.execute(RunnableCont(task))
                coNotify
            }
        }
    }

    def unblocked(b:Blocker):Unit = {
        lock.synchronized {
            if (!b.isBlocked)
                blockedTasks.remove(b) foreach { task =>
                    pool.execute(RunnableCont(task)) }
        }
        coNotify
    }

    def runNextUnblockedRoutine():RunStatus = SomeRoutinesBlocked
}
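
Constructing the scheduler is just a matter of picking the pool size. For example (a hypothetical choice; a common starting point is the number of available processors):

    val scheduler = new JavaExecutorCoScheduler(
        Runtime.getRuntime.availableProcessors)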

Synchronization

Although not necessitated by the above changes, I added one more change to CoScheduler to improve its synchronization behavior.

While exploring various multi-threading mechanisms as alternatives to using Executor, I wrote a scheduler called MultiThreadCoScheduler in which I implemented my own thread pool and in which the master thread directly allocated tasks to the worker threads in the pool. Although that scheduler was quite a bit larger than the one presented above, it provided much more control over the threads, allowing me to change the number of worker threads on the fly and to tell from the master thread whether any worker threads were running.

In MultiThreadCoScheduler, the main thread would call coWait to wait until it needed to wake up and hand out another task, and the worker threads would call coNotify when they were done processing a task and were ready to be assigned the next task. Similarly, a call to coNotify would be issued whenever a new task was placed into the task queue.

Unfortunately, Java's wait and notify methods, which are the calls underlying our coWait and coNotify methods, do not quite behave the way we would like. If we compare those calls to the Java NIO select and wakeup calls, we note that if a call is made to wakeup before a call to select, the select call will return immediately. The wait/notify calls do not behave this way; if a call is made to notify when there is no thread waiting in a wait call on that monitor, the notify call does nothing, and the following call to wait will wait until the next call to notify.

This small difference in semantics makes a big difference in behavior, because it means that when using wait and notify you must be concerned with which call happens first. Let's see how that works.

In a typical scenario we have a resource with a boolean state that indicates when a thread can access that resource: for example, a queue with a "has some data" state that indicates when a reader thread can pull an item from the queue (and perhaps a "queue is not full" state that indicates when a writer thread can put an item into the queue). In the case of MultiThreadCoScheduler we have a task with a "ready" flag that tells us when we can assign that task to a worker, and a worker with an "idle" flag that tells us when we can assign a task to that worker.

When a task becomes ready to run, we want a thread (other than the master, since it may be waiting) to add the task to our queue of tasks and then notify the master that a task is available. Meanwhile, when the master is looking for an available task to assign to an idle worker, it queries to see if a task is available, and if not it waits until one becomes available. The problem sequence arises if the master checks for available tasks and finds none, and then, before the master executes its wait, the non-master thread puts a ready task into the queue and issues a notify. The result is a ready task in the queue, but a master waiting for a notify that has already come and gone.
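
Here is a hypothetical sketch of that race, with illustrative names not taken from the nioserver code. The master's check happens outside the block in which it waits, so the notify can fall into the gap:

    val lock = new Object
    var taskAvailable = false      // shared state, for illustration only

    // Non-master thread: make a task available, then notify the master.
    def produce() {
        lock.synchronized {
            taskAvailable = true
            lock.notify()          // a no-op if the master is not yet waiting
        }
    }

    // Master thread: the race is in the gap between the check and the wait.
    def masterWaitForTask() {
        val ready = lock.synchronized { taskAvailable }
        // ...if produce() runs right here, its notify is lost...
        if (!ready)
            lock.synchronized { lock.wait() }   // waits despite a ready task
    }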

When all of the synchronization is done within a single class, you can ensure that the above problem sequencing of operations does not happen by arranging that the code that places a ready task into the queue and notifies the master happens within one synchronized block, and the code used by the master to query the queue for a ready task and then to wait happens within one synchronized block on the same monitor. But when dealing with subclasses, we run into the "inheritance anomaly" (or "inheritance-synchronization anomaly"). The essence of this problem is that the base class provides a method that is synchronized, but the subclass would like to include more functionality within that synchronized block. If, as is often the case, the subclass does not have access to the monitor being used by the base class to control its synchronization, there is no way for it to do this.
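
As a hypothetical illustration of the anomaly (again, not from the actual code), suppose the base class keeps its monitor private:

    class Base {
        private val monitor = new Object    // invisible to subclasses
        def hasTask:Boolean = monitor.synchronized { false /* check a queue */ }
        def await():Unit = monitor.synchronized { monitor.wait() }
    }

    class Sub extends Base {
        def waitForTask() {
            // We would like "if (!hasTask) await()" to be atomic with
            // respect to the notifier, but without access to Base's
            // monitor we cannot extend its synchronized region, so a
            // notify can slip into the gap between the two calls.
            if (!hasTask)
                await()
        }
    }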

In our case, we can implement something that is sufficient for our current needs by making a small change to our coWait and coNotify methods in CoScheduler so that they behave in the same manner as select and wakeup: if a call to coNotify is made before a call to coWait, the call to coWait will return immediately. We do this by changing the implementation of coWait and coNotify in CoScheduler from this:
    def coWait():Unit = {
        defaultLock.synchronized {
            defaultLock.wait()
        }
    }

    def coNotify():Unit = {
        defaultLock.synchronized {
            defaultLock.notify
        }
    }
to this:
    private var notified = false
    def coWait():Unit = {
        defaultLock.synchronized {
            if (!notified)
                defaultLock.wait()
            notified = false
        }
    }

    def coNotify():Unit = {
        defaultLock.synchronized {
            notified = true
            defaultLock.notify
        }
    }
With the above change to our base class, our subclass no longer needs to be concerned about the problem sequence described above, because the call to coWait will return immediately if there was a call to coNotify since the most recent previous call to coWait.
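
With this change, a sequence such as the following sketch (on any CoScheduler instance) returns immediately instead of hanging:

    scheduler.coNotify    // records the pending notification
    scheduler.coWait      // sees the pending flag and returns at once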

2 comments:

AndiHofi said...

Because of the possibility of spurious wakeups you should always use while loops when using wait/notify:

defaultLock.synchronized {
    while (!notified)
        defaultLock.wait()
    notified = false
}

Jim McBeath said...

AndiHofi: Yes, in general that is a good rule. Since in this case coWait is a version of wait, I expected the caller to determine whether he considers the wakeup spurious and re-invoke coWait, so I had not bothered to make that check myself within coWait.