Friday, April 15, 2011

Java Nio Complete Scala Server

The capstone to this series of posts: a complete multi-client stateful application server in Scala using Java NIO non-blocking IO for both reading and writing, and delimited continuations as coroutines for both IO and application processing.

Contents

Background

In the initial post of this series on Java NIO in Scala I mentioned a set of Limitations of the first example server. In the next three posts after that initial post I addressed some of those limitations. In this post I address the remaining limitation in that original list: the application code (an echo loop in the example) is buried in the NioConnection class, which makes that application code more difficult to maintain and makes the server code not directly reusable as a library.

With the changes described in the next section, all of the application-specific behavior will be encapsulated in an instance of an application-specific subclass of a new class, NioApplication. Since the remainder of the classes presented so far will now be independent of the application and reusable without any modifications for multiple applications, they will be moved into a separate package, net.jimmc.nio.

Other than adding package net.jimmc.nio, there were no changes to LineDecoder and NioSelector, and there were no changes to the coroutine package net.jimmc.scoroutine for this latest set of changes. For the files that were changed, listed below, the listings show the complete new version of the file, with changes from the previous version highlighted in bold.

The complete source for this series of posts is available on github in my nioserver project, with the specific version after the changes specified in this post tagged as blog-complete.

NioApplication

Extracting the application-specific code out of NioConnection is pretty simple: in NioConnection.startApp, rather than starting up a built-in echo loop, we add a hook that allows us to call back to an application-specific method that implements whatever behavior the application wants for dealing with a connection. To do this, we define a new abstract class NioApplication that includes a runConnection method that we can call from NioConnection.startApp.

We will also use the NioApplication class as a convenience class where we can bundle up some of the arguments that get passed around a lot, in particular the coroutine scheduler and the read and write selectors. This gives us the opportunity to override the coroutine scheduler with one more appropriate for the application, although we will not do so in this example.

package net.jimmc.nio

import net.jimmc.scoroutine.DefaultCoScheduler

import scala.util.continuations._

abstract class NioApplication {
    val readSelector = new NioSelector()
    val writeSelector = new NioSelector()
    val sched = new DefaultCoScheduler

    def runConnection(conn:NioConnection):Unit @suspendable
}

NioServer

We simplify the NioServer class by removing object NioServer, which will instead be in the application main object. We replace three parameters in the constructor with the single app parameter and likewise replace three arguments in the call to NioListener with the single app argument.
package net.jimmc.nio

import net.jimmc.scoroutine.DefaultCoScheduler

import java.net.InetAddress

class NioServer(app:NioApplication, hostAddr:InetAddress, port:Int) {
    val listener = new NioListener(app, hostAddr, port)

    def start() {
        listener.start(true)
        //run the NIO read and write selectors each on its own thread
        (new Thread(app.writeSelector,"WriteSelector")).start
        (new Thread(app.readSelector,"ReadSelector")).start
        Thread.currentThread.setName("CoScheduler")
        app.sched.run    //run the coroutine scheduler on our thread, renamed
    }
}

NioListener

Three parameters in the constructor have been replaced by the single app parameter.
package net.jimmc.nio

import net.jimmc.scoroutine.CoScheduler

import java.net.{InetAddress,InetSocketAddress}
import java.nio.channels.{ServerSocketChannel,SocketChannel}
import java.nio.channels.SelectionKey
import scala.util.continuations._

class NioListener(app:NioApplication, hostAddr:InetAddress, port:Int) {

    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false);
    val isa = new InetSocketAddress(hostAddr,port)
    serverChannel.socket.bind(isa)

    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()
                NioConnection.newConnection(app, socket)
            }
        }
    }

    private def accept():SocketChannel @suspendable = {
        shift { k =>
            app.readSelector.register(serverChannel,SelectionKey.OP_ACCEPT, {
                val conn = serverChannel.accept()
                conn.configureBlocking(false)
                k(conn)
            })
        }
    }
}

NioConnection

We modify the constructor and the companion to replace three parameters with the single app parameter, and we replace our echo loop in startApp with a call to the application runConnection method, followed by a call to our close method to make sure we close the socket when the application is done with it.
package net.jimmc.nio

import net.jimmc.scoroutine.{CoQueue,CoScheduler}

import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import scala.util.continuations._

object NioConnection {
    def newConnection(app:NioApplication, socket:SocketChannel) {
        val conn = new NioConnection(app, socket)
        conn.start()
    }
}

class NioConnection(app:NioApplication, socket:SocketChannel) {

    private val buffer = ByteBuffer.allocateDirect(2000)
    private val lineDecoder = new LineDecoder
    private val inQ = new CoQueue[String](app.sched, 10)
    private val outQ = new CoQueue[String](app.sched, 10)

    def start():Unit = {
        startReader
        startWriter
        startApp
    }

    private def startApp() {
        reset {
            app.runConnection(this)
            close()
        }
    }

    private def startReader() {
        reset {
            while (socket.isOpen)
                readWait
        }
    }

    private def readWait:Unit @suspendable = {
        buffer.clear()
        val count = read(buffer)
        if (count<1) {
            socket.close()
            shiftUnit[Unit,Unit,Unit]()
        } else {
            buffer.flip()
            lineDecoder.processBytes(buffer, inQ.blockingEnqueue(_))
        }
    }

    private def read(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            app.readSelector.register(socket, SelectionKey.OP_READ, {
                val n = socket.read(b)
                k(n)
            })
        }
    }

    def readLine():String @suspendable = inQ.blockingDequeue

    private def startWriter() {
        reset {
            while (socket.isOpen)
                writeWait
        }
    }

    private def write(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            app.writeSelector.register(socket, SelectionKey.OP_WRITE, {
                val n = socket.write(b)
                k(n)
            })
        }
    }

    private def writeBuffer(b:ByteBuffer):Unit @suspendable = {
        write(b)
        if (b.remaining>0 && socket.isOpen)
            writeBuffer(b)
        else
            shiftUnit[Unit,Unit,Unit]()
    }

    private def writeWait():Unit @suspendable = {
        val str = outQ.blockingDequeue
        if (str eq closeMarker) {
            socket.close
            shiftUnit[Unit,Unit,Unit]()
        } else
            writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
    }

    def writeLine(s:String) = write(s+"\n")
    def write(s:String) = outQ.blockingEnqueue(s)

    def isOpen = socket.isOpen
    private val closeMarker = new String("")
    def close():Unit @suspendable = write(closeMarker)
}

EchoServer

We move the application-specific main object out of NioServer and place it into our sample application class, which we call EchoServer, along with a subclassed NioApplication that provides our application behavior.

Highlighted differences are as compared to the previous version of NioServer.
import net.jimmc.nio.{NioApplication,NioConnection,NioServer}
import net.jimmc.scoroutine.DefaultCoScheduler

import java.net.InetAddress
import scala.util.continuations._

object EchoServer {
    def main(args:Array[String]) {
        val app = new EchoApplication
        val hostAddr:InetAddress = null //listen on local connection
        val port = 1234
        val server = new NioServer(app,hostAddr,port)
        server.start()
    }
}

class EchoApplication extends NioApplication {
    def runConnection(conn:NioConnection):Unit @suspendable = {
        while (conn.isOpen) {
            conn.writeLine(conn.readLine)
        }
    }
}
The above class is the complete application definition for our echo server when built on top of our generic nio package. After compiling, run with this command:
$ scala EchoServer
With all the above changes, we have once again internally transformed our application, but besides starting it up with a different name it's external behavior is still the same. However, we have reached the point where defining a new server-based application is easy.

ThreeQuestionsServer

The example in this section shows a slightly more complex application that maintains some local per-client state as it progresses through a short series of steps interacting with the client. In this simple application, the server asks up to three questions of the client and collects responses, with each next question sometimes depending on the previous answers. The per-client state is contained both in local variables and in the location of execution within the application. Each time the processing for a client is suspended the state for that client is captured in a continuation to be restored when the next piece of input is available. The continuation includes all of the above per-client state information, so we don't have to write any application-specific code to save and restore that data.

By defining the ReaderWriter interface trait, the application is written so as to be able to run either in server mode using an instance of ConnReader, in which case it accepts connections from clients, or in standalone mode using an instance of SysReader, in which case it only interacts with the console.

When our application running in server mode finishes handling a client and exits from the run method, control returns to NioConnection, which closes the connection.
import net.jimmc.nio.{NioApplication,NioServer,NioConnection}

import java.io.{BufferedReader,InputStreamReader,PrintWriter}
import java.net.InetAddress

import scala.util.continuations._

object ThreeQuestionsConsole {
    def main(args:Array[String]) {
        val in = new BufferedReader(new InputStreamReader(System.in))
        val out = new PrintWriter(System.out)
        val io = new SysReader(in,out)
        reset {
            (new ThreeQuestions(io)).run
        }
    }
}

object ThreeQuestionsServer {
    def main(args:Array[String]) {
        val app = new ThreeQuestionsApp
        val hostAddr:InetAddress = null //localhost
        val port = 1234
        val server = new NioServer(app,hostAddr,port)
        server.start()
    }
}

class ThreeQuestionsApp extends NioApplication {
    def runConnection(conn:NioConnection):Unit @suspendable = {
        val io = new ConnReader(conn)
        (new ThreeQuestions(io)).run
    }
}

trait ReaderWriter {
    def readLine():String @suspendable
    def writeLine(s:String):Unit @suspendable
}

class SysReader(in:BufferedReader,out:PrintWriter) extends ReaderWriter {
    def readLine() = in.readLine
    def writeLine(s:String) = { out.println(s); out.flush() }
}

class ConnReader(conn:NioConnection) extends ReaderWriter {
    def readLine():String @suspendable = conn.readLine
    def writeLine(s:String):Unit @suspendable = conn.writeLine(s)
}

class ThreeQuestions(io:ReaderWriter) {
    def run():Unit @suspendable = {
        val RxArthur = ".*arthur.*".r
        val RxGalahad = ".*galahad.*".r
        val RxLauncelot = ".*(launcelot|lancelot).*".r
        val RxRobin = ".*robin.*".r
        val RxHolyGrail = ".*seek the holy grail.*".r
        val RxSwallow = ".*african or european.*".r
        val RxAssyriaCapital =
            ".*(assur|shubat.enlil|kalhu|calah|nineveh|dur.sharrukin).*".r
        val name = ask("What is your name?").toLowerCase
        val quest = ask("What is your quest?").toLowerCase
        val holy = quest match {
            case RxHolyGrail() => true
            case _ => false
        }
        if (holy) {
            val q3Type = name match {
                case RxRobin() => 'capital
                case RxArthur() => 'swallow
                case _ => 'color
            }
            val a3 = (q3Type match {
                case 'capital => ask("What is the capital of Assyria?")
                case 'swallow => ask("What is the air-speed velocity of an unladen swallow?")
                case 'color => ask("What is your favorite color?")
            }).toLowerCase
            (q3Type,a3,name) match {
                //Need to use an underscore in regex patterns with alternates
                case ('capital,RxAssyriaCapital(_),_) => accept
                case ('capital,_,_) => reject
                case ('swallow,RxSwallow(),_) => rejectMe
                case ('swallow,_,_) => reject
                case ('color,"blue",RxLauncelot(_)) => accept
                case ('color,_,RxLauncelot(_)) => reject
                case ('color,"yellow",RxGalahad()) => accept
                case ('color,_,RxGalahad()) => reject
                case ('color,_,_) => accept
            }
        } else {
            reject
        }
    }

    def ask(s:String):String @suspendable = { io.writeLine(s); io.readLine }
    def accept:Unit @suspendable = io.writeLine("You may pass")
    def reject:Unit @suspendable = io.writeLine("you: Auuuuuuuugh!")
    def rejectMe:Unit @suspendable = io.writeLine("me: Auuuuuuuugh!")
}
To run in console or server mode, use one of the following two commands:
$ scala ThreeQuestionsConsole
$ scala ThreeQuestionsServer

Limitations

I am calling this version complete because it addresses all of the issues in the Limitations section of my original post, but it is far from production-ready. Before putting this code into production I would address the following issues.
  • Although the application now uses more than one thread, it still runs all of the application code on a single thread. The scheduler should be replaced by one that can choose how many threads to use and distribute the execution of the coroutines among those threads.
  • This version still has not addressed all of the issues raised in the Limitations section of the second post in this series, on character decoding. In particular:
    • Error handling should be improved.
    • It only supports UTF-8 encoding.
    For an example of this problem, type a Control-C into your telnet window when connected to the EchoServer application.
  • The application should parse its command line arguments so that it has the flexibility to, for example, use a different port number without requiring a code change.
  • The application should read a configuration file.
  • Error handling in general needs to be improved.
  • Logging should be added.

1 comment:

Sandeep said...
This comment has been removed by a blog administrator.