## Thursday, December 8, 2011

### Levels of Expertise

An attempt to improve the objectivity of skill self-ratings.

### Discussion

We are often asked to rate things on a scale, typically 1 to 5 or 1 to 10. Rarely is there an attempt to define what those different numbers mean. From a statistician's point of view, this makes the values useful for the sole purpose of comparing a single individual's ratings against other ratings of that individual. In particular, without a good definition of what the various levels mean, I don't see how there can be any effective communication from one person to another of the meaning of such a rating.

When my doctor asks me to tell him how much something hurts on a scale of 1 to 10, I have no idea what information he expects to get when I say "3" or "7".

I once asked an acquaintance to rate, on a scale of 1 (bad) to 10 (good), a movie he had just seen. He said it was a 9. I was suspicious of this answer, so I asked him how he would rate Star Wars, which I knew to be his all-time favorite movie, on the same 1-to-10 scale. He said 12.

I personally consider it an aspect of innumeracy, but people often try to emphasize something by using numbers that are outside of the valid range. We may chuckle when Nigel says he likes his amp better because it goes to 11, but how often have you heard someone talking in all seriousness about putting in a "110% effort"? What does that actually mean? How would you know if someone were putting in 110% versus 100%? If 110% is a valid number, then presumably so is 120%, so anyone suggesting a mere 110% is clearly not asking for enough effort.

People tend to overestimate how good they are at all sorts of things, including cognitive, social and physical skills. If we all overrate ourselves by the same amount, I suppose that could all cancel out and you could still compare people's ratings - but without knowing a priori what their ratings should be, we don't know how much they might be overrating themselves.

When people consider their own expertise, it is common for those with less expertise to overvalue themselves more than people with more expertise. With more expertise comes more awareness of what one could do better. Einstein said, "As our circle of knowledge expands, so does the circumference of darkness surrounding it." Relative beginners easily fall into the Sophomore Illusion of thinking they know a lot because the circumference of their knowledge is not yet large enough for them to recognize the size of the surrounding darkness.

In 1989, psychologist John Hayes at Carnegie Mellon University identified what is now called the "ten-year rule" (although there were earlier proponents, including Herbert Simon, who was also at CMU). As Leonard Mlodinow says in "The Drunkard's Walk", "Experts often speak of the 'ten-year rule,' meaning that it takes at least a decade of hard work, patience and striving to become highly successful in most endeavors." The ten-year rule is related to the idea that it takes about 10,000 hours of practice at something to become an expert; with 5 hours of practice per business day and 200 business days per year, it would take ten years to rack up that many hours. If you find yourself thinking how wonderfully expert you are in something that you have practiced for only a few years, perhaps you should consider the ten-year rule and temper your self-evaluation.

Given that people are so bad at these ratings, it seems to me that the only way to get any useful information from someone when asking this kind of self-rating question is to have an objective definition of what each level means.

One way to think about a scale is by how many people fall into each level. There are currently about 7 billion people in the world, which is almost 10 to the 10th power. This conveniently maps to a logarithmic scale from 0 to 10, allowing us to define eleven levels, starting with level 0 nominally containing all 10 billion people in the world (rounding the population up), and with each higher level containing one tenth as many people as the level just below it. If the description of a level is hard to interpret, perhaps the size of that level will help indicate whether a person should be rated there.

Years ago, during a job interview, I was asked to rate my level of expertise in various subjects, such as programming languages and development tools. This was not an unusual question, I had been asked this question before and have been asked it since. What was different that time was that the interviewer included a scale with some relatively objective descriptions for determining level of expertise. I rather liked the scale, so although I don't recall the exact definition of his levels, I have tried to reproduce that concept here, using descriptions somewhat similar to those given by that interviewer. Unfortunately, I don't remember who introduced that scale to me, so I am unable to give credit.

There are many reasons one might want a scale of expertise, including rating potential employees or creating a summary of the amount of expertise within a company. The scale I present here is intended to be very general; given its logarithmic nature that can include the entire world population, it is capable of allowing comparison of expertise across everyone in the world. You might think that would make it suboptimal for rating (potential) employee expertise, but I think there are enough levels to make it useful for that purpose.

### Scale

The scale below includes the following columns:
• Level: a number for the level, from 0 to 10, with 10 being the highest level of expertise.
• Name: a name for the level. These are taken from a set of expertise level names proposed by the Traveling School of Life. My use of them probably doesn't quite match their intent, but I liked the names and thought the ten words matched my levels pretty well, so I applied them to my levels and added "ignorant" for level 0.
• Description: a brief description of the level. The descriptions are worded as if for a technical tool; for application to other areas or concepts, modify accordingly. Comments referring to companies assume a large company (10,000+ people) with large divisions (1000+ people); being a company-wide guru in a company with 100 people might not get you past level 6.
• Size: the approximate number of people expected to be at that level worldwide. As mentioned above, this is a simple logarithmic scale: the number of people at level L is 10^(10−L).
• Practice: the approximate amount of practice that could be required to reach that level of expertise. Putting in that many hours does not guarantee reaching that level, and reaching that level does not necessarily require putting in that many hours. The conversion factors are 1,000 hours per year or 5 hours per day.
All of these different factors are rough estimates, not intended as absolutes but merely as guidelines to help people rank themselves in a way that allows for more meaningful results. I don't have any research to show how well my guesses about Description, Size and Practice correlate; if anyone knows of something along those lines, that would be interesting.
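As a concrete check of the Size column, the cohort size follows directly from the level number; here is a small illustrative snippet (the `levelSize` name is mine, not part of the scale):

```scala
// Approximate worldwide population at expertise level L,
// using the logarithmic rule size(L) = 10^(10 - L).
def levelSize(level: Int): Long =
  math.pow(10, 10 - level).toLong

// Level 0 holds everyone (~10 billion); each level up is 10x smaller.
val sizes = (0 to 10).map(levelSize)
```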

| Level | Name | Description | Size | Practice |
|-------|------|-------------|------|----------|
| 0 | ignorant | I have never heard of it. | 10,000,000,000 | none |
| 1 | interested | I have heard a little about it, but don't know much. | 1,000,000,000 | 1 hour |
| 2 | pursuing | I have read an article or two about it and understand the basics of what it is, but nothing in depth. | 100,000,000 | 1 day (5 hours) |
| 3 | beginner | I have read an in-depth article, primer, or how-to book, and/or have played with it a bit. | 10,000,000 | 1 week (25 hours) |
| 4 | apprentice | I have used it for at least a few months and have successfully completed a small project using it. | 1,000,000 | 3 months (250 hours) |
| 5 | intermediate | I have used it for a year or more on a daily or regular basis, and am comfortable using it in moderately complex projects. | 100,000 | 1 year (1,000 hours) |
| 6 | advanced | I have been using it for many years, know all of the basic aspects, and am comfortable using it as a key element in complex projects. People in my group come to me with their questions. | 10,000 | 5 years (5,000 hours) |
| 7 | accomplished | I am a local expert, with ten or more years of solid experience. People in my division come to me with their questions. | 1,000 | 10 years (10,000 hours) |
| 8 | master | I am a company-wide guru with twenty or more years of experience; people from other divisions come to me with their questions. | 100 | 20 years (20,000 hours) |
| 9 | grandmaster | I am a recognized international authority on it. | 10 | 30 years (30,000 hours) |
| 10 | great-grandmaster | I created it, and am the number 1 expert in the world. | 1 | 50 years (50,000 hours) |


## Thursday, July 28, 2011

### Debugging Scala Parser Combinators

Two simple mechanisms for debugging parsers written using Scala's parser combinators.

### Introduction

In a recent comment on my 2008 blog post about Scala's parser combinators, a reader asked how one might go about debugging such a parser. As one post says, "Debugging a parser implemented with the help of a combinator library has its special challenges." You may have trouble setting breakpoints, and stack traces can be difficult to interpret.

The two techniques I show here may not provide you with the kind of visibility you might be used to when single-stepping through problem code, but I hope they provide at least a little more visibility than you might otherwise have.

### Example Parser

As an example parser I will use an integer-only version of the four-function arithmetic parser I built for my 2008 parser combinator post. The code consists of a set of case classes to represent the parsed results and a parser class that contains the parsing rules and a few helper methods. You can copy this code into a file and either compile it or load it into the Scala REPL.

```
import scala.util.parsing.combinator.syntactical.StandardTokenParsers

sealed abstract class Expr {
    def eval():Int
}

case class EConst(value:Int) extends Expr {
    def eval():Int = value
}

case class EAdd(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval + right.eval
}

case class ESub(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval - right.eval
}

case class EMul(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval * right.eval
}

case class EDiv(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval / right.eval
}

case class EUMinus(e:Expr) extends Expr {
    def eval():Int = -e.eval
}

object ExprParser extends StandardTokenParsers {
    lexical.delimiters ++= List("+","-","*","/","(",")")

    def value = numericLit ^^ { s => EConst(s.toInt) }

    def parens:Parser[Expr] = "(" ~> expr <~ ")"

    def unaryMinus:Parser[EUMinus] = "-" ~> term ^^ { EUMinus(_) }

    def term = ( value | parens | unaryMinus )

    def binaryOp(level:Int):Parser[((Expr,Expr)=>Expr)] = {
        level match {
            case 1 =>
                "+" ^^^ { (a:Expr, b:Expr) => EAdd(a,b) } |
                "-" ^^^ { (a:Expr, b:Expr) => ESub(a,b) }
            case 2 =>
                "*" ^^^ { (a:Expr, b:Expr) => EMul(a,b) } |
                "/" ^^^ { (a:Expr, b:Expr) => EDiv(a,b) }
            case _ => throw new RuntimeException("bad precedence level "+level)
        }
    }
    val minPrec = 1
    val maxPrec = 2

    def binary(level:Int):Parser[Expr] =
        if (level>maxPrec) term
        else binary(level+1) * binaryOp(level)

    def expr = ( binary(minPrec) | term )

    def parse(s:String) = {
        val tokens = new lexical.Scanner(s)
        phrase(expr)(tokens)
    }

    def apply(s:String):Expr = {
        parse(s) match {
            case Success(tree, _) => tree
            case e: NoSuccess => sys.error(e.toString)  //report the parse failure
        }
    }

    def test(exprstr: String) = {
        parse(exprstr) match {
            case Success(tree, _) =>
                println("Tree: "+tree)
                val v = tree.eval()
                println("Eval: "+v)
            case e: NoSuccess => Console.err.println(e)
        }
    }

    //A main method for testing
    def main(args: Array[String]) = test(args(0))
}
```
In the `ExprParser` class, the lines up to and including the definition of the `expr` method define the parsing rules, whereas the methods from `parse` onwards are helper methods.

### Calling Individual Parsers

In our example parser we can easily ask it to parse a string by calling our `ExprParser.test` method, which parses the string using our `parse` method, prints the resulting parse, and (if the parse was successful) evaluates the parse tree and prints that value.

The last line of `parse` parses a string using our expression parser:
```
phrase(expr)(tokens)
```
`phrase` is a method in `StandardTokenParsers` that parses an input stream using the specified parser. The only thing special about our `expr` method is that we happen to have selected it as our top-level parser - but we could just as easily have picked one of our other parsers as our top-level parser.

Let's add another version of the `test` method that lets us specify which parser to use as the top-level parser. We want to print out the results in the same way as for the existing `test` method, so we first refactor that existing method:
```
def test(exprstr: String) =
    printParseResult(parse(exprstr))

def printParseResult(pr:ParseResult[Expr]) = {
    pr match {
        case Success(tree, _) =>
            println("Tree: "+tree)
            val v = tree.eval()
            println("Eval: "+v)
        case e: NoSuccess => Console.err.println(e)
    }
}
```
Now we add a new `parse` method that accepts a parser as an argument, and we call that from our new `test` method:
```
def parse(p:Parser[Expr], s:String) = {
    val tokens = new lexical.Scanner(s)
    phrase(p)(tokens)
}

def test(p:Parser[Expr], exprstr: String) =
    printParseResult(parse(p,exprstr))
```
We can run the Scala REPL, load our modified file using the `:load` command, then manually call the top-level parser by calling our `test` method. To reduce typing, we import everything from `ExprParser`. In the examples below, the text after each `scala>` prompt is what we type; the rest is printed by the REPL.
```
scala> import ExprParser._
import ExprParser._

scala> test("1+2")
Eval: 3

scala> test("1+2*3")
Eval: 7

scala> test("(1+2)*3")
Eval: 9
```
We can also call the `test` method that takes a parser as an argument, allowing us to specifically test one particular parsing rule at a time. If we pass in `expr` as the parser, we will get the same results as above; but if we pass in a different parser, we may get different results.
```
scala> test(expr,"1+2*3")
Eval: 7

scala> test(binary(1),"1+2*3")
Eval: 7

scala> test(binary(2),"1+2*3")
[1.2] failure: ``/'' expected but `+' found

1+2*3
^

scala> test(parens,"1+2")
[1.1] failure: ``('' expected but 1 found

1+2
^

scala> test(parens,"(1+2)")
Eval: 3

scala> test(parens,"(1+2)*3")
[1.6] failure: end of input expected

(1+2)*3
^
```

### Tracing

If you have a larger parser that is misbehaving and you are not quite sure where the problem lies, it can be tedious to call individual parsers directly until you find the one at fault. Being able to trace the progress of the whole parser running on an input known to cause the problem can be helpful, but sprinkling `println` statements throughout your parser is tricky. This section provides an approach that lets you do some tracing with minimal changes to your code. The output can get pretty verbose, but at least it gives you a starting point from which you may be able to devise your own improved debugging.

The idea behind this approach is to wrap some or all of the individual parsers in a debugging parser that delegates its `apply` action to the wrapped parser, but also prints out some debugging information. The `apply` method is called during the act of parsing.

Note: this code relies on the fact that the code for the various combinators in the `Parser` class in Scala's `StandardTokenParsers` (which is implemented as an inner class in `scala.util.parsing.combinator.Parsers`) does not override any `Parser` method other than `apply`.

This code could be added directly to the `ExprParser` object, but it is presented here as a separate trait to make it easier to reuse. Add this `DebugStandardTokenParsers` trait to the file containing `ExprParser`.
```
trait DebugStandardTokenParsers extends StandardTokenParsers {
    class Wrap[+T](name:String, parser:Parser[T]) extends Parser[T] {
        def apply(in: Input): ParseResult[T] = {
            val first = in.first
            val pos = in.pos
            val offset = in.offset
            val t = parser.apply(in)
            println(name+".apply for token "+first+
                    " at position "+pos+" offset "+offset+" returns "+t)
            t
        }
    }
}
```
The `Wrap` class provides the hook into the `apply` method that we need in order to print out our trace information as the parser runs. Once this class is in place, we modify `ExprParser` to inherit from it rather than from `StandardTokenParsers`:
```
object ExprParser extends DebugStandardTokenParsers { ... }
```
So far we have not changed the behavior of the parser, since we have not yet wired in the `Wrap` class. To do so, we can take any of the existing parsers and wrap it in a `new Wrap`. For example, we could wrap the top-level `expr` parser like this, where the `new Wrap("expr", ...)` call is the added code:
```
def expr = new Wrap("expr", ( binary(minPrec) | term ) )
```
We can make this a bit easier to edit and read by using implicits. In `DebugStandardTokenParsers` we add this method:
```
implicit def toWrapped(name:String) = new {
    def !!![T](p:Parser[T]) = new Wrap(name,p)
}
```
Now we can wrap our `expr` method like this:
```
def expr = "expr" !!! ( binary(minPrec) | term )
```
If you don't like using `!!!` as an operator, you are free to pick something more to your taste, or you can leave out the implicit and just use the `new Wrap` approach.

At this point you must modify your source code by adding the above syntax to each parsing rule that you want to trace. You can go through and do them all, or you can just pick out the ones you think are the most likely culprits and wrap those. Note that you can wrap any parser this way, including those that appear as pieces in the middle of other parsers. The following example shows how some of the parsers in the `term` and `binaryOp` methods can be wrapped:
```
    def term = "term" !!! ( value | "term-parens" !!! parens | unaryMinus )

    def binaryOp(level:Int):Parser[((Expr,Expr)=>Expr)] = {
        level match {
            case 1 =>
                "add" !!! "+" ^^^ { (a:Expr, b:Expr) => EAdd(a,b) } |
                "sub" !!! "-" ^^^ { (a:Expr, b:Expr) => ESub(a,b) }
            case 2 =>
                "mul" !!! "*" ^^^ { (a:Expr, b:Expr) => EMul(a,b) } |
                "div" !!! "/" ^^^ { (a:Expr, b:Expr) => EDiv(a,b) }
            case _ => throw new RuntimeException("bad precedence level "+level)
        }
    }
```
Assuming we have wrapped the `expr`, `term` and `binaryOp` methods as in the above examples, here is what the output looks like for a few tests. As in the previous REPL example, what we type follows each `scala>` prompt. If you are using the REPL and reload the file, remember to run `import ExprParser._` again to pick up the newer definitions.
```
scala> test("1")
term.apply for token 1 at position 1.1 offset 0 returns [1.2] parsed: EConst(1)
add.apply for token EOF at position 1.2 offset 1 returns [1.2] failure: ``+'' expected but EOF found

1
^
sub.apply for token EOF at position 1.2 offset 1 returns [1.2] failure: ``-'' expected but EOF found

1
^
expr.apply for token 1 at position 1.1 offset 0 returns [1.2] parsed: EConst(1)
Tree: EConst(1)
Eval: 1

scala> test("(1+2)*3")
term.apply for token 1 at position 1.2 offset 1 returns [1.3] parsed: EConst(1)
add.apply for token `+' at position 1.3 offset 2 returns [1.4] parsed: +
term.apply for token 2 at position 1.4 offset 3 returns [1.5] parsed: EConst(2)
add.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``+'' expected but `)' found

(1+2)*3
^
sub.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``-'' expected but `)' found

(1+2)*3
^
expr.apply for token 1 at position 1.2 offset 1 returns [1.5] parsed: EAdd(EConst(1),EConst(2))
term-parens.apply for token `(' at position 1.1 offset 0 returns [1.6] parsed: EAdd(EConst(1),EConst(2))
term.apply for token `(' at position 1.1 offset 0 returns [1.6] parsed: EAdd(EConst(1),EConst(2))
term.apply for token 3 at position 1.7 offset 6 returns [1.8] parsed: EConst(3)
add.apply for token EOF at position 1.8 offset 7 returns [1.8] failure: ``+'' expected but EOF found

(1+2)*3
^
sub.apply for token EOF at position 1.8 offset 7 returns [1.8] failure: ``-'' expected but EOF found

(1+2)*3
^
expr.apply for token `(' at position 1.1 offset 0 returns [1.8] parsed: EMul(EAdd(EConst(1),EConst(2)),EConst(3))
Eval: 9

scala> test(parens,"(1+2)")
term.apply for token 1 at position 1.2 offset 1 returns [1.3] parsed: EConst(1)
mul.apply for token `+' at position 1.3 offset 2 returns [1.3] failure: ``*'' expected but `+' found

(1+2)
^
div.apply for token `+' at position 1.3 offset 2 returns [1.3] failure: ``/'' expected but `+' found

(1+2)
^
add.apply for token `+' at position 1.3 offset 2 returns [1.4] parsed: +
term.apply for token 2 at position 1.4 offset 3 returns [1.5] parsed: EConst(2)
mul.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``*'' expected but `)' found

(1+2)
^
div.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``/'' expected but `)' found

(1+2)
^
add.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``+'' expected but `)' found

(1+2)
^
sub.apply for token `)' at position 1.5 offset 4 returns [1.5] failure: ``-'' expected but `)' found

(1+2)
^
expr.apply for token 1 at position 1.2 offset 1 returns [1.5] parsed: EAdd(EConst(1),EConst(2))
Eval: 3
```
As you can see, even for these very short input strings the output is pretty verbose. It does, however, show you what token it is trying to parse and where in the input stream that token is, so by paying attention to the position and offset numbers you can see where it is backtracking.

When you have found the problem and are done debugging, you can remove the `DebugStandardTokenParsers` class and take out all of the `!!!` wrapping operations, or you can leave everything in place and disable the wrapper output by changing the definition of the implicit `!!!` operator to this:
```
def !!![T](p:Parser[T]) = p
```
Or, if you want to make it possible to enable debugging output later, change `!!!` to return either `p` or `new Wrap(name,p)` depending on some debugging configuration value.
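As a sketch of that last idea, using stand-in types (the real `Parser[T]` and `Wrap` live in the parser traits, and the `debugParsers` flag name is my own invention):

```scala
object WrapToggle {
  // Stand-in for Parser[T]: just a function from input to result.
  type Parser[T] = String => T

  var debugParsers = false          // set true to enable tracing
  var traced = List.empty[String]   // records which wrappers fired

  // Stand-in for Wrap: records the name, then delegates to the parser.
  def wrap[T](name: String, p: Parser[T]): Parser[T] =
    in => { traced ::= name; p(in) }

  implicit class Wrapped(name: String) {
    def !!![T](p: Parser[T]): Parser[T] =
      if (debugParsers) wrap(name, p) else p
  }
}
```

With `debugParsers` false, `"num" !!! numParser` is just `numParser`; flipping the flag re-enables the wrappers without touching the grammar.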

### Updated Example

Below is the complete program with all of the above changes.
```
import scala.util.parsing.combinator.syntactical.StandardTokenParsers

sealed abstract class Expr {
    def eval():Int
}

case class EConst(value:Int) extends Expr {
    def eval():Int = value
}

case class EAdd(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval + right.eval
}

case class ESub(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval - right.eval
}

case class EMul(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval * right.eval
}

case class EDiv(left:Expr, right:Expr) extends Expr {
    def eval():Int = left.eval / right.eval
}

case class EUMinus(e:Expr) extends Expr {
    def eval():Int = -e.eval
}

trait DebugStandardTokenParsers extends StandardTokenParsers {
    class Wrap[+T](name:String, parser:Parser[T]) extends Parser[T] {
        def apply(in: Input): ParseResult[T] = {
            val first = in.first
            val pos = in.pos
            val offset = in.offset
            val t = parser.apply(in)
            println(name+".apply for token "+first+
                    " at position "+pos+" offset "+offset+" returns "+t)
            t
        }
    }

    implicit def toWrapped(name:String) = new {
        def !!![T](p:Parser[T]) = new Wrap(name,p) //for debugging
        //def !!![T](p:Parser[T]) = p              //for production
    }
}

object ExprParser extends DebugStandardTokenParsers {
    lexical.delimiters ++= List("+","-","*","/","(",")")

    def value = numericLit ^^ { s => EConst(s.toInt) }

    def parens:Parser[Expr] = "(" ~> expr <~ ")"

    def unaryMinus:Parser[EUMinus] = "-" ~> term ^^ { EUMinus(_) }

    def term = "term" !!! ( value | "term-parens" !!! parens | unaryMinus )

    def binaryOp(level:Int):Parser[((Expr,Expr)=>Expr)] = {
        level match {
            case 1 =>
                "add" !!! "+" ^^^ { (a:Expr, b:Expr) => EAdd(a,b) } |
                "sub" !!! "-" ^^^ { (a:Expr, b:Expr) => ESub(a,b) }
            case 2 =>
                "mul" !!! "*" ^^^ { (a:Expr, b:Expr) => EMul(a,b) } |
                "div" !!! "/" ^^^ { (a:Expr, b:Expr) => EDiv(a,b) }
            case _ => throw new RuntimeException("bad precedence level "+level)
        }
    }
    val minPrec = 1
    val maxPrec = 2

    def binary(level:Int):Parser[Expr] =
        if (level>maxPrec) term
        else binary(level+1) * binaryOp(level)

    def expr = "expr" !!! ( binary(minPrec) | term )

    def parse(s:String) = {
        val tokens = new lexical.Scanner(s)
        phrase(expr)(tokens)
    }

    def parse(p:Parser[Expr], s:String) = {
        val tokens = new lexical.Scanner(s)
        phrase(p)(tokens)
    }

    def apply(s:String):Expr = {
        parse(s) match {
            case Success(tree, _) => tree
            case e: NoSuccess => sys.error(e.toString)  //report the parse failure
        }
    }

    def test(exprstr: String) =
        printParseResult(parse(exprstr))

    def test(p:Parser[Expr], exprstr: String) =
        printParseResult(parse(p,exprstr))

    def printParseResult(pr:ParseResult[Expr]) = {
        pr match {
            case Success(tree, _) =>
                println("Tree: "+tree)
                val v = tree.eval()
                println("Eval: "+v)
            case e: NoSuccess => Console.err.println(e)
        }
    }

    //A main method for testing
    def main(args: Array[String]) = test(args(0))
}
```

## Tuesday, July 19, 2011

A scheduler that uses multiple worker threads for continuations-based Scala coroutines.

In my recent series of posts that ended with a complete Scala server that uses continuations-based coroutines to store per-client state, I asserted that the single-threaded scheduler implementation in that example could relatively easily be replaced by a scheduler that uses multiple threads. In this post I provide a simple working example of such a multithread scheduler.

### Overview

We can use the standard thread-pool approach in which we have a pool of worker threads that independently pull from a common task queue. Java 1.5 introduced a set of classes and interfaces in the `java.util.concurrent` package to support various kinds of thread pools or potentially other task scheduling mechanisms. Rather than writing our own, we will use an `Executor` from that package.
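As a standalone sketch (my own example, not from the nioserver code), the basic thread-pool pattern we rely on looks like this:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// A fixed pool of four worker threads sharing one task queue.
val pool = Executors.newFixedThreadPool(4)
val done = new AtomicInteger(0)

// Submit ten tasks; the Executor decides which worker runs each one.
for (i <- 1 to 10)
  pool.execute(new Runnable { def run() = done.incrementAndGet() })

pool.shutdown()                              // stop accepting new tasks
pool.awaitTermination(10, TimeUnit.SECONDS)  // wait for queued tasks to finish
```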

We have an additional requirement that makes our situation a little bit more complex than the typical thread-pool: our collection of tasks includes both tasks that are ready to run and tasks that are currently blocked but will become ready to run at some point in the future.

We will implement a new scheduler class `JavaExecutorCoScheduler` that maintains a list of blocked tasks and uses a Java `Executor` to manage runnable tasks.

The updated complete source code for this post is available on github in my nioserver project under the tag blog-executor.

As mentioned above, we need to deal with two kinds of tasks: tasks that are ready to run and tasks that are blocked. The standard `Executor` class allows us to submit a task for execution, but does not handle blocked tasks. Since we don't want to submit blocked tasks to the `Executor`, we have to queue them up ourselves. We have two issues to attend to:
1. When our scheduler is passed a task, we must put it into our own queue of blocked tasks if it is not currently ready to run.
2. When a previously blocked task becomes ready to run, we must remove it from our queue of blocked tasks and pass it to the `Executor`.
The first issue is straightforward, as our framework already allows us to test the blocker for a task and see if the task is ready to run. In order to properly take care of the second issue, we will make a small change to our framework to allow us to notice when a blocker has probably stopped blocking so that we can run the corresponding task. We do this by modifying our `CoScheduler` class to add a method to notify it that a blocker has probably become unblocked:
```
    def unblocked(b:Blocker):Unit
```
We call this method from `CoQueue` in the two places where we previously called `scheduler.coNotify`: in the `blockingEnqueue` method after we have enqueued an item to notify the scheduler that the dequeue side is probably unblocked, and in the `blockingDequeue` method after we have dequeued an item to notify the scheduler that the enqueue side is probably unblocked. Those two methods in `CoQueue` now look like this:
```
    def blockingEnqueue(x:A):Unit @suspendable = {
        enqueueBlocker.waitUntilNotBlocked
        enqueue(x)
        scheduler.unblocked(dequeueBlocker)
    }

    def blockingDequeue():A @suspendable = {
        dequeueBlocker.waitUntilNotBlocked
        val x = dequeue
        scheduler.unblocked(enqueueBlocker)
        x
    }
```
The implementation of `unblocked` in our default scheduler `DefaultCoScheduler` is just a call to `coNotify`, so the behavior of that system will remain the same as it was before we added the calls to `unblocked`.

Because we need to ensure that all of our NIO read and write operations are handled sequentially, we continue to manage those tasks separately with our `NioSelector` class, where all of the reads are executed on one thread and all of the writes are executed on another thread.

### Scheduler

We already have a scheduler framework that defines a `CoScheduler` class as the parent class for our scheduler implementations, which requires that we implement the methods `setRoutineContinuation`, `runNextUnblockedRoutine` and the newly added `unblocked`.

In our `JavaExecutorCoScheduler`, the `setRoutineContinuation` method is responsible for storing or executing the task. It checks whether the task is currently blocked, storing it in our list of blocked tasks if so. Otherwise, it passes the task to the thread pool (managed by an `ExecutorService`), which takes care of managing the threads and running the task. We define a simple case class, `RunnableCont`, to turn our task into a `Runnable` usable by the pool.

Our `unblocked` method gets passed a blocker which is probably now unblocked. We test that, and if in fact it is still blocked we do nothing. If it is unblocked, then we remove it from our list of blocked tasks and pass it to the pool.

The `runNextUnblockedRoutine` method in this scheduler doesn't actually do anything, since the pool is taking care of running everything. We just return `SomeRoutinesBlocked` so that the caller goes into a wait state.

In addition to the above three methods, we will have our thread pool, a lock that we use when managing our blocked and runnable tasks, and a set of blocked tasks waiting to become unblocked. For this implementation we choose to use a thread pool of a fixed size, thus the call to `Executors.newFixedThreadPool`.

Here is our complete `JavaExecutorCoScheduler` class:
```
package net.jimmc.scoroutine

import java.lang.Runnable
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService

import scala.collection.mutable.HashMap
import scala.collection.mutable.SynchronizedMap

class JavaExecutorCoScheduler(numWorkers:Int) extends CoScheduler {
    type Task = Option[Unit=>Unit]

    //Turn a task continuation into a Runnable for the thread pool
    case class RunnableCont(task:Task) extends Runnable {
        def run() = task foreach { _() }
    }

    private val pool:ExecutorService = Executors.newFixedThreadPool(numWorkers)
    private val lock = new java.lang.Object
    private val blockedTasks = new HashMap[Blocker,Task] with
            SynchronizedMap[Blocker,Task]

    private[scoroutine] def setRoutineContinuation(b:Blocker, task:Task) {
        lock.synchronized {
            if (b.isBlocked) {
                blockedTasks(b) = task
            } else {
                pool.execute(RunnableCont(task))
                coNotify
            }
        }
    }

    def unblocked(b:Blocker):Unit = {
        lock.synchronized {
            if (!b.isBlocked)
                blockedTasks.remove(b) foreach { t => pool.execute(RunnableCont(t)) }
        }
        coNotify
    }

    def runNextUnblockedRoutine():RunStatus = SomeRoutinesBlocked
}
```

### Synchronization

Although not necessitated by the above changes, I added one more change to `CoScheduler` to improve its synchronization behavior.

While exploring various multi-threading mechanisms as alternatives to using `Executor`, I wrote a scheduler called `MultiThreadCoScheduler` in which I implemented my own thread pool and in which the master thread directly allocated tasks to the worker threads in the pool. Although that scheduler was quite a bit larger than the one presented above, it provided much more control over the threads, allowing me to change the number of worker threads on the fly and to be able to tell in my master thread whether there were any running worker threads.

In `MultiThreadCoScheduler`, the main thread would call `coWait` to wait until it needed to wake up and hand out another task, and the worker threads would call `coNotify` when they were done processing a task and were ready to be assigned the next task. Similarly, a call to `coNotify` would be issued whenever a new task was placed into the task queue.

Unfortunately, Java's `wait` and `notify` methods, which are the calls underlying our `coWait` and `coNotify` methods, do not quite behave the way we would like. If we compare those calls to the Java NIO `select` and `wakeup` calls, we note that if a call is made to `wakeup` before a call to `select`, the `select` call will return immediately. The `wait`/`notify` calls do not behave this way; if a call is made to `notify` when there is no thread waiting in a `wait` call on that monitor, the `notify` call does nothing, and the following call to `wait` will wait until the next call to `notify`.

This small difference in semantics actually makes a pretty big difference in behavior, because it means when using `wait` and `notify` you must be concerned with which happens first. Let's see how that works.

In a typical scenario we have a resource with a boolean state that indicates when a thread can access that resource, for example, a queue with a boolean state of "has some data" that indicates when a reader thread can pull an item from the queue (and perhaps another boolean state of "queue is full" that indicates when a writer thread can put an item into the queue). In the case of `MultiThreadCoScheduler` we have a task with a "ready" flag that tells us when we can assign that task to a worker, and a worker with an "idle" flag that tells us when we can assign a task to that worker. When a task becomes ready to run, we want a thread (other than the master, since it may be waiting) to add the task to our queue of tasks and then notify the master that a task is available. Meanwhile, when the master is looking for an available task to assign to an idle worker, it will query to see if a task is available, and if not it will then wait until one becomes available. The problem sequence would be if the master checks for available tasks, finds none, then before the master executes its wait, the non-master puts a ready task into the queue and issues a notify to the master. The result of this sequence would be a ready task in the queue, but a master waiting for a notify.

When all of the synchronization is done within a single class, you can ensure that the above problem sequencing of operations does not happen by arranging that the code that places a ready task into the queue and notifies the master happens within one `synchronized` block, and the code used by the master to query the queue for a ready task and then to wait happens within one `synchronized` block on the same monitor. But when dealing with subclasses, we run into the "inheritance anomaly" (or "inheritance-synchronization anomaly"). The essence of this problem is that the base class provides a method that is synchronized, but the subclass would like to include more functionality within that synchronized block. If, as is often the case, the subclass does not have access to the monitor being used by the base class to control its synchronization, there is no way for it to do this.
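The anomaly can be sketched in a few lines of plain Scala (class names here are illustrative, not from the server code): the base class's monitor is private, so the subclass has no way to make its extra step part of the same critical section.

```scala
class Base {
    private val lock = new Object          //monitor is private to Base
    protected var count = 0
    def put(x:Int):Unit = lock.synchronized {
        count += 1                         //base-class bookkeeping, atomic on its own
    }
}

class Sub extends Base {
    var log = List[Int]()
    //We would like bookkeeping-plus-logging to be one atomic unit, but we
    //cannot synchronize on Base's private lock, so another thread may
    //observe the state between these two steps.
    override def put(x:Int):Unit = {
        super.put(x)       //runs inside Base's critical section
        log ::= x          //runs outside it
    }
}
```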

In our case, we can implement something that is sufficient for our current needs by making a small change to our `coWait` and `coNotify` methods in `CoScheduler` so that they behave in the same manner as `select` and `wakeup`: if a call to `coNotify` is made before a call to `coWait`, the call to `coWait` will return immediately. We do this by changing the implementation of `coWait` and `coNotify` in `CoScheduler` from this:
```    def coWait():Unit = {
        defaultLock.synchronized {
            defaultLock.wait()
        }
    }

    def coNotify():Unit = {
        defaultLock.synchronized {
            defaultLock.notify()
        }
    }
```
to this:
```    private var notified = false
    def coWait():Unit = {
        defaultLock.synchronized {
            if (!notified)
                defaultLock.wait()
            notified = false
        }
    }

    def coNotify():Unit = {
        defaultLock.synchronized {
            notified = true
            defaultLock.notify()
        }
    }
```
With the above change to our base class, our subclass no longer needs to be concerned about the problem sequence described above, because the call to `coWait` will return immediately if there was a call to `coNotify` since the most recent previous call to `coWait`.
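As a sanity check, here is the same flag logic in a standalone class (illustrative name), showing that a notify issued before any wait now lets the waiter proceed immediately instead of hanging:

```scala
//Same "sticky notify" logic as above, packaged so it can run on its own.
class StickyNotifyLock {
    private val defaultLock = new Object
    private var notified = false
    def coWait():Unit = defaultLock.synchronized {
        if (!notified)
            defaultLock.wait()
        notified = false       //consume the pending notification
    }
    def coNotify():Unit = defaultLock.synchronized {
        notified = true        //remember the notification even with no waiter
        defaultLock.notify()
    }
}

object StickyNotifyDemo {
    def main(args:Array[String]):Unit = {
        val lock = new StickyNotifyLock
        lock.coNotify()    //notify first...
        lock.coWait()      //...and the wait still returns immediately
        println("coWait returned immediately after an early coNotify")
    }
}
```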

## Saturday, June 25, 2011

### Sledgehammer Words

Words are tools that we use to clarify our concepts, express our emotions and persuade others to our positions. We use those tools to craft mental models which we deliver to our listener. The better the job we do with those tools, the more effectively we can communicate our message.

The words we use every day are our basic tools. Like screwdrivers and pliers, these words are simple but versatile, performing adequately for most tasks. Occasionally we might want to use a more esoteric word for a specific task, as we might pull out a pair of bent needle nose pliers when that tool is just right for the job.

The better your selection of tools, the better job you can do at making a beautiful and effective work. In a pinch you can use a slot-head screwdriver to set a Phillips screw, but you stand a higher chance of damaging the screw head and it is more difficult to set it just right. Similarly but more subtly, you may be able to use a Phillips screwdriver to set a Frearson screw, but you will be able to do a better job if you have a Frearson driver. Most of us will probably not need this level of distinction and can get by with just a Phillips, or indeed perhaps with just a slot-head driver, but if you want to be able to craft the best results over the widest range of projects, having that Frearson screwdriver in your toolbox will provide one more area in which you can do things better.

Swear words are the sledgehammers of our verbal toolbox. Like a sledgehammer, a swear word can pack a lot of punch, and like a sledgehammer it lacks precision. Sometimes a sledgehammer is the right tool for the job: when you need to smash a hole in something, one good whack with a sledgehammer can be far more effective than trying to use pliers and screwdrivers to do the same thing.

But for most of us, most of the time, that's not the job we are trying to do. Most of the time we are more interested in making a neat hole, and we should pull out the electric drill, or the hole saw, or even the Sawzall to do the job; or we just need to tap in a small nail, where a standard hammer would work nicely. If we smash it with a sledgehammer, it's likely that we will then need to spend a lot of time cleaning things up afterwards, which would probably be more work than using one of the other tools in the first place.

Some people seem to have a very small toolbox and are constantly swinging around that sledgehammer. They use it for almost everything; rather than pulling out a screwdriver to set a screw, they whack it with their sledgehammer. To me, everything these people say seems like a pile of smashed rubble. I doubt that's really the message they want to deliver.

Even a single use of a sledgehammer word can derail any kind of nuance or subtlety, and casual use will likely overwhelm everything else in the message.

So go ahead and use a sledgehammer when it is appropriate, but do so deliberately and fully conscious of your intended result. Make an effort to add a good assortment of tools to your toolbox, understand what you are trying to accomplish, learn to use the best tool for the job and use it well.

## Friday, April 15, 2011

### Java Nio Complete Scala Server

The capstone to this series of posts: a complete multi-client stateful application server in Scala using Java NIO non-blocking IO for both reading and writing, and delimited continuations as coroutines for both IO and application processing.

### Background

In the initial post of this series on Java NIO in Scala I mentioned a set of Limitations of the first example server. In the next three posts after that initial post I addressed some of those limitations. In this post I address the remaining limitation in that original list: the application code (an echo loop in the example) is buried in the `NioConnection` class, which makes that application code more difficult to maintain and makes the server code not directly reusable as a library.

With the changes described in the next section, all of the application-specific behavior will be encapsulated in an instance of an application-specific subclass of a new class, `NioApplication`. Since the remainder of the classes presented so far will now be independent of the application and reusable without any modifications for multiple applications, they will be moved into a separate package, `net.jimmc.nio`.

Other than adding `package net.jimmc.nio`, there were no changes to `LineDecoder` or `NioSelector`, nor were there any changes to the coroutine package `net.jimmc.scoroutine`, in this latest set of changes. For the files that were changed, listed below, the listings show the complete new version of each file, with changes from the previous version highlighted in bold.

The complete source for this series of posts is available on github in my nioserver project, with the specific version after the changes specified in this post tagged as blog-complete.

### NioApplication

Extracting the application-specific code out of `NioConnection` is pretty simple: in `NioConnection.startApp`, rather than starting up a built-in echo loop, we add a hook that allows us to call back to an application-specific method that implements whatever behavior the application wants for dealing with a connection. To do this, we define a new abstract class `NioApplication` that includes a `runConnection` method that we can call from `NioConnection.startApp`.
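Stripped of the NIO and continuation machinery, the hook pattern looks like this (a toy sketch; only the `runConnection` name comes from the real code):

```scala
import scala.collection.mutable.ListBuffer

//The framework owns the connection lifecycle and calls back into an
//application-supplied subclass for the application-specific behavior.
abstract class MiniApp {
    def runConnection(conn:MiniConn):Unit    //application hook
}

class MiniConn(app:MiniApp) {
    val log = ListBuffer[String]()
    def startApp():Unit = {
        app.runConnection(this)    //hook: framework calls application
        log += "closed"            //framework cleans up when the app returns
    }
}

object MiniEchoApp extends MiniApp {
    def runConnection(conn:MiniConn):Unit = conn.log += "echo loop ran"
}
```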

We will also use the `NioApplication` class as a convenience class where we can bundle up some of the arguments that get passed around a lot, in particular the coroutine scheduler and the read and write selectors. This gives us the opportunity to override the coroutine scheduler with one more appropriate for the application, although we will not do so in this example.

```package net.jimmc.nio

import net.jimmc.scoroutine.DefaultCoScheduler

import scala.util.continuations._

abstract class NioApplication {
    val readSelector = new NioSelector()
    val writeSelector = new NioSelector()
    val sched = new DefaultCoScheduler

    def runConnection(conn:NioConnection):Unit @suspendable
}
```

### NioServer

We simplify the `NioServer` class by removing `object NioServer`, which will instead be in the application main object. We replace three parameters in the constructor with the single `app` parameter and likewise replace three arguments in the call to `NioListener` with the single `app` argument.
```package net.jimmc.nio

import net.jimmc.scoroutine.DefaultCoScheduler
...

class NioServer(app:NioApplication, hostAddr:InetAddress, port:Int) {
    val listener = new NioListener(app, hostAddr, port)

    def start() {
        listener.start(true)
        //run the NIO read and write selectors each on its own thread
        ...
        app.sched.run    //run the coroutine scheduler on our thread
    }
}
```

### NioListener

Three parameters in the constructor have been replaced by the single `app` parameter.
```package net.jimmc.nio

import net.jimmc.scoroutine.CoScheduler

import java.net.{InetAddress,InetSocketAddress}
import java.nio.channels.{ServerSocketChannel,SocketChannel}
import java.nio.channels.SelectionKey
import scala.util.continuations._

class NioListener(app:NioApplication, hostAddr:InetAddress, port:Int) {
    val isa = new InetSocketAddress(hostAddr, port)
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
    serverChannel.socket.bind(isa)

    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()
                NioConnection.newConnection(app, socket)
            }
        }
    }

    private def accept():SocketChannel @suspendable = {
        shift { k =>
            app.readSelector.register(serverChannel, SelectionKey.OP_ACCEPT, {
                val conn = serverChannel.accept()
                conn.configureBlocking(false)
                k(conn)
            })
        }
    }
}
```

### NioConnection

We modify the constructor and the companion to replace three parameters with the single `app` parameter, and we replace our echo loop in `startApp` with a call to the application `runConnection` method, followed by a call to our `close` method to make sure we close the socket when the application is done with it.
```package net.jimmc.nio

import net.jimmc.scoroutine.{CoQueue,CoScheduler}

import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import scala.util.continuations._

object NioConnection {
    def newConnection(app:NioApplication, socket:SocketChannel) {
        val conn = new NioConnection(app, socket)
        conn.start()
    }
}

class NioConnection(app:NioApplication, socket:SocketChannel) {

    private val buffer = ByteBuffer.allocateDirect(2000)
    private val lineDecoder = new LineDecoder
    private val inQ = new CoQueue[String](app.sched, 10)
    private val outQ = new CoQueue[String](app.sched, 10)

    def start():Unit = {
        startReader
        startWriter
        startApp
    }

    private def startApp() {
        reset {
            app.runConnection(this)
            close()
        }
    }

    private def startReader() {
        reset {
            while (socket.isOpen)
                readWait
        }
    }

    private def readWait:Unit @suspendable = {
        buffer.clear()
        val count = read(buffer)
        if (count<1) {
            socket.close()
            shiftUnit[Unit,Unit,Unit]()
        } else {
            buffer.flip()
            lineDecoder.processBytes(buffer, inQ.blockingEnqueue(_))
        }
    }

    private def read(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            app.readSelector.register(socket, SelectionKey.OP_READ, {
                val n = socket.read(b)
                k(n)
            })
        }
    }

    private def startWriter() {
        reset {
            while (socket.isOpen)
                writeWait
        }
    }

    private def write(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            app.writeSelector.register(socket, SelectionKey.OP_WRITE, {
                val n = socket.write(b)
                k(n)
            })
        }
    }

    private def writeBuffer(b:ByteBuffer):Unit @suspendable = {
        write(b)
        if (b.remaining>0 && socket.isOpen)
            writeBuffer(b)
        else
            shiftUnit[Unit,Unit,Unit]()
    }

    private def writeWait():Unit @suspendable = {
        val str = outQ.blockingDequeue
        if (str eq closeMarker) {
            socket.close
            shiftUnit[Unit,Unit,Unit]()
        } else
            writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
    }

    def readLine():String @suspendable = inQ.blockingDequeue

    def writeLine(s:String):Unit @suspendable = write(s+"\n")
    def write(s:String):Unit @suspendable = outQ.blockingEnqueue(s)

    def isOpen = socket.isOpen
    private val closeMarker = new String("")
    def close():Unit @suspendable = write(closeMarker)
}
```

### EchoServer

We move the application-specific main object out of `NioServer` and place it into our sample application class, which we call `EchoServer`, along with a subclassed `NioApplication` that provides our application behavior.

Highlighted differences are as compared to the previous version of `NioServer`.
```import net.jimmc.nio.{NioApplication,NioConnection,NioServer}
import net.jimmc.scoroutine.DefaultCoScheduler

import scala.util.continuations._

object EchoServer {
    def main(args:Array[String]) {
        val app = new EchoApplication
        val port = 1234
        val server = new NioServer(app, null, port)
        server.start()
    }
}

class EchoApplication extends NioApplication {
    def runConnection(conn:NioConnection):Unit @suspendable = {
        while (conn.isOpen) {
            conn.writeLine(conn.readLine())
        }
    }
}
```
The above class is the complete application definition for our echo server when built on top of our generic nio package. After compiling, run with this command:
```$ scala EchoServer
```
With all of the above changes, we have once again internally transformed our application; apart from starting it up under a different name, its external behavior remains the same. However, we have reached the point where defining a new server-based application is easy.

### ThreeQuestionsServer

The example in this section shows a slightly more complex application that maintains some local per-client state as it progresses through a short series of steps interacting with the client. In this simple application, the server asks up to three questions of the client and collects responses, with each next question sometimes depending on the previous answers. The per-client state is contained both in local variables and in the location of execution within the application. Each time the processing for a client is suspended the state for that client is captured in a continuation to be restored when the next piece of input is available. The continuation includes all of the above per-client state information, so we don't have to write any application-specific code to save and restore that data.

By defining the `ReaderWriter` interface trait, the application is written so as to be able to run either in server mode using an instance of `ConnReader`, in which case it accepts connections from clients, or in standalone mode using an instance of `SysReader`, in which case it only interacts with the console.

When our application running in server mode finishes handling a client and exits from the `run` method, control returns to `NioConnection`, which closes the connection.
```import net.jimmc.nio.{NioApplication,NioServer,NioConnection}

import java.io.{BufferedReader,InputStreamReader,PrintWriter}
import scala.util.continuations._

object ThreeQuestionsConsole {
    def main(args:Array[String]) {
        val in = new BufferedReader(new InputStreamReader(System.in))
        val out = new PrintWriter(System.out)
        val io = new SysReader(in, out)
        reset {
            (new ThreeQuestions(io)).run
        }
    }
}

object ThreeQuestionsServer {
    def main(args:Array[String]) {
        val app = new ThreeQuestionsApp
        val port = 1234
        val server = new NioServer(app, null, port)
        server.start()
    }
}

class ThreeQuestionsApp extends NioApplication {
    def runConnection(conn:NioConnection):Unit @suspendable = {
        val io = new ConnReader(conn)
        (new ThreeQuestions(io)).run
    }
}

trait ReaderWriter {
    def readLine():String @suspendable
    def writeLine(s:String):Unit @suspendable
}

class SysReader(in:BufferedReader, out:PrintWriter) extends ReaderWriter {
    def readLine() = in.readLine()
    def writeLine(s:String) = { out.println(s); out.flush() }
}

class ConnReader(conn:NioConnection) extends ReaderWriter {
    def readLine():String @suspendable = conn.readLine()
    def writeLine(s:String):Unit @suspendable = conn.writeLine(s)
}

class ThreeQuestions(io:ReaderWriter) {
    def ask(question:String):String @suspendable = {
        io.writeLine(question)
        io.readLine()
    }

    def run():Unit @suspendable = {
        val RxArthur = ".*arthur.*".r
        val RxLauncelot = ".*(launcelot|lancelot).*".r
        val RxRobin = ".*robin.*".r
        val RxHolyGrail = ".*seek the holy grail.*".r
        val RxSwallow = ".*african or european.*".r
        val RxAssyriaCapital =
            ".*(assur|shubat.enlil|kalhu|calah|nineveh|dur.sharrukin).*".r
        val name = ask("What is your name?").toLowerCase
        val quest = ask("What is your quest?").toLowerCase
        val holy = quest match {
            case RxHolyGrail() => true
            case _ => false
        }
        if (holy) {
            val q3Type = name match {
                case RxRobin() => 'capital
                case RxArthur() => 'swallow
                case _ => 'color
            }
            val a3 = (q3Type match {
                case 'capital => ask("What is the capital of Assyria?")
                case 'swallow => ask("What is the air-speed velocity of an unladen swallow?")
                case 'color => ask("What is your favorite color?")
            }).toLowerCase
            (q3Type,a3,name) match {
                //Need to use an underscore in regex patterns with alternates
                case ('capital,RxAssyriaCapital(_),_) => accept
                case ('capital,_,_) => reject
                case ('swallow,RxSwallow(),_) => rejectMe
                case ('swallow,_,_) => reject
                case ('color,"blue",RxLauncelot(_)) => accept
                case ('color,_,RxLauncelot(_)) => reject
                case ('color,_,_) => accept
            }
        } else {
            reject
        }
    }

    def accept:Unit @suspendable = io.writeLine("You may pass")
    def reject:Unit @suspendable = io.writeLine("you: Auuuuuuuugh!")
    def rejectMe:Unit @suspendable = io.writeLine("me: Auuuuuuuugh!")
}
```
To run in console or server mode, use one of the following two commands:
```$ scala ThreeQuestionsConsole
$ scala ThreeQuestionsServer
```

### Limitations

I am calling this version complete because it addresses all of the issues in the Limitations section of my original post, but it is far from production-ready. Before putting this code into production I would address the following issues.
• Although the application now uses more than one thread, it still runs all of the application code on a single thread. The scheduler should be replaced by one that can choose how many threads to use and distribute the execution of the coroutines among those threads.
• This version still has not addressed all of the issues raised in the Limitations section of the second post in this series, on character decoding. In particular:
  • Error handling should be improved. For an example of this problem, type a Control-C into your telnet window when connected to the EchoServer application.
  • It only supports UTF-8 encoding.
• The application should parse its command-line arguments so that it has the flexibility to, for example, use a different port number without requiring a code change.
• The application should read a configuration file.
• Error handling in general needs to be improved.

## Friday, April 8, 2011

### Java NIO for Writing

Using Java NIO non-blocking IO for writing as well as reading is almost - but not quite - straightforward.

### Background

One of the limitations pointed out in the Limitations section of the original post in this series was that we were still directly writing our output data to the socket rather than using non-blocking IO and continuations as we were doing when reading our input data. If a client stops reading its input (or if there is sufficient network congestion that it looks that way from our end) then our socket output buffer may fill up. If that happens, then one of two things will happen when we try to write our data to that socket: either the call will block, or the data will not all be written. If the call blocks, then we have a blocked thread that we can not use for processing other clients until it is unblocked. If there are many clients who are not reading their input, we could have many blocked threads. Since one of the goals of this exercise is to be able to run many clients on a relatively small number of threads, having blocked threads is bad. To avoid this problem, we use non-blocking output and continuations for writing to the output, just as we did for reading the input.

The complete source for this series of posts is available on github in my nioserver project, with the specific version after the changes specified in this post tagged as blog-write.

### Implementation

We model the output code on the input code by making these changes:
• We write a suspending `write` method that registers our interest in writing to the output socket connection.
• We add an output queue to receive data from the application.
• We modify the `writeLine` method to add a line to the output queue rather than writing directly to the output socket.
• We run a separate control loop that reads from the output queue and writes to the output socket.

```//In class NioConnection
    private val outQ = new CoQueue[String](sched, 10)

    def start():Unit = {
        startReader
        startWriter
        startApp
    }

    private def startWriter() {
        reset {
            while (socket.isOpen)
                writeWait
        }
    }

    private def write(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            selector.register(socket, SelectionKey.OP_WRITE, {
                val n = socket.write(b)
                k(n)
            })
        }
    }

    private def writeBuffer(b:ByteBuffer):Unit @suspendable = {
        write(b)
        if (b.remaining>0 && socket.isOpen)
            writeBuffer(b)
        else
            shiftUnit[Unit,Unit,Unit]()
    }

    private def writeWait:Unit @suspendable = {
        val str = outQ.blockingDequeue
        writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
    }

    def writeLine(s:String):Unit @suspendable = write(s+"\n")
    def write(s:String):Unit @suspendable = outQ.blockingEnqueue(s)
```
This seems pretty straightforward, but unfortunately it doesn't work. The problem is that we have attempted to register our channel twice (once for read and once for write) with the same selector. The documentation for `SelectableChannel` says, "A channel may be registered at most once with any particular selector." If we call `register` for our channel for write when it is already registered for read, the read registration is overwritten by the write registration and is lost.
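The one-registration rule is easy to observe with a `Pipe` channel (a standalone demonstration, separate from the server code): the second `register` call returns the same key, with its interest set and attachment simply replaced.

```scala
import java.nio.channels.{Pipe, SelectionKey, Selector}

//Registering the same channel twice with one selector does not create a
//second registration; it updates (and thus overwrites) the existing one.
object RegisterTwiceDemo {
    def main(args:Array[String]):Unit = {
        val selector = Selector.open()
        val sink = Pipe.open().sink()          //a SelectableChannel supporting OP_WRITE
        sink.configureBlocking(false)
        val k1 = sink.register(selector, SelectionKey.OP_WRITE, "read handler")
        val k2 = sink.register(selector, SelectionKey.OP_WRITE, "write handler")
        println(k1 eq k2)              //true: same registration key
        println(selector.keys.size)    //1: at most one registration per selector
        println(k1.attachment)         //"write handler": the first attachment is gone
        selector.close()
    }
}
```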

In his Rox Java NIO Tutorial James Greenfield explicitly recommends that you "Use a single selecting thread" and "Modify the selector from the selecting thread only." We could take this approach, adding some code to combine the read and write interest flags when we are in that position, but unlike in James' case we would also need to add some code to demultiplex the separate callbacks for read and write. Instead, we use a different approach: we use separate selectors for reading and writing, and we give each of them its own thread.

### Two Selectors

Depending on the implementation, using two selectors and two threads this way could cause problems. However, based on my understanding of the documentation, the code in the Sun implementation and the operation of the POSIX select operation, I believe this approach should work (at least on POSIX systems). This would need to be tested on all supported platforms for a production system.

To use separate read and write selectors, we replace the current `selector` parameter in `NioConnection` with two parameters `readSelector` and `writeSelector` of the same type.
```//In object NioConnection:
    def newConnection(sched:CoScheduler, readSelector:NioSelector,
            writeSelector:NioSelector, socket:SocketChannel) {
        val conn = new NioConnection(sched, readSelector,
                writeSelector, socket)
        conn.start()
    }

//In class NioConnection:
class NioConnection(sched:CoScheduler, readSelector:NioSelector,
        writeSelector:NioSelector, socket:SocketChannel) {
    ...
    private def read(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            readSelector.register(socket, SelectionKey.OP_READ, {
                val n = socket.read(b)
                k(n)
            })
        }
    }

    private def write(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            writeSelector.register(socket, SelectionKey.OP_WRITE, {
                val n = socket.write(b)
                k(n)
            })
        }
    }

    ...
}
```
We also change `NioListener` to pass through those two arguments, and we choose to use the `readSelector` to handle our `accept` calls.
```//In NioListener
class NioListener(sched:CoScheduler, readSelector:NioSelector,
        writeSelector:NioSelector, hostAddr:InetAddress, port:Int) {
    ...
    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()
                NioConnection.newConnection(sched, readSelector,
                        writeSelector, socket)
            }
        }
    }

    private def accept():SocketChannel @suspendable = {
        shift { k =>
            readSelector.register(serverChannel, SelectionKey.OP_ACCEPT, {
                val conn = serverChannel.accept()
                conn.configureBlocking(false)
                k(conn)
            })
        }
    }
}
```
Finally, we instantiate the new write selector in `NioServer`, pass it in to `NioListener`, and start it running in a new thread.
```//In NioServer
    val readSelector = new NioSelector()
    val writeSelector = new NioSelector()
    val sched = new DefaultCoScheduler
    val listener = new NioListener(sched, readSelector,
            writeSelector, hostAddr, port)

    def start() {
        listener.start(true)
        //run the NIO read and write selectors each on its own thread
        ...
        sched.run    //run the coroutine scheduler on our thread
    }
}

```

### Close

Our current example has no terminating condition, so never attempts to close the connection. Looking ahead, we expect to have applications that will want to do that, so we add a `close` method to `NioConnection`, and an `isOpen` method that allows us to see when it is closed.

We can't just add a close method that directly closes the socket, because there may still be output data waiting to be written. Thus we need an implementation that somehow waits until all of the queued output data has been written to the output before closing the socket.

One easy way to do this is to have a special marker string that we put into the output queue when the application requests to close the socket. When our socket output code sees that marker, we know it has already written out all of the data that came before that marker in the output queue, so we can close the socket. By doing the socket close in the same method that does the writes to the socket, and by ensuring that that method is called on the (write) selection thread, we also ensure that the close happens on the selection thread.

The compiler shares constant strings, so to make sure we have a unique string for our marker that can't be passed in by any code outside of our `close` method, we use `new String()`. In `writeWait`, where we check for that marker, we use the identity comparison `eq` when checking for the marker, and we add a call to `shiftUnit` to make both sides of the `if` statement be CPS.
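The identity trick can be verified in isolation:

```scala
//new String("") is equal to the shared "" literal by value, but it is a
//distinct object, so `eq` (reference identity) can single it out.
object CloseMarkerDemo {
    def main(args:Array[String]):Unit = {
        val closeMarker = new String("")
        println(closeMarker == "")     //true: equal by value
        println(closeMarker eq "")     //false: different identity
        println("" eq "")              //true: constant strings are shared
    }
}
```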

A call to our `close` method will return right away, but the socket will not get closed until after all of the data in the output queue has been written to the output socket. The application can tell when the socket has actually been closed by calling the `isOpen` method.
```//In NioConnection
    private def writeWait():Unit @suspendable = {
        val str = outQ.blockingDequeue
        if (str eq closeMarker) {
            socket.close
            shiftUnit[Unit,Unit,Unit]()
        } else
            writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
    }

    def isOpen = socket.isOpen
    private val closeMarker = new String("")
    def close():Unit @suspendable = write(closeMarker)
```

### Summary

As in the previous two posts, we have modified the program to make an internal improvement that has not changed its basic external behavior. We have, however, changed its behavior for one of the corner cases - in this case what happens when an output socket fills up, such as might happen when there is excessive network latency - which is a necessary improvement for a production application, particularly if one expects the kind of high volume that would make those corner cases more likely.

## Saturday, April 2, 2011

### Java NIO and Scala Coroutines

I present a multi-client server in Scala that uses coroutines to allow modularization of stateful client processing in a way that is independent of threads.

### Background

In my previous two posts I presented a server in Scala that uses Java NIO non-blocking IO and continuations to allow scaling to a large number of clients. As I pointed out in the Limitations section of that first post, that example used one thread for all execution. On a multi-core machine, as is common today, we would prefer to have multiple threads running to allow us to take advantage of all of the processing power available to us, yet we don't want to allocate a thread to every client.

It would be nice if we could add our own `SelectableChannel` types to the set of NIO channel types that we can use with the `select` call so that we could have one place where we do all our scheduling, but that feature is not available. We thus have to come up with another mechanism for handling all of the other potentially blocking tasks we will want to do. Fortunately, we already have such a mechanism: coroutines.

### Coroutines

Coroutines provide a separation of the maintenance of task state from the execution of code for that task, allowing us to bind execution of the task to different threads as we desire. When one of our task coroutines becomes blocked waiting for an unavailable resource, we suspend it by storing its continuation, allowing us to use that thread for another purpose, such as to restore and run a different previously stored continuation that is now runnable.

In my earlier post on coroutines I presented an implementation of a coroutine package that included a scheduler (`CoScheduler`) and a blocking queue (`CoQueue`). We will modify the server implementation of my previous two "Java NIO" posts to make use of those classes.

As pointed out in that earlier coroutines post, the default scheduler implementation in the example can easily be replaced by another implementation with no other changes to the code. In particular, that new implementation could use a thread pool or a group of actors to execute the coroutines that are ready to run, assuming the coroutine code itself is multi-thread safe. We will not write that multi-thread scheduler for this post, but will assume that it can be written later.

### Architecture

At a high level, we want to modify our server so that we have a queue between our socket reader and the application that will eventually consume the data. We can then set up a small processing loop that reads the socket data, converts it to a string and writes it to that queue. The application will read the contents of the queue, process it, and write back its results to the connection. We will let the socket reader continue to run on the select thread, but we will run the application on a separate thread (or threads), ensuring that the select loop can quickly get to all connections and preventing the application processing of any one connection from delaying the IO of other connections.

With this architecture we have two processing loops:
1. Read data from socket, write to queue.
2. Read data from queue, process it, write data (to socket, for now).
Given that for now we are writing directly to the connection socket on output (and ignoring the possibility that the output socket might be blocked), the second loop only has one potential blocking point: if there is no data in the queue, it will block when trying to read from the queue. The first loop has two potential blocking points: when it reads data from the socket (if there is no data available), and when it writes data to the queue (if the queue is full). The difficulty here is that the potentially blocking socket read must be handled by the NIO select call, but the potentially blocking write to the queue can't be handled by the NIO select call and thus must be handled by our own scheduler.
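Ignoring continuations, the two loops can be sketched with plain threads and a bounded queue standing in for `CoQueue` (illustrative names; the real code suspends a coroutine wherever these calls would block a thread):

```scala
import java.util.concurrent.ArrayBlockingQueue

object TwoLoopsSketch {
    def run(lines:Seq[String]):Seq[String] = {
        val q = new ArrayBlockingQueue[String](10)       //bounded, like CoQueue
        val results = new ArrayBlockingQueue[String](100)
        val reader = new Thread(() =>
            lines.foreach(q.put(_)))                     //loop 1: socket -> queue (blocks if full)
        val app = new Thread(() =>
            lines.indices.foreach { _ =>                 //loop 2: queue -> process -> output
                results.put("echo: " + q.take())         //take blocks if queue is empty
            })
        reader.start(); app.start()
        reader.join(); app.join()
        Iterator.continually(results.poll()).takeWhile(_ != null).toSeq
    }

    def main(args:Array[String]):Unit =
        println(run(Seq("one", "two")))
}
```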

Having one processing loop that when blocked is sometimes managed by one scheduler (NIO select) and sometimes by another (our coroutine scheduler) is not necessarily a problem. Each scheduler just sees a blocking resource that has a continuation associated with it; when the blocking resource becomes available, the continuation is called and the process continues. The new issue that arises when trying to combine two schedulers like this is that an action by one scheduler can potentially unblock a task that is currently controlled by (i.e. in a wait state on) the other scheduler. Every time we perform an action that might unblock a task we need to ensure that the appropriate scheduler is not stuck waiting on the other tasks. In other words, we need to wake up or notify the schedulers at appropriate points in our code.

In this post, code which has changed is highlighted in bold (when not using Syntax Highlighting). Changes for `CoScheduler` and `CoQueue` are as compared to the code in my post on coroutines; changes to `NioSelector`, `NioConnection`, `LineDecoder`, `NioListener` and `NioServer` are as compared to the code in my previous two posts.

The complete source for this post is available on github in my nioserver project, with the specific version used in this post tagged as blog-coroutines. There are also tags for the previous two posts, so you can compare using those tags to see the changes between the versions as used in each post.

### NioSelector

As mentioned above, we have to cooperate with the coroutine scheduler. In particular, we have to handle the situation in which we have no active connections and are thus blocked in a `select` call when another thread registers interest in an operation. The documentation for the `select` call states:
> Changes made to the interest sets of a selector's keys while a selection operation is in progress have no effect upon that operation; they will be seen by the next selection operation.
To terminate the select operation early so that it retries with the newly registered channel, we add a call to `wakeup` just after registering our interest.

Unfortunately, this is not enough. The documentation for the `select` call is not very precise about whether it is actually possible for another thread to call `register` while the `select` call is blocked waiting for a previously registered channel to become active. The documentation for `SelectableChannel` does explicitly say "Selectable channels are safe for use by multiple concurrent threads", but the documentation for the `register` method says "This method will then synchronize on the selector's key set and therefore may block if invoked concurrently with another registration or selection operation involving the same selector." In fact, the standard Sun implementation does quite a bit of synchronization, and can quite easily become deadlocked when used by multiple threads. In particular, the OS-level select call in the Java `select` method is inside a pair of `synchronized` blocks that lock the set of `SelectionKey`s associated with that selector. If, while the first thread is blocked on the select, a second thread calls `SelectableChannel.register`, it locks the channel, then attempts to lock the key set to which that channel is being added, so it blocks. If a third thread then tries to register that channel with a second selector, which the documentation implies is allowed, the third thread will attempt to lock the channel, which will block until the second thread unblocks and releases its lock on the channel.

In his Rox Java NIO Tutorial, James Greenfield explicitly recommends that you "Use a single selecting thread" and "Modify the selector from the selecting thread only." From the description above of how `register` works, you can see why.

To get around this problem and ensure that all changes to the selection keys happen on the thread that calls select, we modify `NioSelector.register` so that, rather than calling `SelectableChannel.register` directly, it packages up the arguments and puts them on a queue; the selection thread drains that queue and makes all of the `SelectableChannel.register` calls itself, just before it calls `select`.

Fortunately, the semantics of the `wakeup` call ensure that we won't get ourselves into a position where we have put our registration request into the queue but the `select` call doesn't see it and blocks on all the other channels. This is because `wakeup` is defined such that a call to it that happens while the selector is not currently in a select operation will cause the next `select` to wake up immediately.
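This guarantee is easy to check directly against the NIO API (a small stand-alone experiment, not code from the server):

```scala
import java.nio.channels.Selector

val selector = Selector.open()
selector.wakeup()                  // no select is in progress yet
val t0 = System.nanoTime
selector.select(5000)              // would block up to 5s without the pending wakeup
val elapsedMs = (System.nanoTime - t0) / 1000000
selector.close()
```

The `select` call returns immediately rather than waiting out its timeout, because the earlier `wakeup` is remembered until the next selection operation.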

With this change, all of the key set operations happen on the selection thread and, since the socket read operation is in a callback that gets executed by the selection thread in `NioSelector.executeCallbacks`, all socket reads (and likewise accepts) will happen on the selection thread.

```
//In class NioSelector
import scala.collection.mutable.SynchronizedQueue

private case class RegistrationRequest(
        channel:SelectableChannel, op:Int, callback:Function0[Unit])
private val regQ = new SynchronizedQueue[RegistrationRequest]

def register(channel:SelectableChannel, op:Int, body: => Unit) {
    val callback:Function0[Unit] = { () => { body } }
    regQ.enqueue(RegistrationRequest(channel,op,callback))
    selector.wakeup()
}

def selectOnce(timeout:Long) {
    while (regQ.size>0) {
        val req = regQ.dequeue()
        req.channel.register(selector,req.op,req.callback)
    }
    ...
}
```
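To see the pattern in isolation, here is a self-contained sketch of the same registration-queue-plus-wakeup idea, using a `java.nio.channels.Pipe` in place of a network socket (`RegReq`, `sawData` and the loop structure here are illustrative, not the project's actual code):

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, SelectableChannel, SelectionKey, Selector}
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

case class RegReq(channel: SelectableChannel, op: Int, callback: () => Unit)

val selector = Selector.open()
val regQ = new ConcurrentLinkedQueue[RegReq]
val sawData = new AtomicBoolean(false)

val selectThread = new Thread(() => {
  while (!sawData.get) {
    // Drain pending registrations on the selecting thread, as recommended.
    var req = regQ.poll()
    while (req != null) {
      req.channel.register(selector, req.op, req.callback)
      req = regQ.poll()
    }
    selector.select(100)
    val it = selector.selectedKeys.iterator
    while (it.hasNext) {
      val key = it.next(); it.remove()
      key.attachment.asInstanceOf[() => Unit].apply()  // run the stored callback
    }
  }
})
selectThread.setDaemon(true)
selectThread.start()

// From another thread: queue a registration, then wake the selector.
val pipe = Pipe.open()
pipe.source.configureBlocking(false)
regQ.add(RegReq(pipe.source, SelectionKey.OP_READ, () => sawData.set(true)))
selector.wakeup()                 // ensure a blocked select sees the new request
pipe.sink.write(ByteBuffer.wrap("x".getBytes))
selectThread.join(5000)
selector.close()
```

The registering thread never touches the selector's key set directly; it only appends to the queue and calls `wakeup`, which is the one selector operation that is explicitly safe to invoke from any thread.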

### CoScheduler

For our coroutine scheduler, we have to be able to deal with the situation in which no coroutines are currently runnable and then, at some point, one of them becomes runnable through the actions of another thread. In the architecture described above, this can happen when new data that has been read from a connection is placed into the input queue. To allow us to wait for this kind of event and to be awakened when it happens, we use Java's `wait`/`notify` model. We can't simply override those methods, since they are declared final, so we define our own versions, which we call `coWait` and `coNotify`. Given those methods, we also extend `Runnable` and replace the old `run` method with one that runs coroutines until none are available to run, then waits until we are notified and continues the loop.
```
trait CoScheduler extends Runnable { cosched =>
    private val defaultLock = new java.lang.Object
    def coWait():Unit = { defaultLock.synchronized { defaultLock.wait() } }
    def coNotify():Unit = { defaultLock.synchronized { defaultLock.notify() } }

    def run {
        while (true) {
            runUntilBlockedOrDone
            coWait
        }
    }
}
```
A `coNotify` method that accepts as an argument the coroutine or blocker that has potentially changed state would allow for a more efficient implementation, but for now we choose the simple implementation given above that does not attempt that optimization.
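The wait/notify handshake that `coWait` and `coNotify` rely on can be sketched with plain threads (illustrative names; unlike the trait above, this sketch guards the wait with a flag, so that a notify arriving before the wait is not lost):

```scala
val lock = new Object
var ready = false

// Wait until some other thread signals readiness.
def coWait(): Unit = lock.synchronized { while (!ready) lock.wait() }

// Signal readiness and wake a waiter, if any.
def coNotify(): Unit = lock.synchronized { ready = true; lock.notify() }

val waiter = new Thread(() => coWait())
waiter.start()
coNotify()
waiter.join(5000)
val waiterDone = !waiter.isAlive
```

In the scheduler itself, a notify that arrives while `runUntilBlockedOrDone` is still executing is less of an issue, since that loop re-examines the runnable coroutines before waiting again.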

### CoQueue

We use an instance of `CoQueue` as the queue between the socket read loop and the application processing loop. The socket read loop calls `blockingEnqueue` to place an item into the queue, and the application processing loop calls `blockingDequeue` to take an element out of the queue. The result of either of these actions could be to unblock another coroutine, so we modify those methods to add a call to `coNotify` in case they are being called from a coroutine that is not currently being managed by our coroutine scheduler. Since we are calling the enqueue and dequeue methods from different threads, we use a `SynchronizedQueue` rather than a plain `Queue`. Those two methods now look like this:
```
import scala.collection.mutable.SynchronizedQueue

class CoQueue ... extends SynchronizedQueue[A] { ...

    def blockingEnqueue(x:A):Unit @suspendable = {
        enqueueResource.waitUntilNotBlocked
        enqueue(x)
        dequeueResource.coNotify
    }

    def blockingDequeue():A @suspendable = {
        dequeueResource.waitUntilNotBlocked
        val x = dequeue
        enqueueResource.coNotify
        x
    }
}
```
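For comparison, here is what the same pair of operations looks like as a plain thread-blocking bounded buffer, using one lock with `wait`/`notifyAll` (an illustrative analogue: `CoQueue` suspends coroutines instead of blocking threads, and `BlockingBuffer` is a made-up name):

```scala
import scala.collection.mutable.Queue
import java.util.concurrent.ConcurrentLinkedQueue

class BlockingBuffer[A](limit: Int) {
  private val q = new Queue[A]
  private val lock = new Object
  def blockingEnqueue(x: A): Unit = lock.synchronized {
    while (q.size >= limit) lock.wait()   // wait until not full
    q.enqueue(x)
    lock.notifyAll()                      // may unblock a dequeuer
  }
  def blockingDequeue(): A = lock.synchronized {
    while (q.isEmpty) lock.wait()         // wait until not empty
    val x = q.dequeue()
    lock.notifyAll()                      // may unblock an enqueuer
    x
  }
}

val buf = new BlockingBuffer[Int](2)
val out = new ConcurrentLinkedQueue[Int]
val consumer = new Thread(() => for (_ <- 1 to 3) out.add(buf.blockingDequeue()))
consumer.start()
for (i <- 1 to 3) buf.blockingEnqueue(i)  // third enqueue waits for the consumer
consumer.join(5000)
```

Note the symmetry: each side notifies after its operation, because its action may have unblocked a task waiting on the opposite side, which is exactly the cross-scheduler notification issue discussed earlier.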

### NioConnection

We add a `CoQueue` which we use as our input queue between the socket reader loop and the application loop. For this example, we pick an arbitrary limit of 10; if our application gets behind by more than 10 items, the socket reader code will suspend when attempting to write to the queue. If more data arrives while that code is thus suspended, it will back up in the system's input buffer for that connection, and eventually the client will get an error when trying to write to its output connection.

In order to initialize the `CoQueue` we need to pass in a `CoScheduler`, so we add that parameter to our constructor and to the convenience method in our companion object.
```
import net.jimmc.scoroutine.{CoQueue,CoScheduler}

//In object NioConnection
def newConnection(sched:CoScheduler, selector:NioSelector, socket:SocketChannel) {
    val conn = new NioConnection(sched,selector,socket)
    conn.start()
}

class NioConnection(sched:CoScheduler, selector:NioSelector, socket:SocketChannel) {
    ...
    private val inQ = new CoQueue[String](sched, 10)
    ...
}
```
Now that we have a queue, we modify our socket reader code to place our input data (after conversion to a Java string) into our queue rather than writing it straight to the output socket. We want to block when the queue is full, so we call the `blockingEnqueue` method. Since we now know that's the only action we will be taking, we fold the `readAction` method back into `readWhile`. Because `blockingEnqueue` is suspendable, the `else` branch of the `if (count<1)` code block is suspendable, so we need to make the `if` branch suspendable as well. We do this by adding a `shiftUnit` call as the final value in the `if` branch. The `readWhile` method now looks like this:
```
    private def readWhile():Unit @suspendable = {
        while (socket.isOpen) {
            buffer.clear()
            val count = read(buffer)    //suspendable read, as in the previous post
            if (count<1) {
                socket.close()
                shiftUnit[Unit,Unit,Unit]()
            } else {
                buffer.flip()
                lineDecoder.processBytes(buffer, inQ.blockingEnqueue(_))
            }
        }
    }
```
We now have input data going into our queue, but nobody is reading it. For this example, we implement a simple echo loop that reads from the input queue using a new `readLine` method and writes to the output using our existing `writeLine` method. We do this inside a `reset` block so that it becomes another coroutine that can be managed by our coroutine scheduler. Our previous `start` method started up the socket reader loop. We rename that one to `startReader`, add a `startApp` method that starts up our echo loop, and call both of those from a new `start` method. Our `start` method now looks like this:
```
//In class NioConnection
def start():Unit = {
    startReader()
    startApp()
}

private def startApp() {
    reset {
        while (socket.isOpen)
            writeLine(readLine())
    }
}

private def readLine():String @suspendable = inQ.blockingDequeue()
```

### LineDecoder

Our `processBytes` method is now getting passed a callback that is suspendable, so we need to modify the signature of our method to accept that. It passes that callback to `processChars`, so that signature needs to be changed in the same way. Since `processChars` is now calling a suspendable method, it too is suspendable, so its return signature needs to be modified to note that, and since `processBytes` calls `processChars`, it too needs to be modified to have a suspendable return signature.
```
//In class LineDecoder
import scala.util.continuations._

def processBytes(b:ByteBuffer,
        lineHandler:(String)=>Unit @suspendable):Unit @suspendable = ...

private def processChars(cb:CharBuffer,
        lineHandler:(String)=>Unit @suspendable):Unit @suspendable = { ... }
```
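Independent of the continuations plumbing, the underlying job can be sketched as a plain callback-per-line decoder that buffers partial input (illustrative; `SimpleLineDecoder` is a made-up name and omits the byte-to-character decoding step the real class performs):

```scala
// Buffers incoming characters and invokes the handler once per complete line.
class SimpleLineDecoder(lineHandler: String => Unit) {
  private val sb = new StringBuilder
  def processChars(s: String): Unit = {
    for (c <- s) {
      if (c == '\n') { lineHandler(sb.toString); sb.clear() }
      else sb.append(c)
    }
  }
}

val lines = scala.collection.mutable.ListBuffer[String]()
val dec = new SimpleLineDecoder(lines += _)
dec.processChars("hel")        // partial line: buffered, no callback yet
dec.processChars("lo\nwor")    // completes "hello", starts buffering "wor"
dec.processChars("ld\n")       // completes "world"
```

The signature change in the real class exists because the handler it is given, `inQ.blockingEnqueue(_)`, can suspend, and in the CPS-transformed world that property must be declared all the way up the call chain.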

### NioListener

`NioListener` calls `NioConnection.newConnection`, and that call now requires a `CoScheduler` argument, so we add that to our constructor and pass it through when we call `newConnection`.
```
import net.jimmc.scoroutine.CoScheduler

//In class NioListener
def start(continueListening: =>Boolean):Unit = {
    reset {
        while (continueListening) {
            val socket = accept()
            NioConnection.newConnection(sched,selector,socket)
        }
    }
}
```

### NioServer

`NioServer` instantiates the `NioListener`, so we need to pass it an instance of `CoScheduler`. We create an instance of `DefaultCoScheduler` and pass that in. We now need two threads, one for our coroutine scheduler and one for the NIO scheduler. In our `start` method, we create and start a second `Thread` for the NIO scheduler, then rename our own thread and run the coroutine scheduler on it.
```
import net.jimmc.scoroutine.DefaultCoScheduler

//In class NioServer
val selector = new NioSelector()
val sched = new DefaultCoScheduler
val listener = new NioListener(sched, selector, hostAddr, port)

def start() {
    listener.start(true)
    val selectorThread = new Thread("NioSelector") {
        override def run() { while (true) selector.selectOnce(100) }
    }
    selectorThread.start()    //run the NIO selector on its own thread
    Thread.currentThread.setName("CoScheduler")
    sched.run    //run the coroutine scheduler on our thread, renamed
}
```

### Summary

As in the previous post, we have once again transformed our example application in a way which provides an internal improvement - in this case the ability to use multiple threads - but which has not changed its basic external behavior: we still have a simple echo server. We also have not yet addressed all of the Limitations from the first post in this series. Stay tuned for more.

### Caveats

• Although I have asserted that it is possible to write a multi-threading scheduler to the `CoScheduler` API, I have not yet actually done this. It is possible that this may be more difficult than I expect.
• Multi-threaded code is generally tricky stuff. I have not spent a lot of time running this example code, so it is certainly possible that there are race conditions or other concurrency problems.