In my previous post I said that delimited continuations could be used to create interesting control constructs. In this post I give examples of one such construct: coroutines.
I describe the implementation of a library to make it easier to write coroutines, and I give an example that is built on that library. If you want to go straight to the example, see the section Producer-Consumer.
Like my previous post on delimited continuations, this is a long post. However, though long, it should be much easier going than that post. You don't need to read and understand that post in order to understand this post, but it will help in two places:
- In the discussion of the
CoScheduler API and
implementation,
an understanding of
reset
andshift
is required. - You need to understand at some level about CPS code and its
restrictions
when writing coroutines or using the CPS-only methods in
CoQueue
andBlocker
.
Contents
- Coroutines
- Why Use Coroutines
- Building a Coroutine Library
- Blocker
- CoQueue
- An Attempt to Avoid Blocking
- CoScheduler API
- CoScheduler Implementation
- DefaultCoScheduler
- Other Schedulers
- Producer-Consumer
- Resources
Coroutines
The Wikipdeia coroutines page says coroutines are a generalization of subroutines: whereas a subroutine starts at the beginning every time it is called, a coroutine starts at the beginning the first time it is called, but can start at any other point on successive calls. Generally this means that on successive calls, it continues running from the point where it most recently returned.Instead of using a
return
statement,
coroutines typically use a yield
statement.
A yield
statement
indicates that the routine is done executing for now,
and will resume execution following the yield
statement
the next time it is called.
In the classic definition of coroutines, the
yield
statement indicates which coroutine is to be run next.
With two coroutines, each always yields to the other;
with more than two, a coroutine might have code to determine which
other coroutine to yield to.
Unfortunately for coroutines,
yield
is already a keyword in Scala, so we can't use it for our purposes.
We can either pick a slightly different word such as
yld
or yieldTo
,
or we can just use a different term altogether.
The classic example of the use of coroutines is a pair of routines, one of which (the producer) generates data and one of which (the consumer) consumes data. These two routines are connected by a queue; the producer puts data into the queue and the consumer takes data out of the queue.
This same producer-consumer pair is also a typical example of multi-thread code. Ted Neward uses this producer-consumer example in his blog post describing the concurrency benefits of using Scala.
In the multi-thread producer-consumer example, the producer thread runs and places data into the queue until the queue is full, at which point that thread stops running until the queue empties out enough for it to add more data. Meanwhile the consumer thread runs and takes data out of the queue until the queue is empty, at which point that thread stop running until the queue contains more data. If the host on which the multi-thread application is running contains more than one processor, both of these threads might be running at the same time.
I like to think of coroutines as being like multi-thread code, only without multiple threads. The coroutine version of the producer-consumer example works essentially the same as the multi-thread version, except that only one of the two is ever running at one point in time. The producer runs and places data into the queue until it is full, at which point it pauses and the consumer starts running. The consumer takes data out of the queue until it is empty, at which point it pauses and the producer starts running again.
In both the multi-thread and the coroutine version of this example, there is some state that is saved while each routine is paused waiting for the queue to fill or empty. In the multi-thread example, that state is saved in the thread. In our coroutine example, we use a different mechanism to save that state: a delimited continuation.
Why Use Coroutines
If coroutines are like multi-thread code, why use them and have to deal with continuations rather than just using threads? Here are some possible reasons:- With the default scheduler that runs everything in one thread, you don't need to worry about concurrency issues such as multiple concurrent access to shared state.
- You can create your own scheduler to control when to run each of your coroutines. If you want that scheduler to use a thread pool and run coroutines concurrently, you can do that (assuming you then deal with concurrency issues in your coroutines).
- An application can handle more suspended continuations than it can suspended threads (for example, see slide 19 of Phillip Haller's presentation on Actors, where he says an app can handle up to 5,000 threads, but up to 1,200,000 actors).
Building a Coroutine Library
It is possible to write coroutines directly in Scala code usingreset
and shift
, but
dealing with delimited continuations can be tricky,
so I wanted to isolate all of that code into a reusable library
that would make it easier to write coroutines
as well as allow encapsulating more sophisticated
capabilities within the library.
The package name I selected for this library is
net.jimmc.scoroutine
.
The source code is
available on github.
I started with a change that makes these coroutines look less like coroutines and more like the multi-thread model: rather than have a coroutine specify what other coroutine is to be run, I wanted to be able to specify only that the coroutine is ready to give up control. Essentially, rather than yielding to another coroutine, I always yield to a scheduler, and the scheduler selects and then yields to the next coroutine. Given such a scheduler (described below), we can create a few simple constructs on which to build our coroutine API.
Blocker
In the typical producer-consumer example, there is an explicit check to see if the routine is blocked, and if so, then a call toyield
is made.
I wanted something more generic, so I created an abstract trait
Blocker
to represent the condition that a routine could be blocked by something:
package net.jimmc.scoroutine trait Blocker { def isBlocked: Boolean }The implementations of this for the producer and consumer are straightforward: for the producer,
isBlocked
returns true when the
queue is full;
for the consumer, isBlocked
returns true when
the queue is empty.
Given the
isBlocked
method, the typical coroutine always
includes a code fragment that looks something like this:
while (isBlocked) yield controlSince we will always be yielding control to the scheduler, we can encapsulate this into a more convenient method, which I have called
waitUntilNotBlocked
.
I added this function to my Blocker
trait, and delegated it to
a scheduler of type
CoScheduler
:
package net.jimmc.scoroutine trait Blocker { val scheduler: CoScheduler //class must override to provide instance def isBlocked: Boolean def waitUntilNotBlocked:Unit = { scheduler.waitUntilNotBlocked(this) } }We pass
this
to the scheduler so that it can call our
isBlocked
method and continue our execution only
when isBlocked
returns false.
There is one more detail to be added to
Blocker
,
but it is an important one.
I stated above that this implementation is built on
top of delimited continuations.
When we call waitUntilNotBlocked
and we are blocked, we want the coroutine library to wait until we
are no longer blocked and then continue execution of our routine.
The coroutine library will be using delimited continuations to do this,
and since our routine might be suspended, the signature
for the waitUntilNotBlocked
method must
include the
suspendable
annotation.
We add that annotation along with a suitable import
statement
to get our final version of Blocker
:
package net.jimmc.scoroutine import scala.util.continuations._ trait Blocker { val scheduler: CoScheduler //class must override to provide instance def isBlocked: Boolean def waitUntilNotBlocked:Unit @suspendable = { scheduler.waitUntilNotBlocked(this) } }
CoQueue
Once we haveBlocker
, it is easy to compose a blocking
version of the standard
Queue
class
(where by "blocking" I mean that the coroutine will be suspended until
the specified Blocker
is no longer blocked).
We want a version of the
Queue.enqueue
function
that blocks when the queue is full,
and a version of the
Queue.dequeue
function
that blocks when the queue is empty.
We create a thin wrapper around the Queue
class,
which we call CoQueue
,
to implement our blocking methods for use with our coroutines.
For each of our two blocking conditions,
we create an instance of Blocker
with those conditions as each of their isBlocked
functions,
and we use those Blocker
instances to create our
blockingEnqueue
and blockingDequeue
methods.
Because the
blockingEnqueue
and blockingDequeue
methods might block,
they must be annotated as suspendable
,
which means they can only be called from within CPS code.
Here is our entire
CoQueue
class:
package net.jimmc.scoroutine import scala.util.continuations._ import scala.collection.mutable.Queue class CoQueue[A](val scheduler:CoScheduler, val waitSize:Int) extends Queue[A] { coqueue => val enqueueBlocker = new Blocker() { val scheduler = coqueue.scheduler def isBlocked() = length >= waitSize } val dequeueBlocker = new Blocker() { val scheduler = coqueue.scheduler def isBlocked() = isEmpty } def blockingEnqueue(x:A):Unit @suspendable = { enqueueBlocker.waitUntilNotBlocked enqueue(x) } def blockingDequeue():A @suspendable = { dequeueBlocker.waitUntilNotBlocked dequeue } }
An Attempt to Avoid Blocking
You might wonder if we could implementblockingEnqueue
as follows so as to avoid the call to the scheduler's
waitUntilNotBlocked
method:
//this won't compile def blockingEnqueue(x:A):Unit @suspendable = { if (enqueueBlocker.isBlocked) enqueueBlocker.waitUntilNotBlocked enqueue(x) }However, the compiler gives this error message:
CoQueue.scala:22: error: type mismatch; found : Unit required: Unit @scala.util.continuations.cpsParam[Unit,Unit] if (enqueueBlocker.isBlocked) ^Syntactically, the problem is that the above one-sided
if
statement is equivalent to
if (enqueueBlocker.isBlocked) enqueueBlocker.waitUntilNotBlocked else ()The type of
()
is Unit
, but the type of
waitUntilNotBlocked
is Unit @suspendable
,
so there is a type mismatch.
You might think we could use some trickery such as this:
//compiles, but won't run properly def unitSuspendable:Unit @suspendable = () def blockingEnqueue(x:A):Unit @suspendable = { if (enqueueBlocker.isBlocked) enqueueBlocker.waitUntilNotBlocked else unitSuspendable enqueue(x) }This will compile without errors, but will not work properly. The problem is that we are trying to define one code path that is CPS (through
waitUntilNotBlocked
) and one path that
is not (through unitSuspendable
).
The CPS compiler plugin transforms the code to use continuations,
which, as you might recall from the discussion in my previous post,
packages the rest of the function up in a continuation and passes it
along to the called function.
But the unitSuspendable
expression does nothing with that
continuation; it neither executes it nor saves it for later execution.
Thus as soon as this code path is taken, the continuation -
which represents
the remainder of the execution of the entire delimited continuation,
including the caller of this function -
is dropped, and the whole delimited continuation is done.
CoScheduler API
The scheduler is the piece of the coroutine library that saves the continuations for all of the participating coroutines and determines when to execute those continuations. By design, all of the tricky stuff is encapsulated in this class. I have divided the discussion of the scheduler into two section: this first section discusses at a high level the tasks for which the schedule must take responsibility, and defines an API to perform those tasks. The following section describes the implementation of that API. If you don't want to get too far into the delimited continuation stuff, you can read just this section and skip the implementation section.I started with the requirements that it should be possible to write a scheduler that:
- uses one thread for all participating coroutines, or uses a thread pool so that multiple coroutines can run at once;
- uses a simple algorithm to select which coroutine to run next, such as round-robin, or a more complex algorithm, such as a priority-based approach;
- can be instantiated multiple times so that different collections of coroutines can be managed with different schedulers.
CoScheduler
.
CoScheduler
needs to define three basic functions:
- Register a coroutine.
I chose to do this with a method that accepts a block of code which
is the coroutine body.
Since the coroutine body might be suspended, the signature for that
argument must include the
suspendable
annotation. I call this methodaddRoutine
. To improve tracing, I also pass in a name argument that can be used to identify that coroutine. - Wait until a coroutine is no longer blocked.
This is the method to which
Blocker.waitUntilNotBlocked
will delegate. It is also calledwaitUntilNotBlocked
, and takes an argument which is theBlocker
whoseisBlocked
method will be used to make that determination. Since this method will be called from the coroutine body or a method called from that body, it must be marked assuspendable
. - Run a coroutine.
There are two flavors of this: run a single step, returning as soon as
one coroutine has run and has returned control to the scheduler,
or run as many steps as possible,
until there are no more unblocked coroutines to run.
I call these two methods
runNextUnblockedRoutine
andrunUntilBlockedOrDone
, respectively. Since these methods are meant to be called from the main application, not from within a coroutine, they are not marked assuspendable
.
When one of therun
methods returns, we would like to know whether there are some blocked coroutines, all of our coroutines have completed, or we don't know which because we only ran one routine. In order to return this indication, we define a sealed classRunStatus
with three case objects representing these three possibilities.
CoScheduler
trait thus looks like this:
package net.jimmc.scoroutine sealed abstract class RunStatus case object RanOneRoutine extends RunStatus case object SomeRoutinesBlocked extends RunStatus case object AllRoutinesDone extends RunStatus trait CoScheduler { def addRoutine(name:String)(body: => Unit @suspendable) def waitUntilNotBlocked(b:Blocker):Unit @suspendable def runNextUnblockedRoutine():RunStatus def runUntilBlockedOrDone():RunStatus }If you are interested in using the
scoroutine
library
to write coroutines, but you don't care how it works internally,
then you can skip the next few sections and pick up again at
Producer-Consumer.
CoScheduler Implementation
The reason for implementingCoScheduler
as a trait rather
than a class is to make it easier to create multiple implementations.
On the other hand, there is some functionality which is likely to be
the same in all implementations, and since Scala allows us to include
code in traits, we will add the implementation of those methods to the trait.
For example, given the
runNextUnblockedRoutine
method,
the runUntilBlockedOrDone
method is a simple loop that
calls runNextUnblockedRoutine
until it does not run something.
Likewise, if we internally create an instance of a
Blocker
in addRoutine
,
we can implement both that method and waitUntilNotBlocked
on top of a single method that stores a continuation,
which we will call setRoutineContinuation
.
This means we can create a concrete scheduler class that extends the
CoScheduler
trait by implementing just two functions:
runNextUnblockedRoutine
and
setRoutineContinuation
.
Below is what our implementation of
CoScheduler
looks like
after making the above changes.
package net.jimmc.scoroutine import scala.util.continuations._ sealed class RunStatus case object RanOneRoutine extends RunStatus case object SomeRoutinesBlocked extends RunStatus case object AllRoutinesDone extends RunStatus trait CoScheduler { cosched => private[scoroutine] def setRoutineContinuation( b:Blocker, cont:Option[Unit=>Unit]):Unit def runNextUnblockedRoutine():RunStatus /* We use a class rather than an object because we are using the * instance as a key to find more info about the associated routine. */ class BlockerNever() extends Blocker { val scheduler = cosched val isBlocked = false } def addRoutine(name:String)(body: => Unit @suspendable) { reset { val blocker = new BlockerNever() waitUntilNotBlocked(blocker) body } } def runUntilBlockedOrDone():RunStatus = { var status:RunStatus = RanOneRoutine while (status==RanOneRoutine) { status = runNextUnblockedRoutine() } status } def waitUntilNotBlocked(b:Blocker):Unit @suspendable = { shift( (cont: Unit=>Unit) => { setRoutineContinuation(b,Some(cont)) }) } }As we already knew based on the existence of the
suspendable
annotation, there are two methods that deal
with CPS code: addRoutine
and waitUntilNotBlocked
.
Before examining these functions, it is worth reviewing one point about how
reset
and shift
work.
When there is a shift
call within a reset
block,
the CPS compiler plugin transforms the code within the reset
block such that the code after the shift
block is passed
as a continuation argument to the shift
block.
The code within the shift
block is thus the last code
to be executed within the reset
block.
The code that is within the reset
but outside of the
shift
is CPS code, but the code within the shift
block is not CPS code.
In other words, once within a reset
block, we are executing
CPS code until we get into the shift
block, at which point
we are no longer executing CPS code.
The continuation contains CPS code; if we call the continuation from
within our shift
code, we transition from non-CPS code
into CPS code.
But if we happen to save the continuation,
we can directly execute it later from any non-CPS code, and that will likewise
transition us into the CPS code of the continuation.
The
waitUntilNotBlocked
function takes as an argument the
Blocker
that will tell us when the calling coroutine is
allowed to run again, and, along with the continuation passed in to
the shift
, passes it to
setRoutineContinuation
.
That function saves the continuation and its associated Blocker
and returns without executing the continuation.
Because we are returning without executing the continuation,
control returns to the first statement past the end of the
enclosing reset
block,
or, if the current code/continuation (i.e. the CPS code containing the call to
waitUntilNotBlocked
) was directly executed from non-CPS code,
to the statement after
the point at which that continuation was executed.
The
addRoutine
function creates a Blocker
that
never blocks, then calls waitUntilNotBlocked
with that
blocker. Since waitUntilNotBlocked
is a CPS function
(marked with the suspendable
annotation),
the remainder of the code in the reset
block is turned
into a continuation and passed along to waitUntilNotBlocked
.
When that method calls shift
, the continuation we passed to
waitUntilNotBlocked
- i.e. our call to body
-
is part of the continuation passed to the shift
.
Thus when that method saves the continuation, that saved continuation
includes the call to the coroutine body.
Since the continuation is not immediately executed, control returns
to the end of the reset
block, and we return from
addRoutine
with our coroutine sitting in the scheduler
ready to start running.
DefaultCoScheduler
Given theCoScheduler
trait described above, the only
functionality that remains for our concrete class is
to implement a mechanism for storing continuations
and selecting the next one to run.
The
DefaultCoScheduler
implements a simple round-robin
scheduling mechanism, selecting the next runnable continuation each
time it is invoked.
Note that this implementation has been designed to be simple, but
is not very efficient.
In particular, it will not exhibit good performance when there are a large
number of coroutines of which only a few are runnable at any time.
We define a case class
BlockerInfo
to tie together a
Blocker
and its associated continuation,
an ArrayBuffer
to store an ordered set of those,
and a map to find one given a Blocker
.
The
setRoutineContinuation
function adds a new
BlockerInfo
to our array if we don't already have one
for the given Blocker
, or updates the existing one if we do.
The
runNextUnblockedRoutine
function does a simple
linear scan through the array of items, starting just past where we left off
the last time, looking for the first unblocked continuation and
running it.
If there were no runnable continuations, we return a status code
without running anything.
package net.jimmc.scoroutine import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap class DefaultCoScheduler extends CoScheduler { val blockerIndexMap = new HashMap[Blocker,Int] case class BlockerInfo(val blocker:Blocker, index:Int, var cont:Option[Unit=>Unit]) val blockerList = new ArrayBuffer[BlockerInfo] var nextIndex = 0 private[scoroutine] def setRoutineContinuation( b:Blocker,cont:Option[Unit=>Unit]) { if (blockerIndexMap.get(b).isEmpty) { val nextIndex = blockerIndexMap.size blockerIndexMap.put(b,nextIndex) blockerList += BlockerInfo(b, nextIndex, cont) } else { val n = blockerIndexMap(b) blockerList(n).cont = cont } } def runNextUnblockedRoutine():RunStatus = { var blockedCount = 0 for (i <- 0 until blockerList.size) { val index = (nextIndex + i) % blockerList.size val bInfo = blockerList(index) if (bInfo.cont.isDefined && bInfo.blocker.isBlocked) { blockedCount += 1 } if (bInfo.cont.isDefined && !bInfo.blocker.isBlocked) { nextIndex = index + 1 val nextCont = bInfo.cont bInfo.cont = None nextCont.get() //run the continuation return RanOneRoutine } } if (blockedCount > 0) { SomeRoutinesBlocked } else { AllRoutinesDone } } }Note that
DefaultCoScheduler
does not import
the continuations package.
It does not use reset
, shift
,
or any of the CPS annotations such as suspendable
.
This is because none of this code is CPS code.
runNextUnblockedRoutine
is called from non-CPS code,
and although setRoutineContinuation
is called from
CPS code, it does not itself call any CPS functions nor does it use
shift
, so it does not need to be declared as CPS code.
Other Schedulers
DefaultCoScheduler
implements a basic scheduling algorithm.
It is intended for use with small numbers of coroutines and has not
been optimized.
Other schedulers could be written that are optimized for other
situations, such as large numbers of coroutines,
coroutines with different priorities,
or "stickiness" so that a running coroutine continues to run
until it is blocked before the next coroutine runs.
Since the code that creates the coroutines starts by creating
the scheduler that controls those coroutines,
it would be simple to create a scheduler other than
DefaultCoScheduler
for use with a particular
set of coroutines.
Producer-Consumer
Let's see how the Producer-Consumer example (ProdConTest
)
looks using the
scoroutine
library:
import scala.util.continuations._ import net.jimmc.scoroutine.DefaultCoScheduler import net.jimmc.scoroutine.CoQueue object ProdConTest { def main(args:Array[String]) = { val prodcon = new ProduceAndConsume() prodcon.run } } class ProdCon() { val sched = new DefaultCoScheduler val buf = new CoQueue[Int](sched,2) def run() { sched.addRoutine("producer"){ var i = 0 while (i < 4) { buf.blockingEnqueue(i) } } sched.addRoutine("consumer"){ val total = buf.blockingDequeue + buf.blockingDequeue + buf.blockingDequeue println("consume total is "+total) } sched.runUntilBlockedOrDone } }After the imports and a simple
main
method
for testing, we have the ProdCon
class with the
actual producer-consumer definition.
We start by setting up two objects: the scheduler that will control our two coroutines, and a queue for communication between them. We then register two coroutines with our scheduler, one for the producer and one for the consumer, and we call
sched.runUntilBlockedOrDone
to run the coroutines until
there is nothing runnable left on that scheduler.
You can't tell in this example, but the code blocks being passed to
addRoutine
are CPS code, the same as the body of
a reset
block.
If you decide to refactor this code and push the
blockingEnqueue
or blockingDequeue
calls down into a
subroutine, that subroutine will have to be marked with the
suspendable
annotation.
Also, because coroutine bodies are CPS code, there are
there are some restrictions
on what control constructs can be used.
You can see what this looks like in the
ProdConTestWithSubs
source code in the
scoroutine examples.
I will give some more examples in my next post, a followup about Generators.
Resources
- My original blog post on Delimited Continuations, which includes its own Resources section.
- The net.jimmc.scoroutine library on github.
- python-multitask, a similar framework for Python.
10 comments:
Seems like Blockable would be a better name for the Blocker trait. Blocker implies that an object (the coroutine) can cause a blockage, not that it can be blocked.
Great library, and very well explained!
Michael S: The Blocker trait is in fact realized in the object that causes the blockage - in this case, CoQueue. Perhaps the method within Blocker should be called isBlocking.
Great post! I haven't finished reading the whole thing, but in terms of delimited continuations, wouldn't this trickery work for CoQueue + Delimted continuations:
// this will execute the continuation immediately
def continueNow: Unit @suspendable = shift { k : (Unit => Unit) => k() }
// This will now handle the continuation appropriately for both cases, I think ;)
def blockingEnqueue(x:A):Unit @suspendable = {
if (enqueueBlocker.isBlocked)
enqueueBlocker.waitUntilNotBlocked
else
continueNow
enqueue(x)
}
Essentially, you're running the delimited continuation *immediately* if there is noting blocking it. Anyway, as I said, still going through, but great article!
Josh: Yes, your approach will work, with one caveat: each time you execute that continuation within the shift block you are (tail) recursing and adding to the stack size. So this is not a good general solution because now you have to be concerned about how many times you call continueNow in a loop. By calling waitUntilNotBlocked and allowing the coroutine to be continued by the regular scheduler's task runner loop (runUntilBlockedOrDone), we are using a trampoline that prevents this problem.
That's one thing that's been bothering me about delimited continuations. It seems there should be a way I can convert loop-based tail-recursion to either use a trampoline or have some other kind of TCO. In any case, I finished the blog. Great post.
Great post! As long as enqueueBlocker.waitUntilNotBlocked does proper trampolining you should be safe if you define
def continueNow: Unit @suspendable = ()
The trick here is not to use shift explicitly but have the compiler insert a shiftUnit call. These will be detected by the transformation and tail-call-optimized if possible. While-Loops will not pile up stack space for shiftUnit.
It should also work without an explicit continueNow:
def blockingEnqueue(x:A):Unit @suspendable = {
if (enqueueBlocker.isBlocked)
enqueueBlocker.waitUntilNotBlocked
enqueue(x)
}
Sorry, should have read the post more thoroughly before commenting. You do need the else branch in this case. The conditional is in the middle of a block expression {..} and although the context says the whole block has a @suspendable type, nothing is known a priori about the statements in the middle of the block.
It still strikes me as odd that you're saying with
def continueNow: Unit @suspendable = ()
it will not run properly. I believe it should, because the conversion of () to Unit @suspendable happens via shiftUnit(()), which is basically the same as shift((k:Unit=>Unit)=>k()). That is, it just calls the continuation with the given constant. So if this doesn't work it might need some fixing.
Tiark: It seems I still have some holes in my understanding of your delimited continuations package. Thanks for the clarification, both on how shiftUnit works and on when it is possible to use a one-sided if statement containing a call to a @suspendable.
Josh: My apologies for not first trying out your code suggestion before replying to your comment. My comment about stack overflows was based on what I now see was an incorrect assumption, colored by the fact that when I was first working on this library I was getting stack overflows - but that was due to improper trampolining, which has since been fixed.
I tried changing the body of Blocker.waitUntilNotBlocked to include "if (isBlocked)" before the call to "scheduler.waitUntilNotBlocked", without an "else" clause, and that compiled and ran just fine, without a stack overflow. I also tried adding "else continueNow", and that too compiled and ran without a stack overflow. As expected, putting in the explicit shift((k:Unit=>Unit)=>k())
instead of continueNow did result in a stack overflow.
Dear Jim,
These are wonderful and timely posts. The co-routining + producer/consumer is close to the TupleSpace abstraction. i recently wrote up a quick version, but spotted a small concern. Is it possible to transform this pattern in such a way that the get does not require a continuation to be passed in?
Best wishes,
--greg
Greg: Your comment is pretty cryptic. At first I thought you might be asking about using the main app as Consumer, in which case I would suggest you read my follow-up post about generators. But I am guessing that you are referring to your thread about TupleSpace in the scala mailing list, so I have posted a response to you there.
Also, a clarification on my previous comment: the version of continueNow I tried was Tiark's version, equivalent to the unitSuspendable I used in the post.
Post a Comment