Contents
- Background
- NioApplication
- NioServer
- NioListener
- NioConnection
- EchoServer
- ThreeQuestionsServer
- Limitations
Background
In the initial post of this series on Java NIO in Scala I mentioned a set of Limitations of the first example server. In the next three posts after that initial post I addressed some of those limitations. In this post I address the remaining limitation in that original list: the application code (an echo loop in the example) is buried in theNioConnection
class,
which makes that application code more difficult to maintain and
makes the server code not directly reusable as a library.
With the changes described in the next section, all of the application-specific behavior will be encapsulated in an instance of an application-specific subclass of a new class,
NioApplication
.
Since the remainder of the classes presented so far will now be
independent of the application and reusable
without any modifications for multiple applications,
they will be moved into a separate package, net.jimmc.nio
.
Other than adding
package net.jimmc.nio
,
there were no changes to LineDecoder
and NioSelector
,
and there were no changes to the coroutine package
net.jimmc.scoroutine
for this latest set of changes.
For the files that were changed, listed below,
the listings show the complete new version of the file,
with changes from the previous version highlighted in bold.
The complete source for this series of posts is available on github in my nioserver project, with the specific version after the changes specified in this post tagged as blog-complete.
NioApplication
Extracting the application-specific code out ofNioConnection
is pretty simple:
in NioConnection.startApp
,
rather than starting up a built-in echo loop,
we add a hook that allows us to call back to an application-specific
method that implements whatever behavior the application wants for
dealing with a connection.
To do this, we define a new abstract class NioApplication
that includes a runConnection
method that we can call
from NioConnection.startApp
.
We will also use the
NioApplication
class as a convenience
class where we can bundle up some of the arguments that get passed
around a lot, in particular the coroutine scheduler and the
read and write selectors.
This gives us the opportunity to override the coroutine scheduler
with one more appropriate for the application,
although we will not do so in this example.
package net.jimmc.nio import net.jimmc.scoroutine.DefaultCoScheduler import scala.util.continuations._ abstract class NioApplication { val readSelector = new NioSelector() val writeSelector = new NioSelector() val sched = new DefaultCoScheduler def runConnection(conn:NioConnection):Unit @suspendable }
NioServer
We simplify theNioServer
class by removing
object NioServer
, which will instead be in the application
main object.
We replace three parameters in the constructor with the
single app
parameter
and likewise replace three arguments in the call to
NioListener
with the single app
argument.
package net.jimmc.nio import net.jimmc.scoroutine.DefaultCoScheduler import java.net.InetAddress class NioServer(app:NioApplication, hostAddr:InetAddress, port:Int) { val listener = new NioListener(app, hostAddr, port) def start() { listener.start(true) //run the NIO read and write selectors each on its own thread (new Thread(app.writeSelector,"WriteSelector")).start (new Thread(app.readSelector,"ReadSelector")).start Thread.currentThread.setName("CoScheduler") app.sched.run //run the coroutine scheduler on our thread, renamed } }
NioListener
Three parameters in the constructor have been replaced by the singleapp
parameter.
package net.jimmc.nio import net.jimmc.scoroutine.CoScheduler import java.net.{InetAddress,InetSocketAddress} import java.nio.channels.{ServerSocketChannel,SocketChannel} import java.nio.channels.SelectionKey import scala.util.continuations._ class NioListener(app:NioApplication, hostAddr:InetAddress, port:Int) { val serverChannel = ServerSocketChannel.open() serverChannel.configureBlocking(false); val isa = new InetSocketAddress(hostAddr,port) serverChannel.socket.bind(isa) def start(continueListening: =>Boolean):Unit = { reset { while (continueListening) { val socket = accept() NioConnection.newConnection(app, socket) } } } private def accept():SocketChannel @suspendable = { shift { k => app.readSelector.register(serverChannel,SelectionKey.OP_ACCEPT, { val conn = serverChannel.accept() conn.configureBlocking(false) k(conn) }) } } }
NioConnection
We modify the constructor and the companion to replace three parameters with the singleapp
parameter, and we replace our echo loop
in startApp
with a call to the application
runConnection
method,
followed by a call to our close
method to make sure we
close the socket when the application is done with it.
package net.jimmc.nio import net.jimmc.scoroutine.{CoQueue,CoScheduler} import java.nio.ByteBuffer import java.nio.channels.SelectionKey import java.nio.channels.SocketChannel import scala.util.continuations._ object NioConnection { def newConnection(app:NioApplication, socket:SocketChannel) { val conn = new NioConnection(app, socket) conn.start() } } class NioConnection(app:NioApplication, socket:SocketChannel) { private val buffer = ByteBuffer.allocateDirect(2000) private val lineDecoder = new LineDecoder private val inQ = new CoQueue[String](app.sched, 10) private val outQ = new CoQueue[String](app.sched, 10) def start():Unit = { startReader startWriter startApp } private def startApp() { reset { app.runConnection(this) close() } } private def startReader() { reset { while (socket.isOpen) readWait } } private def readWait:Unit @suspendable = { buffer.clear() val count = read(buffer) if (count<1) { socket.close() shiftUnit[Unit,Unit,Unit]() } else { buffer.flip() lineDecoder.processBytes(buffer, inQ.blockingEnqueue(_)) } } private def read(b:ByteBuffer):Int @suspendable = { if (!socket.isOpen) -1 //indicate EOF else shift { k => app.readSelector.register(socket, SelectionKey.OP_READ, { val n = socket.read(b) k(n) }) } } def readLine():String @suspendable = inQ.blockingDequeue private def startWriter() { reset { while (socket.isOpen) writeWait } } private def write(b:ByteBuffer):Int @suspendable = { if (!socket.isOpen) -1 //indicate EOF else shift { k => app.writeSelector.register(socket, SelectionKey.OP_WRITE, { val n = socket.write(b) k(n) }) } } private def writeBuffer(b:ByteBuffer):Unit @suspendable = { write(b) if (b.remaining>0 && socket.isOpen) writeBuffer(b) else shiftUnit[Unit,Unit,Unit]() } private def writeWait():Unit @suspendable = { val str = outQ.blockingDequeue if (str eq closeMarker) { socket.close shiftUnit[Unit,Unit,Unit]() } else writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8"))) } def writeLine(s:String) = write(s+"\n") def write(s:String) = outQ.blockingEnqueue(s) def isOpen = socket.isOpen private val closeMarker = new String("") def close():Unit @suspendable = write(closeMarker) }
EchoServer
We move the application-specific main object out ofNioServer
and place it into our sample application class, which we call
EchoServer
, along with a subclassed NioApplication
that provides our application behavior.
Highlighted differences are as compared to the previous version of
NioServer
.
import net.jimmc.nio.{NioApplication,NioConnection,NioServer} import net.jimmc.scoroutine.DefaultCoScheduler import java.net.InetAddress import scala.util.continuations._ object EchoServer { def main(args:Array[String]) { val app = new EchoApplication val hostAddr:InetAddress = null //listen on local connection val port = 1234 val server = new NioServer(app,hostAddr,port) server.start() } } class EchoApplication extends NioApplication { def runConnection(conn:NioConnection):Unit @suspendable = { while (conn.isOpen) { conn.writeLine(conn.readLine) } } }The above class is the complete application definition for our echo server when built on top of our generic nio package. After compiling, run with this command:
$ scala EchoServerWith all the above changes, we have once again internally transformed our application, but besides starting it up with a different name it's external behavior is still the same. However, we have reached the point where defining a new server-based application is easy.
ThreeQuestionsServer
The example in this section shows a slightly more complex application that maintains some local per-client state as it progresses through a short series of steps interacting with the client. In this simple application, the server asks up to three questions of the client and collects responses, with each next question sometimes depending on the previous answers. The per-client state is contained both in local variables and in the location of execution within the application. Each time the processing for a client is suspended the state for that client is captured in a continuation to be restored when the next piece of input is available. The continuation includes all of the above per-client state information, so we don't have to write any application-specific code to save and restore that data.By defining the
ReaderWriter
interface trait,
the application is written so as to be able to run either in server mode
using an instance of ConnReader
,
in which case it accepts connections from clients,
or in standalone mode using an instance of SysReader
,
in which case it only interacts with the console.
When our application running in server mode finishes handling a client and exits from the
run
method,
control returns to NioConnection
,
which closes the connection.
import net.jimmc.nio.{NioApplication,NioServer,NioConnection} import java.io.{BufferedReader,InputStreamReader,PrintWriter} import java.net.InetAddress import scala.util.continuations._ object ThreeQuestionsConsole { def main(args:Array[String]) { val in = new BufferedReader(new InputStreamReader(System.in)) val out = new PrintWriter(System.out) val io = new SysReader(in,out) reset { (new ThreeQuestions(io)).run } } } object ThreeQuestionsServer { def main(args:Array[String]) { val app = new ThreeQuestionsApp val hostAddr:InetAddress = null //localhost val port = 1234 val server = new NioServer(app,hostAddr,port) server.start() } } class ThreeQuestionsApp extends NioApplication { def runConnection(conn:NioConnection):Unit @suspendable = { val io = new ConnReader(conn) (new ThreeQuestions(io)).run } } trait ReaderWriter { def readLine():String @suspendable def writeLine(s:String):Unit @suspendable } class SysReader(in:BufferedReader,out:PrintWriter) extends ReaderWriter { def readLine() = in.readLine def writeLine(s:String) = { out.println(s); out.flush() } } class ConnReader(conn:NioConnection) extends ReaderWriter { def readLine():String @suspendable = conn.readLine def writeLine(s:String):Unit @suspendable = conn.writeLine(s) } class ThreeQuestions(io:ReaderWriter) { def run():Unit @suspendable = { val RxArthur = ".*arthur.*".r val RxGalahad = ".*galahad.*".r val RxLauncelot = ".*(launcelot|lancelot).*".r val RxRobin = ".*robin.*".r val RxHolyGrail = ".*seek the holy grail.*".r val RxSwallow = ".*african or european.*".r val RxAssyriaCapital = ".*(assur|shubat.enlil|kalhu|calah|nineveh|dur.sharrukin).*".r val name = ask("What is your name?").toLowerCase val quest = ask("What is your quest?").toLowerCase val holy = quest match { case RxHolyGrail() => true case _ => false } if (holy) { val q3Type = name match { case RxRobin() => 'capital case RxArthur() => 'swallow case _ => 'color } val a3 = (q3Type match { case 'capital => ask("What is the capital of Assyria?") case 'swallow => ask("What is the air-speed velocity of an unladen swallow?") case 'color => ask("What is your favorite color?") }).toLowerCase (q3Type,a3,name) match { //Need to use an underscore in regex patterns with alternates case ('capital,RxAssyriaCapital(_),_) => accept case ('capital,_,_) => reject case ('swallow,RxSwallow(),_) => rejectMe case ('swallow,_,_) => reject case ('color,"blue",RxLauncelot(_)) => accept case ('color,_,RxLauncelot(_)) => reject case ('color,"yellow",RxGalahad()) => accept case ('color,_,RxGalahad()) => reject case ('color,_,_) => accept } } else { reject } } def ask(s:String):String @suspendable = { io.writeLine(s); io.readLine } def accept:Unit @suspendable = io.writeLine("You may pass") def reject:Unit @suspendable = io.writeLine("you: Auuuuuuuugh!") def rejectMe:Unit @suspendable = io.writeLine("me: Auuuuuuuugh!") }To run in console or server mode, use one of the following two commands:
$ scala ThreeQuestionsConsole $ scala ThreeQuestionsServer
Limitations
I am calling this version complete because it addresses all of the issues in the Limitations section of my original post, but it is far from production-ready. Before putting this code into production I would address the following issues.- Although the application now uses more than one thread, it still runs all of the application code on a single thread. The scheduler should be replaced by one that can choose how many threads to use and distribute the execution of the coroutines among those threads.
- This version still has not addressed all of the issues raised in the
Limitations section of the second post in this series,
on character decoding. In particular:
- Error handling should be improved.
- It only supports UTF-8 encoding.
- The application should parse its command line arguments so that it has the flexibility to, for example, use a different port number without requiring a code change.
- The application should read a configuration file.
- Error handling in general needs to be improved.
- Logging should be added.