Thursday, July 28, 2011

Debugging Scala Parser Combinators

Two simple mechanisms for debugging parsers written using Scala's parser combinators.

Introduction

In a recent comment on my 2008 blog post about Scala's parser combinators, a reader asked how one might go about debugging such a parser. As one post says, "Debugging a parser implemented with the help of a combinator library has its special challenges." You may have trouble setting breakpoints, and stack traces can be difficult to interpret.

The two techniques I show here may not give you the kind of visibility you are used to when single-stepping through problem code, but I hope they give you at least a little more visibility than you would otherwise have.

Example Parser

As an example parser I will use an integer-only version of the four-function arithmetic parser I built for my 2008 parser combinator post. The code consists of a set of case classes to represent the parsed results and a parser object that contains the parsing rules and a few helper methods. You can copy this code into a file and either compile it or load it into the Scala REPL.

import scala.util.parsing.combinator.syntactical.StandardTokenParsers

sealed abstract class Expr {
    def eval():Int
}

case class EConst(value:Int) extends Expr {
    def eval():Int = value
}

case class EAdd(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval + right.eval
}

case class ESub(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval - right.eval
}

case class EMul(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval * right.eval
}

case class EDiv(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval / right.eval
}

case class EUMinus(e:Expr) extends Expr {
    def eval():Int = -e.eval
}

object ExprParser extends StandardTokenParsers {
    lexical.delimiters ++= List("+","-","*","/","(",")")

    def value = numericLit ^^ { s => EConst(s.toInt) }

    def parens:Parser[Expr] = "(" ~> expr <~ ")"

    def unaryMinus:Parser[EUMinus] = "-" ~> term ^^ { EUMinus(_) }

    def term = ( value |  parens | unaryMinus )

    def binaryOp(level:Int):Parser[((Expr,Expr)=>Expr)] = {
        level match {
            case 1 =>
                "+" ^^^ { (a:Expr, b:Expr) => EAdd(a,b) } |
                "-" ^^^ { (a:Expr, b:Expr) => ESub(a,b) }
            case 2 =>
                "*" ^^^ { (a:Expr, b:Expr) => EMul(a,b) } |
                "/" ^^^ { (a:Expr, b:Expr) => EDiv(a,b) }
            case _ => throw new RuntimeException("bad precedence level "+level)
        }
    }
    val minPrec = 1
    val maxPrec = 2

    def binary(level:Int):Parser[Expr] =
        if (level>maxPrec) term
        else binary(level+1) * binaryOp(level)

    def expr = ( binary(minPrec) | term )

    def parse(s:String) = {
        val tokens = new lexical.Scanner(s)
        phrase(expr)(tokens)
    }

    def apply(s:String):Expr = {
        parse(s) match {
            case Success(tree, _) => tree
            case e: NoSuccess =>
                throw new IllegalArgumentException("Bad syntax: "+s)
        }
    }

    def test(exprstr: String) = {
        parse(exprstr) match {
            case Success(tree, _) =>
                println("Tree: "+tree)
                val v = tree.eval()
                println("Eval: "+v)
            case e: NoSuccess => Console.err.println(e)
        }
    }
    
    //A main method for testing
    def main(args: Array[String]) = test(args(0))
}
In the ExprParser object, the lines up to and including the definition of the expr method define the parsing rules, whereas the methods from parse onwards are helper methods.

Calling Individual Parsers

In our example we can easily parse a string by calling ExprParser.test, which parses the string using our parse method, prints the resulting parse tree (or the error), and, if the parse was successful, evaluates the tree and prints the value.

The last line of parse parses a string using our expression parser:
phrase(expr)(tokens)
phrase is a method that StandardTokenParsers inherits from scala.util.parsing.combinator.Parsers. It wraps the specified parser so that the parse succeeds only if that parser consumes the entire input, and applying the result to tokens performs the parse. The only thing special about our expr method is that we happen to have selected it as our top-level parser; we could just as easily have picked one of our other parsers instead.
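To make the role of phrase concrete, here is a minimal self-contained sketch. It does not use the Scala library: PhraseSketch, its Parser type alias and its value parser are hypothetical stand-ins for illustration only. It shows that a sub-parser applied directly may succeed while leaving input unconsumed, that wrapping it in phrase turns the leftover input into a failure, and that any parser, not just the top-level one, can be passed to phrase.

```scala
object PhraseSketch {
  // Hypothetical stand-in for the library's Parser: consume a token list and
  // either fail (None) or return a result plus the unconsumed tokens.
  type Parser[T] = List[String] => Option[(T, List[String])]

  // Parse a single integer token.
  val value: Parser[Int] = {
    case tok :: rest if tok.nonEmpty && tok.forall(_.isDigit) => Some((tok.toInt, rest))
    case _                                                     => None
  }

  // Like the library's phrase: succeed only if p consumes all of the input.
  def phrase[T](p: Parser[T]): Parser[T] =
    in => p(in).filter { case (_, rest) => rest.isEmpty }

  // Any parser can be chosen as the top-level parser.
  def parse[T](p: Parser[T], tokens: List[String]): Option[T] =
    phrase(p)(tokens).map(_._1)
}
```

Applied directly to "42 + 1", value succeeds and leaves "+ 1" unconsumed; through parse (and therefore phrase) the same input fails, which mirrors the "end of input expected" failure in the REPL session later in this post.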

Let's add another version of the test method that lets us specify which parser to use as the top-level parser. We want to print out the results in the same way as for the existing test method, so we first refactor that existing method:
    def test(exprstr: String) =
        printParseResult(parse(exprstr))

    def printParseResult(pr:ParseResult[Expr]) = {
        pr match {
            case Success(tree, _) =>
                println("Tree: "+tree)
                val v = tree.eval()
                println("Eval: "+v)
            case e: NoSuccess => Console.err.println(e)
        }
    }
Now we add a new parse method that accepts a parser as an argument, and we call that from our new test method:
    def parse(p:Parser[Expr], s:String) = {
        val tokens = new lexical.Scanner(s)
        phrase(p)(tokens)
    }

    def test(p:Parser[Expr], exprstr: String) =
        printParseResult(parse(p,exprstr))
We can run the Scala REPL, load our modified file using the :load command, and then call the top-level parser manually through our test method. To reduce typing, we import everything from ExprParser. In the examples below, what we type follows the scala> prompt; the rest is printed by the REPL.
scala> import ExprParser._
import ExprParser._

scala> test("1+2")
Tree: EAdd(EConst(1),EConst(2))
Eval: 3

scala> test("1+2*3")
Tree: EAdd(EConst(1),EMul(EConst(2),EConst(3)))
Eval: 7

scala> test("(1+2)*3")
Tree: EMul(EAdd(EConst(1),EConst(2)),EConst(3))
Eval: 9
We can also call the test method that takes a parser as an argument, allowing us to specifically test one particular parsing rule at a time. If we pass in expr as the parser, we will get the same results as above; but if we pass in a different parser, we may get different results.
scala> test(expr,"1+2*3")
Tree: EAdd(EConst(1),EMul(EConst(2),EConst(3)))
Eval: 7

scala> test(binary(1),"1+2*3")
Tree: EAdd(EConst(1),EMul(EConst(2),EConst(3)))
Eval: 7

scala> test(binary(2),"1+2*3")
[1.2] failure: ``/'' expected but `+' found

1+2*3
 ^

scala> test(parens,"1+2")
[1.1] failure: ``('' expected but 1 found

1+2
^

scala> test(parens,"(1+2)")
Tree: EAdd(EConst(1),EConst(2))
Eval: 3

scala> test(parens,"(1+2)*3")
[1.6] failure: end of input expected

(1+2)*3
     ^

Tracing

If you have a larger parser that is not behaving and you are not quite sure where the problem lies, it can be tedious to call individual parsers directly until you find the one that is misbehaving. Being able to trace the progress of the whole parser on an input known to cause the problem can help, but sprinkling println statements throughout your parser can be tricky. This section presents an approach that lets you do some tracing with minimal changes to your code. The output can get pretty verbose, but it gives you a starting point from which you may be able to devise your own improved debugging tools.

The idea behind this approach is to wrap some or all of the individual parsers in a debugging parser that delegates its apply method to the wrapped parser and also prints out some debugging information. The apply method is called each time a parser is applied to the input during parsing.
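Here is a minimal, self-contained sketch of that delegation idea. It does not use Scala's parser combinator library: the tiny Input, Result and Parser types and the literal parser are hypothetical stand-ins, and this version of Wrap appends its trace lines to a buffer rather than printing them, so they can be inspected afterwards.

```scala
object TraceSketch {
  // Hypothetical miniature input: a token vector plus the current offset.
  final case class Input(tokens: Vector[String], offset: Int) {
    def first: String = if (offset < tokens.length) tokens(offset) else "EOF"
    def rest: Input = copy(offset = offset + 1)
  }

  sealed trait Result[+T]
  final case class Ok[+T](value: T, next: Input) extends Result[T]
  final case class Fail(msg: String, at: Input) extends Result[Nothing]

  abstract class Parser[+T] { def apply(in: Input): Result[T] }

  // Accept exactly one token equal to s.
  def literal(s: String): Parser[String] = new Parser[String] {
    def apply(in: Input): Result[String] =
      if (in.first == s) Ok(s, in.rest)
      else Fail(s"'$s' expected but ${in.first} found", in)
  }

  // Trace lines collected by Wrap.
  val log = scala.collection.mutable.ArrayBuffer[String]()

  // Delegates apply to the wrapped parser, then records the token, offset and result.
  class Wrap[+T](name: String, parser: Parser[T]) extends Parser[T] {
    def apply(in: Input): Result[T] = {
      val t = parser.apply(in)
      log += s"$name.apply for token ${in.first} at offset ${in.offset} returns $t"
      t
    }
  }
}
```

Wrapping literal("+") and literal("-") and applying both to the same input records two lines with the same offset, one success and one failure, which is the kind of pattern the real trace output shows.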

Note: this code relies on the fact that the code for the various combinators in the Parser class (an inner class of scala.util.parsing.combinator.Parsers, which StandardTokenParsers inherits) does not override any Parser method other than apply.

This code could be added directly to ExprParser, but it is presented here as a separate trait to make it easier to reuse. Add this DebugStandardTokenParsers trait to the file containing ExprParser.
trait DebugStandardTokenParsers extends StandardTokenParsers {
    class Wrap[+T](name:String,parser:Parser[T]) extends Parser[T] {
        def apply(in: Input): ParseResult[T] = {
            val first = in.first
            val pos = in.pos
            val offset = in.offset
            val t = parser.apply(in)
            println(name+".apply for token "+first+
                    " at position "+pos+" offset "+offset+" returns "+t)
            t
        }
    }
}
The Wrap class provides the hook into the apply method that we need in order to print our trace information as the parser runs. Once this trait is in place, we modify ExprParser to extend it rather than StandardTokenParsers:
object ExprParser extends DebugStandardTokenParsers { ... }
So far we have not changed the behavior of the parser, since we have not yet wired in the Wrap class. To do so, we can take any of the existing parsers and wrap it in a new Wrap. For example, we could wrap the top-level expr parser like this, where the added code is the new Wrap("expr", ...) around the original rule:
def expr = new Wrap("expr", ( binary(minPrec) | term ) )
We can make this a bit easier to edit and read by using implicits. In DebugStandardTokenParsers we add this method:
    implicit def toWrapped(name:String) = new {
        def !!![T](p:Parser[T]) = new Wrap(name,p)
    }
Now we can wrap our expr method like this:
def expr = "expr" !!! ( binary(minPrec) | term )
If you don't like using !!! as an operator, you are free to pick something more to your taste, or you can leave out the implicit and just use the new Wrap approach.
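The new { def !!! ... } expression above has a structural type, so each call to !!! goes through reflection, and Scala 2.10 and later warn about it unless scala.language.reflectiveCalls is imported. A possible alternative, assuming a compiler with implicit classes (Scala 2.10 or later), is an implicit class. In this sketch the Parser, literal and Wrap definitions are hypothetical miniatures, not the library's.

```scala
object SyntaxSketch {
  // Hypothetical miniature Parser: just enough to demonstrate the syntax.
  trait Parser[+T] { def apply(in: List[String]): Option[(T, List[String])] }

  def literal(s: String): Parser[String] = new Parser[String] {
    def apply(in: List[String]): Option[(String, List[String])] = in match {
      case `s` :: rest => Some((s, rest))
      case _           => None
    }
  }

  val trace = scala.collection.mutable.ListBuffer[String]()

  // Delegating wrapper that records each call and its result.
  class Wrap[+T](name: String, p: Parser[T]) extends Parser[T] {
    def apply(in: List[String]): Option[(T, List[String])] = {
      val r = p(in)
      trace += s"$name -> $r"
      r
    }
  }

  // Enables "name" !!! parser without a structural type or reflective calls.
  implicit class Named(name: String) {
    def !!![T](p: Parser[T]): Parser[T] = new Wrap(name, p)
  }
}
```

The call site is unchanged: "plus" !!! literal("+") still reads the same way as in the rules above.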

At this point you must modify your source code by adding the above syntax to each parsing rule that you want to trace. You can go through and do them all, or you can just pick out the ones you think are the most likely culprits and wrap those. Note that you can wrap any parser this way, including those that appear as pieces in the middle of other parsers. The following example shows how some of the parsers in the term and binaryOp methods can be wrapped:
    def term = "term" !!! ( value |  "term-parens" !!! parens | unaryMinus )

    def binaryOp(level:Int):Parser[((Expr,Expr)=>Expr)] = {
        level match {
            case 1 =>
                "add" !!! "+" ^^^ { (a:Expr, b:Expr) => EAdd(a,b) } |
                "sub" !!! "-" ^^^ { (a:Expr, b:Expr) => ESub(a,b) }
            case 2 =>
                "mul" !!! "*" ^^^ { (a:Expr, b:Expr) => EMul(a,b) } |
                "div" !!! "/" ^^^ { (a:Expr, b:Expr) => EDiv(a,b) }
            case _ => throw new RuntimeException("bad precedence level "+level)
        }
    }

Assuming we have wrapped the expr, term and binaryOp methods as in the above examples, here is what the output looks like for a few tests. As in the previous REPL example, what we type follows the scala> prompt. If you are using the REPL and reload the file, remember to run import ExprParser._ again to pick up the newer definitions.
scala> test("1")
term.apply for token 1 at position 1.1 offset 0 returns [1.2] parsed: EConst(1)
plus.apply for token EOF at position 1.2 offset 1 returns [1.2] failure: ``+'' expected but EOF found

1
 ^
minus.apply for token EOF at position 1.2 offset 1 returns [1.2] failure: ``-'' expected but EOF found

1
 ^
expr.apply for token 1 at position 1.1 offset 0 returns [1.2] parsed: EConst(1)
Tree: EConst(1)
Eval: 1

scala> test("(1+2)*3")
term.apply for token 1 at position 1.2 offset 1 returns [1.3] parsed: EConst(1)
plus.apply for token `+' at position 1.3 offset 2 returns [1.4] parsed: +
term.apply for token 2 at position 1.4 offset 3 returns [1.5] parsed: EConst(2)
plus.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``+'' expected but `)' found

(1+2)*3
    ^
minus.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``-'' expected but `)' found

(1+2)*3
    ^
expr.apply for token 1 at position 1.2 offset 1 returns [1.5] parsed: EAdd(EConst(1),EConst(2))
term-parens.apply for token `(' at position 1.1 offset 0 returns [1.6] parsed: EAdd(EConst(1),EConst(2))
term.apply for token `(' at position 1.1 offset 0 returns [1.6] parsed: EAdd(EConst(1),EConst(2))
term.apply for token 3 at position 1.7 offset 6 returns [1.8] parsed: EConst(3)
plus.apply for token EOF at position 1.8 offset 7 returns [1.8] failure: ``+'' expected but EOF found

(1+2)*3
       ^
minus.apply for token EOF at position 1.8 offset 7 returns [1.8] failure: ``-'' expected but EOF found

(1+2)*3
       ^
expr.apply for token `(' at position 1.1 offset 0 returns [1.8] parsed: EMul(EAdd(EConst(1),EConst(2)),EConst(3))
Tree: EMul(EAdd(EConst(1),EConst(2)),EConst(3))
Eval: 9

scala> test(parens,"(1+2)")
term.apply for token 1 at position 1.2 offset 1 returns [1.3] parsed: EConst(1)
mul.apply for token `+' at position 1.3 offset 2 returns [1.3] failure: ``*'' expected but `+' found

(1+2)
  ^
div.apply for token `+' at position 1.3 offset 2 returns [1.3] failure: ``/'' expected but `+' found

(1+2)
  ^
add.apply for token `+' at position 1.3 offset 2 returns [1.4] parsed: +
term.apply for token 2 at position 1.4 offset 3 returns [1.5] parsed: EConst(2)
mul.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``*'' expected but `)' found

(1+2)
    ^
div.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``/'' expected but `)' found

(1+2)
    ^
add.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``+'' expected but `)' found

(1+2)
    ^
sub.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``-'' expected but `)' found

(1+2)
    ^
expr.apply for token 1 at position 1.2 offset 1 returns [1.5] parsed: EAdd(EConst(1),EConst(2))
Tree: EAdd(EConst(1),EConst(2))
Eval: 3
As you can see, even for these very short input strings the output is pretty verbose. It does, however, show you what token it is trying to parse and where in the input stream that token is, so by paying attention to the position and offset numbers you can see where it is backtracking.

When you have found the problem and are done debugging, you can remove the DebugStandardTokenParsers trait and take out all of the !!! wrapping operations, or you can leave everything in place and disable the trace output by changing the definition of the !!! method inside the implicit toWrapped conversion to this:
def !!![T](p:Parser[T]) = p
Or, if you want to be able to enable debugging output later, change !!! to return either p or new Wrap(name,p) depending on some debugging configuration value.
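One way to implement that switch is sketched below. It assumes a hypothetical system property named parser.debug and uses miniature Parser and Wrap stand-ins rather than the library's classes. When the property is not set to true, !!! returns the parser unchanged, so the rules behave exactly as if they had never been wrapped.

```scala
object ToggleSketch {
  // Hypothetical miniature Parser, standing in for the library's.
  trait Parser[+T] { def apply(in: List[String]): Option[(T, List[String])] }

  class Wrap[+T](name: String, p: Parser[T]) extends Parser[T] {
    def apply(in: List[String]): Option[(T, List[String])] = {
      val r = p(in)
      println(s"$name.apply on $in returns $r")
      r
    }
  }

  // Hypothetical switch: run with -Dparser.debug=true to enable tracing.
  val debug: Boolean = sys.props.get("parser.debug").contains("true")

  implicit class Named(name: String) {
    // Wrap only when debugging; otherwise hand back the parser untouched.
    def !!![T](p: Parser[T]): Parser[T] = if (debug) new Wrap(name, p) else p
  }
}
```

A system property is only one possible source for the flag; a constructor parameter or an environment variable would serve equally well.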

Updated Example

Below is the complete program with all of the above changes.
import scala.util.parsing.combinator.syntactical.StandardTokenParsers

sealed abstract class Expr {
    def eval():Int
}

case class EConst(value:Int) extends Expr {
    def eval():Int = value
}

case class EAdd(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval + right.eval
}

case class ESub(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval - right.eval
}

case class EMul(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval * right.eval
}

case class EDiv(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval / right.eval
}

case class EUMinus(e:Expr) extends Expr {
    def eval():Int = -e.eval
}

trait DebugStandardTokenParsers extends StandardTokenParsers {
    class Wrap[+T](name:String,parser:Parser[T]) extends Parser[T] {
        def apply(in: Input): ParseResult[T] = {
            val first = in.first
            val pos = in.pos
            val offset = in.offset
            val t = parser.apply(in)
            println(name+".apply for token "+first+
                    " at position "+pos+" offset "+offset+" returns "+t)
            t
        }
    }

    implicit def toWrapped(name:String) = new {
        def !!![T](p:Parser[T]) = new Wrap(name,p) //for debugging
        //def !!![T](p:Parser[T]) = p              //for production
    }
}

object ExprParser extends DebugStandardTokenParsers {
    lexical.delimiters ++= List("+","-","*","/","(",")")

    def value = numericLit ^^ { s => EConst(s.toInt) }

    def parens:Parser[Expr] = "(" ~> expr <~ ")"

    def unaryMinus:Parser[EUMinus] = "-" ~> term ^^ { EUMinus(_) }

    def term = "term" !!! ( value |  "term-parens" !!! parens | unaryMinus )

    def binaryOp(level:Int):Parser[((Expr,Expr)=>Expr)] = {
        level match {
            case 1 =>
                "add" !!! "+" ^^^ { (a:Expr, b:Expr) => EAdd(a,b) } |
                "sub" !!! "-" ^^^ { (a:Expr, b:Expr) => ESub(a,b) }
            case 2 =>
                "mul" !!! "*" ^^^ { (a:Expr, b:Expr) => EMul(a,b) } |
                "div" !!! "/" ^^^ { (a:Expr, b:Expr) => EDiv(a,b) }
            case _ => throw new RuntimeException("bad precedence level "+level)
        }
    }
    val minPrec = 1
    val maxPrec = 2

    def binary(level:Int):Parser[Expr] =
        if (level>maxPrec) term
        else binary(level+1) * binaryOp(level)

    def expr = "expr" !!! ( binary(minPrec) | term )

    def parse(s:String) = {
        val tokens = new lexical.Scanner(s)
        phrase(expr)(tokens)
    }

    def parse(p:Parser[Expr], s:String) = {
        val tokens = new lexical.Scanner(s)
        phrase(p)(tokens)
    }

    def apply(s:String):Expr = {
        parse(s) match {
            case Success(tree, _) => tree
            case e: NoSuccess =>
                throw new IllegalArgumentException("Bad syntax: "+s)
        }
    }

    def test(exprstr: String) =
        printParseResult(parse(exprstr))

    def test(p:Parser[Expr], exprstr: String) =
        printParseResult(parse(p,exprstr))

    def printParseResult(pr:ParseResult[Expr]) = {
        pr match {
            case Success(tree, _) =>
                println("Tree: "+tree)
                val v = tree.eval()
                println("Eval: "+v)
            case e: NoSuccess => Console.err.println(e)
        }
    }
    
    //A main method for testing
    def main(args: Array[String]) = test(args(0))
}

Tuesday, July 19, 2011

Multithread Coroutine Scheduler

A scheduler that uses multiple worker threads for continuations-based Scala coroutines.

In my recent series of posts that ended with a complete Scala server that uses continuations-based coroutines to store per-client state, I asserted that the single-threaded scheduler implementation in that example could relatively easily be replaced by a scheduler that uses multiple threads. In this post I provide a simple working example of such a multithread scheduler.

Overview

We can use the standard thread-pool approach in which we have a pool of worker threads that independently pull from a common task queue. Java 1.5 introduced a set of classes and interfaces in the java.util.concurrent package to support various kinds of thread pools or potentially other task scheduling mechanisms. Rather than writing our own, we will use an Executor from that package.

We have an additional requirement that makes our situation a little bit more complex than the typical thread-pool: our collection of tasks includes both tasks that are ready to run and tasks that are currently blocked but will become ready to run at some point in the future.

We will implement a new scheduler class JavaExecutorCoScheduler that maintains a list of blocked tasks and uses a Java Executor to manage runnable tasks.

The updated complete source code for this post is available on GitHub in my nioserver project under the tag blog-executor.

Managing Tasks

As mentioned above, we need to deal with two kinds of tasks: tasks that are ready to run and tasks that are blocked. A standard Executor lets us submit a task for execution, but it does not handle blocked tasks. Since we don't want to submit blocked tasks to the Executor, we have to queue them up ourselves. We have two issues to attend to:
  1. When our scheduler is passed a task, we must put it into our own queue of blocked tasks if it is not currently ready to run.
  2. When a previously blocked task becomes ready to run, we must remove it from our queue of blocked tasks and pass it to the Executor.
The first issue is straightforward, as our framework already allows us to test the blocker for a task and see if the task is ready to run. In order to properly take care of the second issue, we will make a small change to our framework to allow us to notice when a blocker has probably stopped blocking so that we can run the corresponding task. We do this by modifying our CoScheduler class to add a method to notify it that a blocker has probably become unblocked:
    def unblocked(b:Blocker):Unit
We call this method from CoQueue in the two places where we previously called scheduler.coNotify: in the blockingEnqueue method after we have enqueued an item to notify the scheduler that the dequeue side is probably unblocked, and in the blockingDequeue method after we have dequeued an item to notify the scheduler that the enqueue side is probably unblocked. Those two methods in CoQueue now look like this:
    def blockingEnqueue(x:A):Unit @suspendable = {
        enqueueBlocker.waitUntilNotBlocked
        enqueue(x)
        scheduler.unblocked(dequeueBlocker)
    }

    def blockingDequeue():A @suspendable = {
        dequeueBlocker.waitUntilNotBlocked
        val x = dequeue
        scheduler.unblocked(enqueueBlocker)
        x
    }
The implementation of unblocked in our default scheduler DefaultCoScheduler is just a call to coNotify, so the behavior of that system will remain the same as it was before we added the calls to unblocked.

Because we need to ensure that all of our NIO read and write operations are handled sequentially, we continue to manage those tasks separately with our NioSelector class, where all of the reads are executed on one thread and all of the writes are executed on another thread.

Scheduler

We already have a scheduler framework that defines a CoScheduler class as the parent class for our scheduler implementations, which requires that we implement the methods setRoutineContinuation, runNextUnblockedRoutine and the newly added unblocked.

In our JavaExecutorCoScheduler, the setRoutineContinuation method is responsible for storing or executing the task. It checks whether the task is currently blocked and, if so, stores it in our collection of blocked tasks. Otherwise, it passes the task to the thread pool (managed by an ExecutorService), which takes care of managing the threads and running the task. We define a simple case class, RunnableCont, to turn our task into a Runnable that the pool can use.

Our unblocked method is passed a blocker that has probably become unblocked. We check that: if it is in fact still blocked, we do nothing. If it is unblocked, we remove the corresponding task from our collection of blocked tasks and pass it to the pool.

The runNextUnblockedRoutine method in this scheduler doesn't actually do anything, since the pool is taking care of running everything. We just return SomeRoutinesBlocked so that the caller goes into a wait state.

In addition to the above three methods, we will have our thread pool, a lock that we use when managing our blocked and runnable tasks, and a set of blocked tasks waiting to become unblocked. For this implementation we choose to use a thread pool of a fixed size, thus the call to Executors.newFixedThreadPool.
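The scheduling logic described above can be sketched in isolation. In this self-contained version, Blocker, SimpleFlagBlocker and ExecutorSketch are hypothetical simplifications of the framework's classes: tasks are plain () => Unit functions rather than continuations, and, as in JavaExecutorCoScheduler, blocked tasks are kept in a map keyed by blocker, so each blocker holds at most one waiting task.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import scala.collection.mutable

// Hypothetical simplification of the framework's Blocker.
trait Blocker { def isBlocked: Boolean }

class SimpleFlagBlocker extends Blocker {
  @volatile var blocked = true
  def isBlocked: Boolean = blocked
}

class ExecutorSketch(numWorkers: Int) {
  private val pool: ExecutorService = Executors.newFixedThreadPool(numWorkers)
  private val lock = new Object
  // At most one waiting task per blocker, kept in insertion order.
  private val blockedTasks = mutable.LinkedHashMap[Blocker, () => Unit]()

  private def submit(task: () => Unit): Unit =
    pool.execute(new Runnable { def run(): Unit = task() })

  // Park the task if its blocker is blocked; otherwise give it to the pool.
  def schedule(b: Blocker, task: () => Unit): Unit = lock.synchronized {
    if (b.isBlocked) blockedTasks.update(b, task) else submit(task)
  }

  // b has probably become unblocked: re-check, then release its task, if any.
  def unblocked(b: Blocker): Unit = lock.synchronized {
    if (!b.isBlocked) blockedTasks.remove(b).foreach(submit)
  }

  def shutdown(): Unit = pool.shutdown()
  def awaitTermination(): Boolean = pool.awaitTermination(5, TimeUnit.SECONDS)
}
```

Because unblocked re-checks isBlocked while holding the same lock that schedule uses, an early or redundant call is harmless: the task simply stays parked until a later call finds the blocker clear.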

Here is our complete JavaExecutorCoScheduler class:
package net.jimmc.scoroutine

import java.lang.Runnable
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService

import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.SynchronizedMap

class JavaExecutorCoScheduler(numWorkers:Int) extends CoScheduler {
    type Task = Option[Unit=>Unit]
    case class RunnableCont(task:Task) extends Runnable {
        def run() = task foreach { _() }
    }

    private val pool = Executors.newFixedThreadPool(numWorkers)
    private val lock = new java.lang.Object
    private val blockedTasks = new LinkedHashMap[Blocker,Task] with
            SynchronizedMap[Blocker,Task]

    private[scoroutine] def setRoutineContinuation(b:Blocker,task:Task) {
        lock.synchronized {
            if (b.isBlocked) {
                blockedTasks(b) = task
            } else {
                pool.execute(RunnableCont(task))
                coNotify
            }
        }
    }

    def unblocked(b:Blocker):Unit = {
        lock.synchronized {
            if (!b.isBlocked)
                blockedTasks.remove(b) foreach { task =>
                    pool.execute(RunnableCont(task)) }
        }
        coNotify
    }

    def runNextUnblockedRoutine():RunStatus = SomeRoutinesBlocked
}

Synchronization

Although not necessitated by the above changes, I added one more change to CoScheduler to improve its synchronization behavior.

While exploring various multi-threading mechanisms as alternatives to using Executor, I wrote a scheduler called MultiThreadCoScheduler in which I implemented my own thread pool and in which the master thread directly allocated tasks to the worker threads in the pool. Although that scheduler was quite a bit larger than the one presented above, it provided much more control over the threads, allowing me to change the number of worker threads on the fly and to tell in my master thread whether there were any running worker threads.

In MultiThreadCoScheduler, the main thread would call coWait to wait until it needed to wake up and hand out another task, and the worker threads would call coNotify when they were done processing a task and were ready to be assigned the next task. Similarly, a call to coNotify would be issued whenever a new task was placed into the task queue.

Unfortunately, Java's wait and notify methods, which are the calls underlying our coWait and coNotify methods, do not quite behave the way we would like. If we compare those calls to the Java NIO select and wakeup calls, we note that if a call is made to wakeup before a call to select, the select call will return immediately. The wait/notify calls do not behave this way; if a call is made to notify when there is no thread waiting in a wait call on that monitor, the notify call does nothing, and the following call to wait will wait until the next call to notify.

This small difference in semantics actually makes a pretty big difference in behavior, because it means when using wait and notify you must be concerned with which happens first. Let's see how that works.

In a typical scenario we have a resource with a boolean state that indicates when a thread can access that resource. For example, a queue might have a boolean state of "has some data" that indicates when a reader thread can pull an item from the queue (and perhaps another boolean state of "queue is full" that indicates when a writer thread can put an item into the queue). In the case of MultiThreadCoScheduler we have a task with a "ready" flag that tells us when we can assign that task to a worker, and a worker with an "idle" flag that tells us when we can assign a task to that worker.

When a task becomes ready to run, we want a thread other than the master (since the master may be waiting) to add the task to our queue of tasks and then notify the master that a task is available. Meanwhile, when the master is looking for an available task to assign to an idle worker, it checks whether a task is available and, if not, waits until one becomes available. The problem sequence occurs when the master checks for available tasks and finds none, and then, before the master executes its wait, the non-master thread puts a ready task into the queue and issues a notify. Because no thread is waiting yet, that notify is lost, and we are left with a ready task in the queue and a master waiting for a notify that has already been issued.

When all of the synchronization is done within a single class, you can ensure that the above problem sequencing of operations does not happen by arranging that the code that places a ready task into the queue and notifies the master happens within one synchronized block, and the code used by the master to query the queue for a ready task and then to wait happens within one synchronized block on the same monitor. But when dealing with subclasses, we run into the "inheritance anomaly" (or "inheritance-synchronization anomaly"). The essence of this problem is that the base class provides a method that is synchronized, but the subclass would like to include more functionality within that synchronized block. If, as is often the case, the subclass does not have access to the monitor being used by the base class to control its synchronization, there is no way for it to do this.
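For the single-class case, the arrangement just described might look like the following sketch, in which ReadyQueue is a hypothetical class. put enqueues and notifies, and take checks and waits, both while holding the same monitor (the queue object itself), so a notify cannot fall between the check and the wait.

```scala
import scala.collection.mutable

class ReadyQueue[A] {
  private val items = mutable.Queue[A]() // guarded by this object's monitor

  // Producer side: add the item and notify, all under one lock.
  def put(a: A): Unit = synchronized {
    items.enqueue(a)
    notifyAll()
  }

  // Consumer side: check and wait under the same lock, so no notify is missed.
  def take(): A = synchronized {
    while (items.isEmpty) wait() // the loop also covers spurious wakeups
    items.dequeue()
  }
}
```

This works only because both methods live in the same class and share its monitor; the inheritance anomaly arises exactly when a subclass cannot get at that monitor.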

In our case, we can implement something that is sufficient for our current needs by making a small change to our coWait and coNotify methods in CoScheduler so that they behave in the same manner as select and wakeup: if a call to coNotify is made before a call to coWait, the call to coWait will return immediately. We do this by changing the implementation of coWait and coNotify in CoScheduler from this:
    def coWait():Unit = {
        defaultLock.synchronized {
            defaultLock.wait()
        }
    }

    def coNotify():Unit = {
        defaultLock.synchronized {
            defaultLock.notify
        }
    }
to this:
    private var notified = false
    def coWait():Unit = {
        defaultLock.synchronized {
            if (!notified)
                defaultLock.wait()
            notified = false
        }
    }

    def coNotify():Unit = {
        defaultLock.synchronized {
            notified = true
            defaultLock.notify
        }
    }
With the above change to our base class, our subclass no longer needs to be concerned about the problem sequence described above, because the call to coWait will return immediately if there was a call to coNotify since the most recent previous call to coWait.
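The same idea can be tried outside the framework. This sketch uses a hypothetical class, StickyNotifier. Unlike the version above, it waits in a while loop, as the java.lang.Object.wait documentation recommends, so that a spurious wakeup cannot make coWait return without a matching coNotify.

```scala
class StickyNotifier {
  private val defaultLock = new Object
  private var notified = false // guarded by defaultLock

  // Returns at once if coNotify was called since the last coWait.
  def coWait(): Unit = defaultLock.synchronized {
    while (!notified) defaultLock.wait()
    notified = false // consume the pending notification
  }

  // Records the notification even if nobody is waiting yet.
  def coNotify(): Unit = defaultLock.synchronized {
    notified = true
    defaultLock.notify()
  }
}
```

As with the original, several coNotify calls made before a coWait collapse into a single pending notification, which is sufficient when the waiter rechecks the scheduler's state after every wakeup.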