A generic Scala generator class built directly on Scala's shift and reset operators.
In my
previous post
I showed a
generic generator class built on top of my
coroutines library.
I commented in that post that I was taking the expedient approach of
using my existing library, but that it would be possible to package
all of the functionality into the Generator class.
I soon realized that it would in fact be relatively easy to do that packaging,
and that the resulting relatively simple class
would probably make a good vehicle for demonstrating
the usefulness of Scala's
delimited continuations.
Thus I give you the
StandaloneGenerator class:
package net.jimmc.scoroutine
import scala.collection.Iterator
import scala.util.continuations._
class StandaloneGenerator[T] extends Iterator[T] {
    private var nextValue:Option[T] = None
    private var nextStep:Option[Unit=>Unit] = None
    /** Subclass calls this method to generate values.
     * @param body The code for your generator.
     */
    protected def generate(body: => Unit @suspendable) {
        reset {
            suspend
            body
        }
    }
    /** Yield the next generated value.
     * Call this code from your generator to deliver the next value.
     */
    protected def yld(x:T):Unit @suspendable = {
        nextValue = Some(x)
        suspend
    }
    /** Retrieve the next generated value.
     * Call this from your main code.
     */
    def next:T = {
        step
        nextValue match {
            case None => throw new NoSuchElementException("next on empty generator")
                //make it similar to the equivalent Iterator exception
            case Some(x) => nextValue = None; x
        }
    }
    /** True if there is another value to retrieve.
     * Call this from your main code.
     */
    def hasNext:Boolean = {
        step
        nextValue.isDefined
    }
    /** Save our continuation to resume later. */
    private def suspend:Unit @suspendable = {
        shift { k:(Unit=>Unit) =>
            nextStep = Some(k)
        }
    }
    /** If we have a next step but we don't have a value queued up,
     * run the generator until it calls yld or completes. */
    private def step = {
        if (nextValue.isEmpty) nextStep foreach { nextStep = None; _() }
    }
}
The StandaloneGenerator class is a plug-compatible replacement
for the Generator class described in my previous post,
as long as the derived generator does not use fancy scheduling as in the
Primes Generator example.
So, for example, you could take the
Integers Generator example from that post,
replace the two occurrences of Generator with
StandaloneGenerator, and everything would work the same way.
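To make that concrete, here is the Integers Generator example from that post with the substitution made; the body of the generator is untouched:
import net.jimmc.scoroutine.StandaloneGenerator
class IntGen(max:Int) extends StandaloneGenerator[Int] {
    generate {
        var x = 1
        while (x<=max) {
            yld(x)
            x = x + 1
        }
    }
}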
We have two variables:
nextValue is our one-element queue where we store the
next value to be returned by the generator,
and nextStep is our "scheduler queue"
where we store our continuation each
time we suspend the generator to return a value.
Both of these variables are of type Option so that we can
tell when they hold a value.
Control is managed by the two functions suspend
and step.
The suspend method has a shift block that
could not be much simpler: all it does is store the passed-in
continuation in the nextStep variable.
Since the body of a shift block is always the last thing
executed within the scope of the CPS code (delimited either by the
enclosing reset block or an explicitly invoked continuation),
the fact that suspend does not execute its continuation means
that after suspend does its thing,
control returns to the point just past the enclosing reset,
or to the next statement after the explicit continuation call.
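Here is a tiny stand-alone sketch of that behavior, separate from the generator (the object name is just for illustration): the shift body stores the continuation instead of invoking it, so control falls out of the reset; calling the saved function later resumes the code after the shift.
import scala.util.continuations._
object SuspendDemo {
    def main(args:Array[String]) = {
        var saved:Option[Unit=>Unit] = None
        reset {
            shift { k:(Unit=>Unit) =>
                saved = Some(k)    //store the continuation without running it
            }
            println("resumed")     //runs only when the saved continuation is invoked
        }
        println("after reset")     //printed first, right after the shift body returns
        saved foreach { k => k() } //now "resumed" is printed
    }
}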
I considered calling the step method resume
instead, to make it clear that it was the complement to suspend,
but from the point of view of the code calling step,
by the time control returns to the point after the call to step,
the generator code has already been suspended again:
it has completed running one step, which is from one yld
call to the next.
The step function executes our continuation
if we have one, but only
if we don't already have a value in nextValue.
Using foreach on an Option is a neat way
to execute a block of code only if the Option has a value
(i.e. is not None).
In this case, since the contents of nextStep (if it has any)
is a function, the placeholder variable _ gets set to
the continuation and
the code fragment _() is what actually
executes the continuation.
Here are two other ways this function could be implemented that
do the same thing:
private def step1 = {
    if (nextValue.isEmpty) nextStep match {
        case None => //do nothing
        case Some(p) => nextStep = None; p()
    }
}
private def step2 = {
    if (nextValue.isEmpty && nextStep.isDefined) {
        val p = nextStep.get; nextStep = None; p()
    }
}
Let's walk through this and see how it works
(assuming the generator generates at least one value):
The main code instantiates a generator, which passes its generator body
to the generate function,
which calls suspend,
which saves that body in
nextStep without executing it.
The main code calls hasNext, which calls step,
which runs the continuation.
This starts the generator body code running, which runs until it
calls yld with the first value.
That first value is stored in nextValue and
the generator code is suspended, with the continuation stored
in nextStep.
Since we now have a value stored in nextValue,
hasNext returns true.
The main code calls next.
Since we have a value in nextValue, the call to
step does nothing (it is only there in case the caller
calls next without first calling hasNext).
The value in nextValue is returned, and that variable
is cleared (set to None).
Each time the main code calls hasNext
and it calls step, the continuation
stored in nextStep is executed,
the generator code calls yld with the next value,
that value is stored into nextValue,
the continuation is stored in nextStep,
and the generator is suspended,
and hasNext returns true.
Eventually (for a finite generator) there is a call to step
where the generator finishes execution without calling
yld.
When this happens, no values are stored in either
nextValue or nextStep.
Since there is no value in nextValue,
hasNext returns false.
Since there is also no value in nextStep,
further calls to step do nothing, and
hasNext will continue to return false.
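Compressed into code, that walkthrough looks something like this, using the IntGen subclass shown above:
object GenDemo {
    def main(args:Array[String]) = {
        val gen = new IntGen(2)  //generate saves the body in nextStep without running it
        println(gen.hasNext)     //step runs the body to the first yld: true, nextValue=Some(1)
        println(gen.next)        //prints 1 and clears nextValue
        println(gen.hasNext)     //resumes the body to the next yld: true, nextValue=Some(2)
        println(gen.next)        //prints 2
        println(gen.hasNext)     //the body finishes without another yld: false, and stays false
    }
}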
I dare say this might be one of the simplest realistic examples of
the use of Scala's reset and shift operators
that you will find.
Two variables, six functions each with no more than four lines of body code,
16 lines of function body code in total,
35 lines of code altogether.
An implementation of generators on top of
Scala's delimited continuations.
In my
previous post
I described a library that supports coroutines on top of Scala's
delimited continuations
capability.
In this post I show how you can easily create generators on top of
that
coroutine library (net.jimmc.scoroutine).
This is a second example of the kind of interesting construct
that can be built on top of Scala's delimited continuations.
As with my previous post on coroutines,
you don't need to understand reset and shift
if you just want to use the Generator class shown here
to write and use your own generators.
But, as with coroutines,
you should have a basic understanding of CPS code and its
restrictions
when writing generators.
A
generator
is a routine that produces values like an iterator but is structured
as a function.
The generated values are returned by calling a special function,
typically called yield, with each value that is generated.
In our case, since yield is a reserved word in Scala,
we will use yld instead.
Generators and coroutines are
closely related.
Depending on the implementation, generators and coroutines may be
almost the same thing or fairly different,
but in any case,
if you have either one, you can implement the other one on top of it.
Since we already have coroutines
in the net.jimmc.scoroutine library
described in my
previous post,
we will implement generators on top of coroutines using that library.
You can think of this approach as using the Producer-Consumer pattern,
where we set up a generator as the producer and we allow the main
code to act as the consumer.
We create a generic Generator class that does the following:
Creates a CoScheduler that we use to control
the generator.
Creates a CoQueue buffer into which we will place
the generated values.
Provides convenience functions
yld (in place of the reserved word yield)
and generate.
Provides next and hasNext functions for
the consuming code to call from a non-CPS context,
and so that a Generator can be used as an
Iterator.
This is all simple and straightforward.
Here is the code for
Generator:
package net.jimmc.scoroutine
import scala.collection.Iterator
import scala.util.continuations._
/** Generic generator class.
*/
class Generator[T] extends Iterator[T] {
    val sched = new DefaultCoScheduler
    val buf = new CoQueue[T](sched,1)
    /** Subclass calls this method to generate values.
     * @param body The code for your generator.
     */
    def generate(body: => Unit @suspendable) {
        sched.addRoutine("gen") { body }
        sched.run
    }
    /** Yield the next generated value.
     * Call this code from your generator to deliver the next value.
     */
    protected def yld(x:T):Unit @suspendable = {
        buf.blockingEnqueue(x)
    }
    /** Retrieve the next generated value.
     * Call this from your main code.
     */
    def next:T = {
        sched.run
        buf.dequeue
    }
    /** True if there is another value to retrieve.
     * Call this from your main code.
     */
    def hasNext:Boolean = {
        sched.run
        !buf.dequeueBlocker.isBlocked
    }
}
We are not concerning ourselves with performance here, so we are simply
using the available DefaultCoScheduler as our scheduler.
As a future optimization, we could develop a scheduler optimized for
a single coroutine and use that as our scheduler for simple generators
that fit that criterion.
We could go further and use neither a scheduler nor CoQueue,
packaging all of the functionality directly into the Generator
class; but we are using the more expedient approach of using those two
pieces, since we already have them and are familiar with their use from
our experience with coroutines.
Integers Generator
Here is how we would use our generic Generator class to create
a generator that will generate integers up to a specified maximum value:
import net.jimmc.scoroutine.Generator
class IntGen(max:Int) extends Generator[Int] {
    generate {
        var x = 1
        while (x<=max) {
            yld(x)
            x = x + 1
        }
    }
}
The one catch to remember here is that the body of the generate
call is CPS code, so as with the body of a coroutine,
there are some restrictions
on what control constructs we can use.
Thus we use a while loop with a var
rather than a for loop, since the latter does not
yet work with the continuations compiler plugin.
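For example, this hypothetical variant fails to compile, because the for loop desugars to Range.foreach, which expects an ordinary Int => Unit function and cannot accept the @suspendable call to yld:
//does not compile: yld is CPS code inside a non-CPS foreach
class IntGenFor(max:Int) extends Generator[Int] {
    generate {
        for (x <- 1 to max)
            yld(x)
    }
}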
Given the above generator class, here is a simple
GenInts
object with a main function
that creates an instance of that generator,
then calls it to print out its values:
object GenInts {
    def main(args:Array[String]) = {
        val gen = new IntGen(4)
        for (i <- gen)
            println(i)
    }
}
Alternatively, we could replace the for loop with direct
calls to hasNext and next:
object GenInts {
    def main(args:Array[String]) = {
        val gen = new IntGen(4)
        while (gen.hasNext)
            println(gen.next)
    }
}
Primes Generator
It is possible to use shift and reset directly to code up a generator,
but because our coroutine library implements a scheduler
to which new coroutines can be added at any time,
it also gives you the ability to create generators that include
dynamic filter pipelines.
The example I use for this is the
Sieve of Eratosthenes,
a method of calculating primes in which,
each time a prime is found, it is added to a list of prime divisors
that are used for testing each new candidate.
In this
GenPrimes
example, I create a new filter for each prime and add it to the pipeline.
You can do this much more efficiently in Scala
using a Stream,
but this example illustrates the technique of dynamically building a pipeline
within a generator.
import scala.util.continuations._
import net.jimmc.scoroutine.CoQueue
import net.jimmc.scoroutine.Generator
object GenPrimes {
    def main(args:Array[String]) = {
        val gen = new PrimeGen()
        for (i <- gen) {
            println("Prime: "+i)
        }
    }
}
class PrimeGen extends Generator[Int] {
    val bufSize = 1
    val out1 = new CoQueue[Int](sched,bufSize)
    sched.addRoutine("prime2")(nextPrime(2,out1))
    generate {
        def gen(n:Int):Unit @suspendable = {
            out1.blockingEnqueue(n)
            gen(n+1)
        }
        gen(2)
    }
    def nextPrime(p:Int, in:CoQueue[Int]):Unit @suspendable = {
        var out:Option[CoQueue[Int]] = None
        yld(p)
        def sieve():Unit @suspendable = {
            val n = in.blockingDequeue()
            if ((n%p)!=0) {
                if (!out.isDefined) {
                    out = Some(new CoQueue[Int](sched,bufSize))
                    val rName = "prime"+n
                    sched.addRoutine(rName)(nextPrime(n,out.get))
                }
                out.get.blockingEnqueue(n)
            } else {
                in.dequeueBlocker.waitUntilNotBlocked
            }
            sieve()
        }
        sieve()
    }
}
This example starts by setting up two coroutines:
the addRoutine call sets up the first filter in the pipeline,
which reads values from the out1 queue and
filters out all numbers divisible by 2.
The generate call sets up the other initial coroutine,
which generates every integer and feeds it into the first filter in
the pipeline.
We start off this counting generator with the first prime number, 2.
The nextPrime function is called each time we see a new prime.
It starts by outputting its prime parameter value p
as a value of the GenPrimes generator.
It then goes into a loop reading its input buffer and looking for values
which are not divisible by its prime number.
The first time it finds one (when out is not yet defined)
it registers (with a call to addRoutine) a new coroutine
based on a new instance of nextPrime that uses our output
as its input.
It then passes each candidate prime along to that next filter in
the sieve pipeline.
You can tell this is CPS code because of the suspendable
annotations, which are a cue that the code might not behave
quite as you expect.
For example, the gen function within the body of the
generate call is recursive, so you might think it would
cause a stack overflow.
But since we are in a CPS function and the call to blockingEnqueue
is a call to a CPS function, the recursive call to gen
is turned into a continuation and executed later from the scheduler,
so it is in fact not recursive.
Likewise the recursive call to sieve is not really
recursive for the same reason.
Another CPS detail is the call to waitUntilNotBlocked in the else branch.
It would seem to be functionally unnecessary, since the first thing
in the sieve function is a call to blockingDequeue.
However, this is the same issue as the attempt to avoid blocking
discussed in my previous post;
without this call our code will not work.
Same Fringe
The
SameFringe
problem has
been called the "killer application" for coroutines.
Given two trees, they have the same fringe if the leaves of the two
trees, read from left to right, are the same.
With coroutines, or in this case generators,
the simple solution to this problem
is to create a generator that takes a tree and returns the sequence
of leaves of that tree,
then compare the outputs of two of those generators on the two trees
to be compared.
We start with a simple tree definition:
sealed abstract class Tree[T]
case class Branch[T](left:Tree[T], right:Tree[T]) extends Tree[T]
case class Leaf[T](x:T) extends Tree[T]
Given this tree definition, we write a generator that walks a tree
and yields all of the leaves:
import scala.util.continuations._
import net.jimmc.scoroutine.Generator
class TreeFringe[T](tree:Tree[T]) extends Generator[T] {
    generate {
        def walk(t:Tree[T]):Unit @suspendable = {
            t match {
                case Leaf(x) => yld(x)
                case Branch(left,right) => walk(left); walk(right)
            }
        }
        walk(tree)
    }
}
Since our generators implement the
Iterator trait, we can compare
two generators as two iterators with a little piece of code,
making the assumption that the tree leaf values are never null.
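A sketch of one such compact version uses Iterator's zipAll, with null as the padding element, so that fringes of different lengths compare as unequal:
def sameFringe[T](tree1:Tree[T], tree2:Tree[T]):Boolean = {
    val fringe1 = new TreeFringe(tree1)
    val fringe2 = new TreeFringe(tree2)
    //zipAll pads the shorter fringe with nulls, so extra leaves compare as unequal
    fringe1.zipAll(fringe2, null, null).forall(p => p._1 == p._2)
}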
Alternatively, we could use this more verbose version:
def sameFringe[T](tree1:Tree[T], tree2:Tree[T]):Boolean = {
    val fringe1 = new TreeFringe(tree1)
    val fringe2 = new TreeFringe(tree2)
    while (fringe1.hasNext && fringe2.hasNext) {
        if (fringe1.next != fringe2.next)
            return false
    }
    !(fringe1.hasNext || fringe2.hasNext)
}
We add a
SameFringe object with a
main method that creates some test trees, prints out
the leaves of each tree using our generator, then calls our
sameFringe method to check for equality.
object SameFringe {
    def main(args:Array[String]) = {
        val t1 = Branch(Branch(Leaf(1),Leaf(2)),Leaf(3))
        val t2 = Branch(Leaf(1),Branch(Leaf(2),Leaf(3)))
        val t3 = Branch(Leaf(1),Branch(Leaf(2),Leaf(4)))
        println("t1:"); for (x <- (new TreeFringe(t1))) println(x)
        println("t2:"); for (x <- (new TreeFringe(t2))) println(x)
        println("t3:"); for (x <- (new TreeFringe(t3))) println(x)
        println("sameFringe(t1,t2)="+sameFringe(t1,t2))
        println("sameFringe(t1,t3)="+sameFringe(t1,t3))
    }
    //include the sameFringe method in this object
}
More Possibilities
Some other possible uses for generators or coroutines:
Pipelines: A sequence of tasks can operate on a stream of data,
with each task reading data from an input queue and writing to an
output queue which is the input queue of the next task in the sequence
(a small sketch of such a pipeline appears after this list).
Fan-out: A single producer with multiple consumers can be implemented
by using a fan-out coroutine that reads from its input queue and writes
the same data to multiple output queues, each of which is the input
queue to one of the multiple consumers.
Fan-in: Multiple producers can use a single shared output queue so that the
coroutine using that queue as its input queue receives data from
multiple sources. If you stick with a single-thread scheduler,
you don't have to worry about synchronization or other concurrent
access issues on the shared queue.
By combining Pipelines, Fan-out and Fan-in, we can create arbitrary
networks of communicating coroutines.
State machines: For any situation in which a task has to maintain state
based on one or more inputs, a coroutine or generator can be used to
allow some of that state to be stored as the location of current
execution in the code, which often makes the code simpler to write
and maintain.
Parsers: A parser is a typical example of a producer that reads an
input stream and maintains state. As the parser collects input characters
(which could be provided by another coroutine in a pipeline)
and resolves them into tokens, it writes them to its output queue
where the tokens are available to the routine handling the next level of
analysis.
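As a concrete illustration of the pipeline idea, here is a hypothetical sketch (the object and queue names are made up for this example) of a three-stage pipeline built from the same scoroutine pieces used above:
import scala.util.continuations._
import net.jimmc.scoroutine.CoQueue
import net.jimmc.scoroutine.DefaultCoScheduler
object PipelineSketch {
    def main(args:Array[String]) = {
        val sched = new DefaultCoScheduler
        val q1 = new CoQueue[Int](sched,2)
        val q2 = new CoQueue[Int](sched,2)
        sched.addRoutine("source") {
            var i = 1
            while (i <= 3) {
                q1.blockingEnqueue(i)
                i = i + 1
            }
        }
        sched.addRoutine("doubler") {
            def loop():Unit @suspendable = {
                val x = q1.blockingDequeue()
                q2.blockingEnqueue(2*x)
                loop() //becomes a continuation, not a real recursive call
            }
            loop()
        }
        sched.addRoutine("sink") {
            var n = 0
            while (n < 3) {
                val v = q2.blockingDequeue()
                println("sink got "+v)
                n = n + 1
            }
        }
        sched.runUntilBlockedOrDone
    }
}
With this scheduler everything runs on one thread, and the run ends with the doubler suspended on its empty input queue.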
An implementation of coroutines on top of
Scala's delimited continuations.
In my
previous post
I said that delimited continuations could be used to create
interesting control constructs.
In this post I give examples of one such construct: coroutines.
I describe the implementation of a library to make it easier to
write coroutines, and I give an example that is built on that library.
If you want to go straight to the example, see the section
Producer-Consumer.
Like my previous post on
delimited continuations,
this is a long post.
However, though long, it should be much easier going than that post.
You don't need to read and understand that post in order to understand
this post, but it will help in a couple of places.
The
Wikipedia coroutines page
says coroutines are a generalization of subroutines:
whereas a subroutine starts at the beginning every time it is called,
a coroutine starts at the beginning the first time it is called,
but can start at any other point on successive calls.
Generally this means that on successive calls, it continues running
from the point where it most recently returned.
Instead of using a return statement,
coroutines typically use a yield statement.
A yield statement
indicates that the routine is done executing for now,
and will resume execution following the yield statement
the next time it is called.
In the classic definition of coroutines,
the yield statement indicates which coroutine is to be run next.
With two coroutines, each always yields to the other;
with more than two, a coroutine might have code to determine which
other coroutine to yield to.
Unfortunately for coroutines, yield
is already a keyword in Scala, so we can't use it for our purposes.
We can either pick a slightly different word such as
yld or yieldTo,
or we can just use a different term altogether.
The classic example of the use of coroutines is a pair of routines,
one of which (the producer) generates data and
one of which (the consumer) consumes data.
These two routines are connected by a queue; the producer puts data
into the queue and the consumer takes data out of the queue.
This same producer-consumer pair is also a typical example
of multi-thread code.
Ted Neward uses this producer-consumer example in his
blog post describing the concurrency benefits of using Scala.
In the multi-thread producer-consumer example,
the producer thread runs and places data into the queue
until the queue is full,
at which point that thread stops running until the queue empties out enough
for it to add more data.
Meanwhile the consumer thread runs and takes data out of the queue
until the queue is empty,
at which point that thread stops running until the queue contains more data.
If the host on which the multi-thread application is running contains
more than one processor, both of these threads might be running
at the same time.
I like to think of coroutines as being like multi-thread code,
only without multiple threads.
The coroutine version of the producer-consumer example
works essentially the same as the multi-thread version,
except that only one of the two is ever running at one point in time.
The producer runs and places data into the queue until it is full,
at which point it pauses and the consumer starts running.
The consumer takes data out of the queue until it is empty,
at which point it pauses and the producer starts running again.
In both the multi-thread and the coroutine version of this example,
there is some state that is saved while each routine is paused
waiting for the queue to fill or empty.
In the multi-thread example, that state is saved in the thread.
In our coroutine example, we use a different mechanism to save
that state: a delimited continuation.
Why Use Coroutines
If coroutines are like multi-thread code, why use them and have to
deal with continuations rather than just using threads?
Here are some possible reasons:
With the
default scheduler
that runs everything in one thread,
you don't need to worry about concurrency issues such as
multiple concurrent access to shared state.
You can create your own scheduler to control when to run each of your
coroutines. If you want that scheduler to use a thread pool and
run coroutines concurrently, you can do that (assuming you then
deal with concurrency issues in your coroutines).
An application can handle more suspended continuations than it can
suspended threads (for example, see slide 19 of Philipp Haller's
presentation
on Actors, where he says an app can handle up to 5,000
threads, but up to 1,200,000 actors).
Building a Coroutine Library
It is
possible to write coroutines directly in Scala code using
reset and shift, but
dealing with delimited continuations can be tricky,
so I wanted to isolate all of that code into a reusable library
that would make it easier to write coroutines
as well as allow encapsulating more sophisticated
capabilities within the library.
The package name I selected for this library is
net.jimmc.scoroutine.
The source code is
available on github.
I started with a change that makes these coroutines look less like
coroutines and more like the multi-thread model:
rather than have a coroutine specify what other coroutine is to be run,
I wanted to be able to specify only that the coroutine is ready to give
up control.
Essentially, rather than yielding to another coroutine,
I always yield to a scheduler,
and the scheduler selects and then yields to the next coroutine.
Given such a scheduler (described below),
we can create a few simple constructs on which to build our
coroutine API.
Blocker
In the typical producer-consumer example,
there is an explicit check to see if the routine is blocked,
and if so, then a call to yield is made.
I wanted something more generic, so I created an abstract trait
Blocker
to represent the condition that a routine could be blocked by something:
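//first cut: just the blocking condition (waitUntilNotBlocked is added below)
trait Blocker {
    def isBlocked: Boolean
}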
The implementations of this for the producer and consumer are straightforward:
for the producer, isBlocked returns true when the
queue is full;
for the consumer, isBlocked returns true when
the queue is empty.
Given the isBlocked method, the typical coroutine always
includes a code fragment that looks something like this:
while (isBlocked)
yield control
Since we will always be yielding control to the scheduler, we can encapsulate
this into a more convenient method, which I have called
waitUntilNotBlocked.
I added this function to my Blocker trait, and delegated it to
a scheduler of type
CoScheduler:
package net.jimmc.scoroutine
trait Blocker {
    val scheduler: CoScheduler //class must override to provide instance
    def isBlocked: Boolean
    def waitUntilNotBlocked:Unit = {
        scheduler.waitUntilNotBlocked(this)
    }
}
We pass this to the scheduler so that it can call our
isBlocked method and continue our execution only
when isBlocked returns false.
There is one more detail to be added to Blocker,
but it is an important one.
I stated above that this implementation is built on
top of delimited continuations.
When we call waitUntilNotBlocked
and we are blocked, we want the coroutine library to wait until we
are no longer blocked and then continue execution of our routine.
The coroutine library will be using delimited continuations to do this,
and since our routine might be suspended, the signature
for the waitUntilNotBlocked method must
include the
suspendable annotation.
We add that annotation along with a suitable import statement
to get our final version of Blocker:
package net.jimmc.scoroutine
import scala.util.continuations._
trait Blocker {
    val scheduler: CoScheduler //class must override to provide instance
    def isBlocked: Boolean
    def waitUntilNotBlocked:Unit @suspendable = {
        scheduler.waitUntilNotBlocked(this)
    }
}
CoQueue
Once we have Blocker, it is easy to compose a blocking
version of the standard
Queue class
(where by "blocking" I mean that the coroutine will be suspended until
the specified Blocker is no longer blocked).
We want a version of the
Queue.enqueue function
that blocks when the queue is full,
and a version of the
Queue.dequeue function
that blocks when the queue is empty.
We create a thin wrapper around the Queue class,
which we call CoQueue,
to implement our blocking methods for use with our coroutines.
For each of our two blocking conditions,
we create an instance of Blocker
with those conditions as each of their isBlocked functions,
and we use those Blocker instances to create our
blockingEnqueue and blockingDequeue methods.
Because the blockingEnqueue
and blockingDequeue methods might block,
they must be annotated as suspendable,
which means they can only be called from within CPS code.
Here is our entire
CoQueue
class:
package net.jimmc.scoroutine
import scala.util.continuations._
import scala.collection.mutable.Queue
class CoQueue[A](val scheduler:CoScheduler, val waitSize:Int)
        extends Queue[A] { coqueue =>
    val enqueueBlocker = new Blocker() {
        val scheduler = coqueue.scheduler
        def isBlocked() = length >= waitSize
    }
    val dequeueBlocker = new Blocker() {
        val scheduler = coqueue.scheduler
        def isBlocked() = isEmpty
    }
    def blockingEnqueue(x:A):Unit @suspendable = {
        enqueueBlocker.waitUntilNotBlocked
        enqueue(x)
    }
    def blockingDequeue():A @suspendable = {
        dequeueBlocker.waitUntilNotBlocked
        dequeue
    }
}
An Attempt to Avoid Blocking
You might wonder if we could implement blockingEnqueue
as follows so as to avoid the call to the scheduler's
waitUntilNotBlocked method:
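//attempted version; does not compile (the error is shown below)
def blockingEnqueue(x:A):Unit @suspendable = {
    if (enqueueBlocker.isBlocked)
        enqueueBlocker.waitUntilNotBlocked
    enqueue(x)
}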
CoQueue.scala:22: error: type mismatch;
 found   : Unit
 required: Unit @scala.util.continuations.cpsParam[Unit,Unit]
        if (enqueueBlocker.isBlocked)
        ^
Syntactically, the problem is that the above one-sided
if statement is equivalent to
if (enqueueBlocker.isBlocked)
    enqueueBlocker.waitUntilNotBlocked
else
    ()
The type of () is Unit, but the type of
waitUntilNotBlocked is Unit @suspendable,
so there is a type mismatch.
You might think we could use some trickery such as this:
//compiles, but won't run properly
def unitSuspendable:Unit @suspendable = ()
def blockingEnqueue(x:A):Unit @suspendable = {
    if (enqueueBlocker.isBlocked)
        enqueueBlocker.waitUntilNotBlocked
    else
        unitSuspendable
    enqueue(x)
}
This will compile without errors, but will not work properly.
The problem is that we are trying to define one code path that
is CPS (through waitUntilNotBlocked) and one path that
is not (through unitSuspendable).
The CPS compiler plugin transforms the code to use continuations,
which, as you might recall from the discussion in my previous post,
packages the rest of the function up in a continuation and passes it
along to the called function.
But the unitSuspendable expression does nothing with that
continuation; it neither executes it nor saves it for later execution.
Thus as soon as this code path is taken, the continuation -
which represents
the remainder of the execution of the entire delimited continuation,
including the caller of this function -
is dropped, and the whole delimited continuation is done.
CoScheduler API
The scheduler is the piece of the coroutine library that saves the
continuations for all of the participating coroutines and determines
when to execute those continuations.
By design, all of the tricky stuff is encapsulated in this class.
I have divided the discussion of the scheduler into two sections:
this first section discusses at a high level the tasks for which the
scheduler must take responsibility, and defines an API to perform
those tasks.
The following section describes the implementation of that API.
If you don't want to get too far into the delimited continuation stuff,
you can read just this section and skip the implementation section.
I started with the requirements that it should be possible to
write a scheduler that:
uses one thread for all participating coroutines, or uses
a thread pool so that multiple coroutines can run at once;
uses a simple algorithm
to select which coroutine to run next, such as round-robin,
or a more complex algorithm, such as a priority-based approach;
can be instantiated multiple times
so that different collections of coroutines can be managed with
different schedulers.
In order to allow all of the above,
the scheduler API is defined in a trait,
which I call CoScheduler.
CoScheduler needs to define three basic functions:
Register a coroutine.
I chose to do this with a method that accepts a block of code which
is the coroutine body.
Since the coroutine body might be suspended, the signature for that
argument must include the suspendable annotation.
I call this method addRoutine.
To improve tracing, I also pass in a name argument that can be
used to identify that coroutine.
Wait until a coroutine is no longer blocked.
This is the method to which Blocker.waitUntilNotBlocked
will delegate.
It is also called waitUntilNotBlocked, and takes an
argument which is the Blocker whose
isBlocked method will be used to make that determination.
Since this method will be called from the coroutine body or a method
called from that body, it must be marked as suspendable.
Run a coroutine.
There are two flavors of this: run a single step, returning as soon as
one coroutine has run and has returned control to the scheduler,
or run as many steps as possible,
until there are no more unblocked coroutines to run.
I call these two methods
runNextUnblockedRoutine and runUntilBlockedOrDone,
respectively.
Since these methods are meant to be called from the main application,
not from within a coroutine, they are not marked as
suspendable.
When one of the run methods returns,
we would like to know whether
there are some blocked coroutines,
all of our coroutines have completed,
or we don't know which because we only ran one routine.
In order to return this indication, we define a sealed class
RunStatus with three case objects representing
these three possibilities.
The API of our
CoScheduler
trait thus looks like this:
package net.jimmc.scoroutine
sealed abstract class RunStatus
case object RanOneRoutine extends RunStatus
case object SomeRoutinesBlocked extends RunStatus
case object AllRoutinesDone extends RunStatus
trait CoScheduler {
    def addRoutine(name:String)(body: => Unit @suspendable)
    def waitUntilNotBlocked(b:Blocker):Unit @suspendable
    def runNextUnblockedRoutine():RunStatus
    def runUntilBlockedOrDone():RunStatus
}
If you are interested in using the scoroutine library
to write coroutines, but you don't care how it works internally,
then you can skip the next few sections and pick up again at
Producer-Consumer.
CoScheduler Implementation
The reason for implementing CoScheduler as a trait rather
than a class is to make it easier to create multiple implementations.
On the other hand, there is some functionality which is likely to be
the same in all implementations, and since Scala allows us to include
code in traits, we will add the implementation of those methods to the trait.
For example, given the runNextUnblockedRoutine method,
the runUntilBlockedOrDone method is a simple loop that
calls runNextUnblockedRoutine until it does not run something.
Likewise, if we internally create an instance of a Blocker
in addRoutine,
we can implement both that method and waitUntilNotBlocked
on top of a single method that stores a continuation,
which we will call setRoutineContinuation.
This means we can create a concrete scheduler class that extends the
CoScheduler trait by implementing just two functions:
runNextUnblockedRoutine and
setRoutineContinuation.
Below is what our implementation of CoScheduler looks like
after making the above changes.
package net.jimmc.scoroutine
import scala.util.continuations._
sealed abstract class RunStatus
case object RanOneRoutine extends RunStatus
case object SomeRoutinesBlocked extends RunStatus
case object AllRoutinesDone extends RunStatus
trait CoScheduler { cosched =>
    private[scoroutine] def setRoutineContinuation(
            b:Blocker, cont:Option[Unit=>Unit]):Unit
    def runNextUnblockedRoutine():RunStatus
    /* We use a class rather than an object because we are using the
     * instance as a key to find more info about the associated routine. */
    class BlockerNever() extends Blocker {
        val scheduler = cosched
        val isBlocked = false
    }
    def addRoutine(name:String)(body: => Unit @suspendable) {
        reset {
            val blocker = new BlockerNever()
            waitUntilNotBlocked(blocker)
            body
        }
    }
    def runUntilBlockedOrDone():RunStatus = {
        var status:RunStatus = RanOneRoutine
        while (status==RanOneRoutine) {
            status = runNextUnblockedRoutine()
        }
        status
    }
    def waitUntilNotBlocked(b:Blocker):Unit @suspendable = {
        shift( (cont: Unit=>Unit) => {
            setRoutineContinuation(b,Some(cont))
        })
    }
}
As we already knew based on the existence of the
suspendable annotation, there are two methods that deal
with CPS code: addRoutine and waitUntilNotBlocked.
Before examining these functions, it is worth reviewing one point about
how reset and shift work.
When there is a shift call within a reset block,
the CPS compiler plugin transforms the code within the reset
block such that the code after the shift block is passed
as a continuation argument to the shift block.
The code within the shift block is thus the last code
to be executed within the reset block.
The code that is within the reset but outside of the
shift is CPS code, but the code within the shift
block is not CPS code.
In other words, once within a reset block, we are executing
CPS code until we get into the shift block, at which point
we are no longer executing CPS code.
The continuation contains CPS code; if we call the continuation from
within our shift code, we transition from non-CPS code
into CPS code.
But if we happen to save the continuation,
we can directly execute it later from any non-CPS code, and that will likewise
transition us into the CPS code of the continuation.
The waitUntilNotBlocked function takes as an argument the
Blocker that will tell us when the calling coroutine is
allowed to run again, and, along with the continuation passed in to
the shift, passes it to
setRoutineContinuation.
That function saves the continuation and its associated Blocker
and returns without executing the continuation.
Because we are returning without executing the continuation,
control returns to the first statement past the end of the
enclosing reset block,
or, if the current code/continuation (i.e. the CPS code containing the call to
waitUntilNotBlocked) was directly executed from non-CPS code,
to the statement after
the point at which that continuation was executed.
The addRoutine function creates a Blocker that
never blocks, then calls waitUntilNotBlocked with that
blocker. Since waitUntilNotBlocked is a CPS function
(marked with the suspendable annotation),
the remainder of the code in the reset block is turned
into a continuation and passed along to waitUntilNotBlocked.
When that method calls shift, the continuation we passed to
waitUntilNotBlocked - i.e. our call to body -
is part of the continuation passed to the shift.
Thus when that method saves the continuation, that saved continuation
includes the call to the coroutine body.
Since the continuation is not immediately executed, control returns
to the end of the reset block, and we return from
addRoutine with our coroutine sitting in the scheduler
ready to start running.
DefaultCoScheduler
Given the CoScheduler trait described above, the only
functionality that remains for our concrete class is
to implement a mechanism for storing continuations
and selecting the next one to run.
The
DefaultCoScheduler
implements a simple round-robin
scheduling mechanism, selecting the next runnable continuation each
time it is invoked.
Note that this implementation has been designed to be simple, but
is not very efficient.
In particular, it will not exhibit good performance when there are a large
number of coroutines of which only a few are runnable at any time.
We define a case class BlockerInfo to tie together a
Blocker and its associated continuation,
an ArrayBuffer to store an ordered set of those,
and a map to find one given a Blocker.
The setRoutineContinuation function adds a new
BlockerInfo to our array if we don't already have one
for the given Blocker, or updates the existing one if we do.
The runNextUnblockedRoutine function does a simple
linear scan through the array of items, starting just past where we left off
the last time, looking for the first unblocked continuation and
running it.
If there were no runnable continuations, we return a status code
without running anything.
package net.jimmc.scoroutine
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
class DefaultCoScheduler extends CoScheduler {
    val blockerIndexMap = new HashMap[Blocker,Int]
    case class BlockerInfo(val blocker:Blocker, index:Int,
            var cont:Option[Unit=>Unit])
    val blockerList = new ArrayBuffer[BlockerInfo]
    var nextIndex = 0
    private[scoroutine] def setRoutineContinuation(
            b:Blocker,cont:Option[Unit=>Unit]) {
        if (blockerIndexMap.get(b).isEmpty) {
            val nextIndex = blockerIndexMap.size
            blockerIndexMap.put(b,nextIndex)
            blockerList += BlockerInfo(b, nextIndex, cont)
        } else {
            val n = blockerIndexMap(b)
            blockerList(n).cont = cont
        }
    }
    def runNextUnblockedRoutine():RunStatus = {
        var blockedCount = 0
        for (i <- 0 until blockerList.size) {
            val index = (nextIndex + i) % blockerList.size
            val bInfo = blockerList(index)
            if (bInfo.cont.isDefined && bInfo.blocker.isBlocked) {
                blockedCount += 1
            }
            if (bInfo.cont.isDefined && !bInfo.blocker.isBlocked) {
                nextIndex = index + 1
                val nextCont = bInfo.cont
                bInfo.cont = None
                nextCont.get() //run the continuation
                return RanOneRoutine
            }
        }
        if (blockedCount > 0) {
            SomeRoutinesBlocked
        } else {
            AllRoutinesDone
        }
    }
}
Note that DefaultCoScheduler does not import
the continuations package.
It does not use reset, shift,
or any of the CPS annotations such as suspendable.
This is because none of this code is CPS code.
runNextUnblockedRoutine is called from non-CPS code,
and although setRoutineContinuation is called from
CPS code, it does not itself call any CPS functions nor does it use
shift, so it does not need to be declared as CPS code.
Other Schedulers
DefaultCoScheduler implements a basic scheduling algorithm.
It is intended for use with small numbers of coroutines and has not
been optimized.
Other schedulers could be written that are optimized for other
situations, such as large numbers of coroutines,
coroutines with different priorities,
or "stickiness" so that a running coroutine continues to run
until it is blocked before the next coroutine runs.
Since the code that creates the coroutines starts by creating
the scheduler that controls those coroutines,
it would be simple to create a scheduler other than
DefaultCoScheduler for use with a particular
set of coroutines.
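For example, a hypothetical "sticky" variant (not part of the library) could reuse DefaultCoScheduler's bookkeeping and change only where each scan starts, so that the routine that just ran keeps running until it blocks:
import net.jimmc.scoroutine._
//hypothetical sketch of a sticky scheduler
class StickyCoScheduler extends DefaultCoScheduler {
    override def runNextUnblockedRoutine():RunStatus = {
        var blockedCount = 0
        for (i <- 0 until blockerList.size) {
            val index = (nextIndex + i) % blockerList.size
            val bInfo = blockerList(index)
            if (bInfo.cont.isDefined && bInfo.blocker.isBlocked) {
                blockedCount += 1
            }
            if (bInfo.cont.isDefined && !bInfo.blocker.isBlocked) {
                nextIndex = index //sticky: start the next scan at this same routine
                val nextCont = bInfo.cont
                bInfo.cont = None
                nextCont.get() //run the continuation
                return RanOneRoutine
            }
        }
        if (blockedCount > 0) SomeRoutinesBlocked else AllRoutinesDone
    }
}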
Producer-Consumer
Let's see how the Producer-Consumer example
(ProdConTest)
looks using the
scoroutine library:
import scala.util.continuations._
import net.jimmc.scoroutine.DefaultCoScheduler
import net.jimmc.scoroutine.CoQueue
object ProdConTest {
    def main(args:Array[String]) = {
        val prodcon = new ProdCon()
        prodcon.run
    }
}
class ProdCon() {
    val sched = new DefaultCoScheduler
    val buf = new CoQueue[Int](sched,2)
    def run() {
        sched.addRoutine("producer"){
            var i = 0
            while (i < 4) {
                buf.blockingEnqueue(i)
                i = i + 1
            }
        }
        sched.addRoutine("consumer"){
            val total = buf.blockingDequeue +
                buf.blockingDequeue + buf.blockingDequeue
            println("consume total is "+total)
        }
        sched.runUntilBlockedOrDone
    }
}
After the imports and a simple main method
for testing, we have the ProdCon class with the
actual producer-consumer definition.
We start by setting up two objects:
the scheduler that will control our two coroutines,
and a queue for communication between them.
We then register two coroutines with our scheduler,
one for the producer and one for the consumer, and we
call sched.runUntilBlockedOrDone
to run the coroutines until
there is nothing runnable left on that scheduler.
You can't tell in this example, but the code blocks being passed
to addRoutine are CPS code, the same as the body of
a reset block.
If you decide to refactor this code and push the
blockingEnqueue or blockingDequeue calls down into a
subroutine, that subroutine will have to be marked with the
suspendable annotation.
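For example, a hypothetical helper such as this one must carry the annotation because it calls the @suspendable blockingEnqueue:
//hypothetical helper; assumes the CoQueue and continuations imports used above
def enqueueTwice(buf:CoQueue[Int], x:Int):Unit @suspendable = {
    buf.blockingEnqueue(x)
    buf.blockingEnqueue(x)
}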
Also, because coroutine bodies are CPS code, there are
some restrictions
on what control constructs can be used.
You can see what this looks like in the
ProdConTestWithSubs source code in the
scoroutine examples.
I will give some more examples in my next post,
a followup about Generators.
Unless otherwise specified in individual blog entries, all source code in this blog is Copyright by Jim McBeath, as of the posting date, under the GNU Lesser General Public License (LGPL), Version 3.