Contents
- Background
- NioApplication
- NioServer
- NioListener
- NioConnection
- EchoServer
- ThreeQuestionsServer
- Limitations
Background
In the initial post of this series on Java NIO in Scala I mentioned a set of Limitations of the first example server. In the next three posts after that initial post I addressed some of those limitations. In this post I address the remaining limitation in that original list: the application code (an echo loop in the example) is buried in theNioConnection class,
which makes that application code more difficult to maintain and
makes the server code not directly reusable as a library.
With the changes described in the next section, all of the application-specific behavior will be encapsulated in an instance of an application-specific subclass of a new class,
NioApplication.
Since the remainder of the classes presented so far will now be
independent of the application and reusable
without any modifications for multiple applications,
they will be moved into a separate package, net.jimmc.nio.
Other than adding
package net.jimmc.nio,
there were no changes to LineDecoder
and NioSelector,
and there were no changes to the coroutine package
net.jimmc.scoroutine
for this latest set of changes.
For the files that were changed, listed below,
the listings show the complete new version of the file,
with changes from the previous version highlighted in bold.
The complete source for this series of posts is available on github in my nioserver project, with the specific version after the changes specified in this post tagged as blog-complete.
NioApplication
Extracting the application-specific code out ofNioConnection
is pretty simple:
in NioConnection.startApp,
rather than starting up a built-in echo loop,
we add a hook that allows us to call back to an application-specific
method that implements whatever behavior the application wants for
dealing with a connection.
To do this, we define a new abstract class NioApplication
that includes a runConnection method that we can call
from NioConnection.startApp.
We will also use the
NioApplication class as a convenience
class where we can bundle up some of the arguments that get passed
around a lot, in particular the coroutine scheduler and the
read and write selectors.
This gives us the opportunity to override the coroutine scheduler
with one more appropriate for the application,
although we will not do so in this example.
package net.jimmc.nio
import net.jimmc.scoroutine.DefaultCoScheduler
import scala.util.continuations._
abstract class NioApplication {
val readSelector = new NioSelector()
val writeSelector = new NioSelector()
val sched = new DefaultCoScheduler
def runConnection(conn:NioConnection):Unit @suspendable
}
NioServer
We simplify theNioServer class by removing
object NioServer, which will instead be in the application
main object.
We replace three parameters in the constructor with the
single app parameter
and likewise replace three arguments in the call to
NioListener with the single app argument.
package net.jimmc.nio
import net.jimmc.scoroutine.DefaultCoScheduler
import java.net.InetAddress
class NioServer(app:NioApplication, hostAddr:InetAddress, port:Int) {
val listener = new NioListener(app, hostAddr, port)
def start() {
listener.start(true)
//run the NIO read and write selectors each on its own thread
(new Thread(app.writeSelector,"WriteSelector")).start
(new Thread(app.readSelector,"ReadSelector")).start
Thread.currentThread.setName("CoScheduler")
app.sched.run //run the coroutine scheduler on our thread, renamed
}
}
NioListener
Three parameters in the constructor have been replaced by the singleapp parameter.
package net.jimmc.nio
import net.jimmc.scoroutine.CoScheduler
import java.net.{InetAddress,InetSocketAddress}
import java.nio.channels.{ServerSocketChannel,SocketChannel}
import java.nio.channels.SelectionKey
import scala.util.continuations._
class NioListener(app:NioApplication, hostAddr:InetAddress, port:Int) {
val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false);
val isa = new InetSocketAddress(hostAddr,port)
serverChannel.socket.bind(isa)
def start(continueListening: =>Boolean):Unit = {
reset {
while (continueListening) {
val socket = accept()
NioConnection.newConnection(app, socket)
}
}
}
private def accept():SocketChannel @suspendable = {
shift { k =>
app.readSelector.register(serverChannel,SelectionKey.OP_ACCEPT, {
val conn = serverChannel.accept()
conn.configureBlocking(false)
k(conn)
})
}
}
}
NioConnection
We modify the constructor and the companion to replace three parameters with the singleapp parameter, and we replace our echo loop
in startApp with a call to the application
runConnection method,
followed by a call to our close method to make sure we
close the socket when the application is done with it.
package net.jimmc.nio
import net.jimmc.scoroutine.{CoQueue,CoScheduler}
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import scala.util.continuations._
object NioConnection {
def newConnection(app:NioApplication, socket:SocketChannel) {
val conn = new NioConnection(app, socket)
conn.start()
}
}
class NioConnection(app:NioApplication, socket:SocketChannel) {
private val buffer = ByteBuffer.allocateDirect(2000)
private val lineDecoder = new LineDecoder
private val inQ = new CoQueue[String](app.sched, 10)
private val outQ = new CoQueue[String](app.sched, 10)
def start():Unit = {
startReader
startWriter
startApp
}
private def startApp() {
reset {
app.runConnection(this)
close()
}
}
private def startReader() {
reset {
while (socket.isOpen)
readWait
}
}
private def readWait:Unit @suspendable = {
buffer.clear()
val count = read(buffer)
if (count<1) {
socket.close()
shiftUnit[Unit,Unit,Unit]()
} else {
buffer.flip()
lineDecoder.processBytes(buffer, inQ.blockingEnqueue(_))
}
}
private def read(b:ByteBuffer):Int @suspendable = {
if (!socket.isOpen)
-1 //indicate EOF
else shift { k =>
app.readSelector.register(socket, SelectionKey.OP_READ, {
val n = socket.read(b)
k(n)
})
}
}
def readLine():String @suspendable = inQ.blockingDequeue
private def startWriter() {
reset {
while (socket.isOpen)
writeWait
}
}
private def write(b:ByteBuffer):Int @suspendable = {
if (!socket.isOpen)
-1 //indicate EOF
else shift { k =>
app.writeSelector.register(socket, SelectionKey.OP_WRITE, {
val n = socket.write(b)
k(n)
})
}
}
private def writeBuffer(b:ByteBuffer):Unit @suspendable = {
write(b)
if (b.remaining>0 && socket.isOpen)
writeBuffer(b)
else
shiftUnit[Unit,Unit,Unit]()
}
private def writeWait():Unit @suspendable = {
val str = outQ.blockingDequeue
if (str eq closeMarker) {
socket.close
shiftUnit[Unit,Unit,Unit]()
} else
writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
}
def writeLine(s:String) = write(s+"\n")
def write(s:String) = outQ.blockingEnqueue(s)
def isOpen = socket.isOpen
private val closeMarker = new String("")
def close():Unit @suspendable = write(closeMarker)
}
EchoServer
We move the application-specific main object out ofNioServer
and place it into our sample application class, which we call
EchoServer, along with a subclassed NioApplication
that provides our application behavior.
Highlighted differences are as compared to the previous version of
NioServer.
import net.jimmc.nio.{NioApplication,NioConnection,NioServer}
import net.jimmc.scoroutine.DefaultCoScheduler
import java.net.InetAddress
import scala.util.continuations._
object EchoServer {
def main(args:Array[String]) {
val app = new EchoApplication
val hostAddr:InetAddress = null //listen on local connection
val port = 1234
val server = new NioServer(app,hostAddr,port)
server.start()
}
}
class EchoApplication extends NioApplication {
def runConnection(conn:NioConnection):Unit @suspendable = {
while (conn.isOpen) {
conn.writeLine(conn.readLine)
}
}
}
The above class is the complete application definition for our
echo server when built on top of our generic nio package.
After compiling, run with this command:
$ scala EchoServerWith all the above changes, we have once again internally transformed our application, but besides starting it up with a different name it's external behavior is still the same. However, we have reached the point where defining a new server-based application is easy.
ThreeQuestionsServer
The example in this section shows a slightly more complex application that maintains some local per-client state as it progresses through a short series of steps interacting with the client. In this simple application, the server asks up to three questions of the client and collects responses, with each next question sometimes depending on the previous answers. The per-client state is contained both in local variables and in the location of execution within the application. Each time the processing for a client is suspended the state for that client is captured in a continuation to be restored when the next piece of input is available. The continuation includes all of the above per-client state information, so we don't have to write any application-specific code to save and restore that data.By defining the
ReaderWriter interface trait,
the application is written so as to be able to run either in server mode
using an instance of ConnReader,
in which case it accepts connections from clients,
or in standalone mode using an instance of SysReader,
in which case it only interacts with the console.
When our application running in server mode finishes handling a client and exits from the
run method,
control returns to NioConnection,
which closes the connection.
import net.jimmc.nio.{NioApplication,NioServer,NioConnection}
import java.io.{BufferedReader,InputStreamReader,PrintWriter}
import java.net.InetAddress
import scala.util.continuations._
object ThreeQuestionsConsole {
def main(args:Array[String]) {
val in = new BufferedReader(new InputStreamReader(System.in))
val out = new PrintWriter(System.out)
val io = new SysReader(in,out)
reset {
(new ThreeQuestions(io)).run
}
}
}
object ThreeQuestionsServer {
def main(args:Array[String]) {
val app = new ThreeQuestionsApp
val hostAddr:InetAddress = null //localhost
val port = 1234
val server = new NioServer(app,hostAddr,port)
server.start()
}
}
class ThreeQuestionsApp extends NioApplication {
def runConnection(conn:NioConnection):Unit @suspendable = {
val io = new ConnReader(conn)
(new ThreeQuestions(io)).run
}
}
trait ReaderWriter {
def readLine():String @suspendable
def writeLine(s:String):Unit @suspendable
}
class SysReader(in:BufferedReader,out:PrintWriter) extends ReaderWriter {
def readLine() = in.readLine
def writeLine(s:String) = { out.println(s); out.flush() }
}
class ConnReader(conn:NioConnection) extends ReaderWriter {
def readLine():String @suspendable = conn.readLine
def writeLine(s:String):Unit @suspendable = conn.writeLine(s)
}
class ThreeQuestions(io:ReaderWriter) {
def run():Unit @suspendable = {
val RxArthur = ".*arthur.*".r
val RxGalahad = ".*galahad.*".r
val RxLauncelot = ".*(launcelot|lancelot).*".r
val RxRobin = ".*robin.*".r
val RxHolyGrail = ".*seek the holy grail.*".r
val RxSwallow = ".*african or european.*".r
val RxAssyriaCapital =
".*(assur|shubat.enlil|kalhu|calah|nineveh|dur.sharrukin).*".r
val name = ask("What is your name?").toLowerCase
val quest = ask("What is your quest?").toLowerCase
val holy = quest match {
case RxHolyGrail() => true
case _ => false
}
if (holy) {
val q3Type = name match {
case RxRobin() => 'capital
case RxArthur() => 'swallow
case _ => 'color
}
val a3 = (q3Type match {
case 'capital => ask("What is the capital of Assyria?")
case 'swallow => ask("What is the air-speed velocity of an unladen swallow?")
case 'color => ask("What is your favorite color?")
}).toLowerCase
(q3Type,a3,name) match {
//Need to use an underscore in regex patterns with alternates
case ('capital,RxAssyriaCapital(_),_) => accept
case ('capital,_,_) => reject
case ('swallow,RxSwallow(),_) => rejectMe
case ('swallow,_,_) => reject
case ('color,"blue",RxLauncelot(_)) => accept
case ('color,_,RxLauncelot(_)) => reject
case ('color,"yellow",RxGalahad()) => accept
case ('color,_,RxGalahad()) => reject
case ('color,_,_) => accept
}
} else {
reject
}
}
def ask(s:String):String @suspendable = { io.writeLine(s); io.readLine }
def accept:Unit @suspendable = io.writeLine("You may pass")
def reject:Unit @suspendable = io.writeLine("you: Auuuuuuuugh!")
def rejectMe:Unit @suspendable = io.writeLine("me: Auuuuuuuugh!")
}
To run in console or server mode, use one of the following two commands:
$ scala ThreeQuestionsConsole $ scala ThreeQuestionsServer
Limitations
I am calling this version complete because it addresses all of the issues in the Limitations section of my original post, but it is far from production-ready. Before putting this code into production I would address the following issues.- Although the application now uses more than one thread, it still runs all of the application code on a single thread. The scheduler should be replaced by one that can choose how many threads to use and distribute the execution of the coroutines among those threads.
- This version still has not addressed all of the issues raised in the
Limitations section of the second post in this series,
on character decoding. In particular:
- Error handling should be improved.
- It only supports UTF-8 encoding.
- The application should parse its command line arguments so that it has the flexibility to, for example, use a different port number without requiring a code change.
- The application should read a configuration file.
- Error handling in general needs to be improved.
- Logging should be added.