In my
previous post
I described a simple Scala server using NIO and continuations,
and mentioned in the
Limitations
section that the example did not convert the data bytes to characters.
In this post I show how that can easily be added by using another
feature of the
Java NIO package:
character-set encoders and decoders.
Java NIO Character Coders
The java.nio.charset package includes a
Charset
class that represents a mapping between the 16-bit Unicode
code-units that Java uses for its internal representation of
characters and strings,
and the sequences of bytes that are stored in a file or transmitted
through a socket connection.
Each such mapping is represented by a separate instance of the
Charset class.
Standard character mappings such as "UTF-8" and "ISO-8859-1"
can be retrieved using the static
forName
method.
Given an instance of Charset,
a
CharsetEncoder
for that character mapping can
be retrieved by calling the
newEncoder
method on that instance.
That encoder can then be used to convert a Java string into a sequence
of bytes suitable for writing to a file or connection.
Similarly, the
newDecoder
method on Charset retrieves a
CharsetDecoder
that can be used for the complementary task of
converting bytes from a file or connection into a Java string.
The encoding and decoding methods convert data between a
CharBuffer
and a
ByteBuffer.
Since the java.nio socket I/O calls we are using read and write
their data to and from ByteBuffers,
it is convenient for the encoding and decoding to use those objects.
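To make the encoder/decoder round trip concrete, here is a small sketch (not from the original post) that looks up UTF-8 with forName, encodes a string into a ByteBuffer, and decodes it back into a String. The sample string is arbitrary; it includes one non-ASCII character so that the byte count differs from the character count.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.Charset

// Look up a standard mapping by name; every JVM is required to support "UTF-8".
val utf8: Charset = Charset.forName("UTF-8")

// Each call returns a new coder. Coders carry internal state and are not thread-safe.
val encoder = utf8.newEncoder()
val decoder = utf8.newDecoder()

// Encode: String -> CharBuffer -> ByteBuffer (ready to be written to a channel).
val original = "h\u00e9llo"       // "héllo"; the é takes two bytes in UTF-8
val bytes: ByteBuffer = encoder.encode(CharBuffer.wrap(original))
val byteCount = bytes.remaining() // five characters, six bytes

// Decode: ByteBuffer -> CharBuffer -> String (what we do after reading from a socket).
val chars: CharBuffer = decoder.decode(bytes)
val roundTrip = chars.toString
```

Both convenience methods return a buffer already positioned for reading, so no flip is needed between the two steps.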
LineDecoder
Using the java.nio.charset classes described above,
we write a LineDecoder
class containing a processBytes method that takes as input a
ByteBuffer
(which is what we have to read into when using a
SocketChannel)
and converts that byte data to Java characters.
For this example, we also break up that character data into separate lines
when we see line break characters,
converting each line of characters to a Java String.
One buffer of data might contain multiple lines of character data,
so rather than returning a set of lines,
our method accepts a callback to which we pass each line
as we decode it.
import java.nio.{ByteBuffer,CharBuffer}
import java.nio.charset.{Charset,CharsetDecoder,CharsetEncoder,CoderResult}
import scala.annotation.tailrec
class LineDecoder {
  //Encoders and decoders are not multi-thread safe, so create one
  //for each connection in case we are using multiple threads.
  val utf8Charset = Charset.forName("UTF-8")
  val utf8Encoder = utf8Charset.newEncoder
  val utf8Decoder = utf8Charset.newDecoder
  def processBytes(b:ByteBuffer, lineHandler:(String)=>Unit):Unit =
    processChars(utf8Decoder.decode(b),lineHandler)
  @tailrec
  private def processChars(cb:CharBuffer, lineHandler:(String)=>Unit) {
    val len = lengthOfFirstLine(cb)
    if (len>=0) {
      val ca = new Array[Char](len)
      cb.get(ca,0,len)
      eatLineEnding(cb)
      val line = new String(ca)
      lineHandler(line)
      processChars(cb, lineHandler) //handle multiple lines
    }
  }
  //Assuming the first character in the buffer is an EOL char,
  //consume it and a following CR or LF in case the EOL is 2 chars.
  private def eatLineEnding(cb:CharBuffer) {
    //Eat the first character and see what it is
    cb.get match {
      case '\n' => if (cb.remaining>0 && cb.charAt(0)=='\r') cb.get
      case '\r' => if (cb.remaining>0 && cb.charAt(0)=='\n') cb.get
      case _ => //ignore everything else
    }
  }
  private def lengthOfFirstLine(cb:CharBuffer):Int = {
    (0 until cb.remaining) find { i =>
      List('\n','\r').indexOf(cb.charAt(i))>=0 } getOrElse -1
  }
}
Here is an imperative version of lengthOfFirstLine
that does the same thing as the functional version above.
private def lengthOfFirstLine(cb:CharBuffer):Int = {
  val cbLen = cb.remaining
  for (i <- 0 until cbLen) {
    val ch = cb.charAt(i)
    if (ch == '\n' || ch == '\r')
      return i
  }
  return -1
}
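As a quick check that the two versions agree, the following sketch runs both on a few sample strings. The names lengthFunctional and lengthImperative are made up for this comparison. The imperative variant here uses a while loop rather than for: in Scala, a return inside a for body is a non-local return out of a closure, which Scala 2 implements by throwing and catching an exception.

```scala
import java.nio.CharBuffer

// Functional style, as in LineDecoder: index of the first CR or LF, or -1.
def lengthFunctional(cb: CharBuffer): Int =
  (0 until cb.remaining()).find(i => List('\n', '\r').contains(cb.charAt(i))).getOrElse(-1)

// Imperative style: the same search with an explicit loop and early return.
def lengthImperative(cb: CharBuffer): Int = {
  val cbLen = cb.remaining()
  var i = 0
  while (i < cbLen) {
    val ch = cb.charAt(i)
    if (ch == '\n' || ch == '\r') return i
    i += 1
  }
  -1
}

val samples = List("abc\r\ndef", "\nx", "no line ending", "")
// Pair up the two results for each sample; they should always match.
val results = samples.map { s =>
  (lengthFunctional(CharBuffer.wrap(s)), lengthImperative(CharBuffer.wrap(s)))
}
```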
NioConnection
One of the classes shown in my previous post was the
NioConnection
class,
whose responsibilities include processing input data from the client.
It does this in the method readAction,
which initially looks like this:
//The old version
private def readAction(b:ByteBuffer) {
  b.flip()
  socket.write(b)
  b.clear()
}
We replace the direct call to socket.write
with a call to LineDecoder.processBytes,
which is responsible for decoding the input data,
and we pass it our new writeLine method,
which accepts a line of characters
and writes it back to the client.
Also, we don't actually need the call to b.clear here:
it effectively sits at the bottom of the while loop in start,
and readWait already calls buffer.clear at the top of each iteration.
private val lineDecoder = new LineDecoder
private def readAction(b:ByteBuffer) {
  b.flip()
  lineDecoder.processBytes(b, writeLine)
}
def writeLine(line:String) {
  socket.write(ByteBuffer.wrap((line+"\n").getBytes("UTF-8")))
}
Now when we receive some input data, it gets passed to
LineDecoder.processBytes,
which converts it to characters, breaks it up into separate lines,
and calls our writeLine method for each line.
The writeLine method uses
String.getBytes
to convert the characters in the line back to bytes,
wraps those bytes into a ByteBuffer
and writes them directly to the output channel.
As compared to the example in the previous post, this example should
behave essentially the same externally
(except that each line ending is echoed back as a single "\n"
and only complete lines are echoed),
but we are now passing around Java strings rather than NIO buffers,
which, assuming we want to deal with string data rather than binary data,
will make it simpler to write the rest of the real application.
Limitations
As with the example in the previous post,
the current example only shows how to use the NIO calls
on the read side of the connection.
We could use a CharsetEncoder on the write side
rather than using String.getBytes and
ByteBuffer.wrap.
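A possible shape for the write side using a CharsetEncoder is sketched below. It is an illustration under assumptions rather than code from this series: encodeLine and writeFully are hypothetical helpers, and an in-memory channel from Channels.newChannel stands in for the socket so the sketch runs on its own.

```scala
import java.io.ByteArrayOutputStream
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.channels.{Channels, WritableByteChannel}
import java.nio.charset.{Charset, CharsetEncoder}

// One encoder per connection, since coders are not thread-safe.
val encoder: CharsetEncoder = Charset.forName("UTF-8").newEncoder()

// Hypothetical helper: encode a line plus "\n" into a buffer ready to be written.
def encodeLine(enc: CharsetEncoder, line: String): ByteBuffer =
  enc.encode(CharBuffer.wrap(line + "\n"))

// Hypothetical helper: write until the buffer is drained and return the byte count.
def writeFully(ch: WritableByteChannel, line: String): Int = {
  val buf = encodeLine(encoder, line)
  var total = 0
  while (buf.hasRemaining) total += ch.write(buf)
  total
}

// An in-memory channel stands in for the socket so the sketch runs on its own.
val out = new ByteArrayOutputStream()
val channel: WritableByteChannel = Channels.newChannel(out)
val written = writeFully(channel, "caf\u00e9")
```

Note the loop in writeFully: on a non-blocking SocketChannel, write can return 0 when the socket's send buffer is full, so a loop like this would spin. The continuation-friendly alternative would be to wait for OP_WRITE, the same way read waits for OP_READ.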
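Partial input lines (characters not terminated by an EOL character)
are silently discarded by this implementation:
processChars stops at the last EOL, and any characters after it
are dropped along with the CharBuffer returned by decode.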
The example uses the convenience method version of
decode,
which assumes that the input ByteBuffer contains complete
character sequences.
It is possible that a multi-byte character sequence will be
split such that only the first part of that sequence appears at the
end of the input buffer,
with the remainder of the sequence appearing at the start of the next
buffer of input data.
The above implementation will not properly handle this situation.
The underlying three-argument
decode(in, out, endOfInput) method does handle this situation properly,
by leaving the incomplete bytes in the input buffer,
but the remaining code in this example is not set up for this situation.
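One way to handle split characters (and partial lines) is sketched below, using the three-argument decode with endOfInput set to false and ByteBuffer.compact to carry leftover bytes into the next read. IncrementalLineDecoder is a hypothetical class, not part of the post's code. To keep it short it treats LF as the only line terminator and drops CR characters, unlike eatLineEnding above, and it assumes each chunk of input fits in its 4096-byte holding buffer. At true end of input a complete version would also call decode with endOfInput set to true and then flush.

```scala
import java.nio.{ByteBuffer, CharBuffer}
import java.nio.charset.{Charset, CharsetDecoder, CoderResult}
import scala.collection.mutable.ArrayBuffer

// Hypothetical incremental line decoder (not the post's LineDecoder).
class IncrementalLineDecoder(lineHandler: String => Unit) {
  private val decoder: CharsetDecoder = Charset.forName("UTF-8").newDecoder()
  private val pending = ByteBuffer.allocate(4096) // undecoded bytes, kept in write mode
  private val chars = CharBuffer.allocate(4096)   // scratch space for decoded chars
  private val partial = new StringBuilder         // characters seen since the last LF

  /** `input` must be ready for reading, as after socket.read(b) and b.flip(). */
  def processBytes(input: ByteBuffer): Unit = {
    pending.put(input) // append after leftovers; assumes the bytes fit
    pending.flip()
    var done = false
    while (!done) {
      chars.clear()
      // endOfInput = false: an incomplete multi-byte sequence at the end of
      // `pending` is left there (UNDERFLOW) instead of being reported as an error.
      val result: CoderResult = decoder.decode(pending, chars, false)
      if (result.isError) result.throwException()
      chars.flip()
      emitLines(chars)
      done = result.isUnderflow // OVERFLOW means `chars` filled up: go again
    }
    pending.compact() // move leftover bytes to the front for the next call
  }

  private def emitLines(cb: CharBuffer): Unit =
    while (cb.hasRemaining) {
      val ch = cb.get()
      if (ch == '\n') { lineHandler(partial.toString); partial.clear() }
      else if (ch != '\r') partial.append(ch) // simplification: CR is dropped
    }
}

// Usage: split the input in the middle of the two-byte "é".
val lines = ArrayBuffer[String]()
val dec = new IncrementalLineDecoder(line => lines += line)
val utf8Bytes = "caf\u00e9\nsecond line\n".getBytes("UTF-8")
dec.processBytes(ByteBuffer.wrap(utf8Bytes, 0, 4)) // "caf" plus the first byte of é
val afterFirstRead = lines.toList                  // nothing complete yet
dec.processBytes(ByteBuffer.wrap(utf8Bytes, 4, utf8Bytes.length - 4))
```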
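The decode
convenience method throws a CharacterCodingException
(such as MalformedInputException) on bad input rather than returning
a CoderResult status as the full decode method does.
Since these exceptions are not caught anywhere in the code, such an
exception would propagate out of the callback; with the simple sequential
NioSelector from the previous post, it would end the select loop itself,
stopping the whole server rather than just the one connection.
A more robust solution would have a mechanism to catch exceptions or
restart an aborted task.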
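The difference between the two error policies can be seen with a byte that is never valid in UTF-8. This sketch (not from the post) decodes the same bytes with a default decoder, whose malformed-input action is REPORT, and with one configured via onMalformedInput(CodingErrorAction.REPLACE), which substitutes U+FFFD instead of throwing.

```scala
import java.nio.ByteBuffer
import java.nio.charset.{CharacterCodingException, Charset, CodingErrorAction}

val invalid = Array[Byte]('o'.toByte, 'k'.toByte, 0xFF.toByte) // 0xFF never appears in UTF-8

// Default decoder: malformed input is REPORTed, so the convenience decode throws.
val strict = Charset.forName("UTF-8").newDecoder()
val strictResult: Either[String, String] =
  try Right(strict.decode(ByteBuffer.wrap(invalid)).toString)
  catch { case e: CharacterCodingException => Left(e.getClass.getSimpleName) }

// Lenient decoder: malformed bytes become the replacement character U+FFFD.
val lenient = Charset.forName("UTF-8").newDecoder()
  .onMalformedInput(CodingErrorAction.REPLACE)
  .onUnmappableCharacter(CodingErrorAction.REPLACE)
val lenientResult = lenient.decode(ByteBuffer.wrap(invalid)).toString
```

Which policy suits a server is a design choice: REPORT lets you catch the exception and drop a misbehaving client, while REPLACE keeps the connection going with visibly damaged text.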
Combining Java's NIO non-blocking input/output
with Scala's delimited continuations
allows making a multi-client stateful server that does not require
a dedicated thread for every active client.
When writing a high-volume server, you want it to be able to handle
a large number of clients in a timely manner.
You want the server to be able to respond quickly to each request
even when there are a lot of clients.
In particular, when you have a large number of clients for whom you
are storing state but are not at this moment making a request,
you want those clients to have a negligible effect on your response time,
and when you have other clients currently making requests that may take
some time to execute, you don't want those requests significantly
increasing your response time to other requests.
From the above, we thus have these requirements:
The size of the state stored for each active client should be minimized.
The time to notice that a client has made a request should not
depend significantly on the number of active clients not making a request.
A client request should never wait on a request from another client.
A simple Java implementation of a server might store its per-client
state in a thread for each
active client, and block waiting for a request on each thread.
Java's thread scheduler then takes care of noticing when input
is available from one of those blocking reads
and continuing execution on the associated thread.
However, threads are relatively expensive and consume more memory than
necessary.
An implementation that stores the state for each client manually
could store that state much more compactly and thus handle many more clients,
but in addition to being more complicated to implement,
the application must now include a mechanism to multiplex multiple
clients onto fewer threads than there are active clients
without having any clients blocked by other clients.
Scala's
delimited continuations give us another option for storing
the state of a client that is simpler to implement than manual state
storage and that uses far less memory than using a thread for each client
(see slide 19 of Philipp Haller's
presentation
on Actors, where he says an app can handle up to 5,000
threads, but up to 1,200,000 actors using closures).
That leaves us to solve the issue of how to multiplex many clients
without having them block each other.
If using the standard
java.io
package, the only option for reading
from multiple streams without blocking is to poll using the
available method.
Besides the wasted CPU time,
as the number of active clients grows the polling loop takes longer
to run, meaning additional latency before the application notices
that any particular
stream is ready for reading and thus increased response time.
The Java NIO package provides an efficient solution for this problem.
In the "Nio*" sections below
I present a few Scala classes that break up the NIO work into
chunks that I hope are easy to understand.
The last piece is a simple main class to test it all.
Java Nonblocking IO
Starting with version 1.4,
Java has a package called NIO (new input/output) that includes
the ability to do non-blocking input and output.
The non-blocking IO model in
Java NIO
is based on
Channels
and
Selectors,
similar to
file descriptors
and the
select
call in POSIX operating systems.
A Channel represents a connection to a file, socket,
or some other data source or sink.
A Selector allows collecting a set of Channels
together and waiting until any of them are ready for input or output.
In order to use a Channel with a Selector,
you must
register
the Channel with the Selector
(and the Channel must be a
SelectableChannel).
When you register a channel you also specify which
operations are of interest, such as READ or WRITE.
These operations are represented by bits that can be combined by
ORing them together to create a value that indicates interest
in multiple operations.
Registering a channel with a selector creates a
SelectionKey.
When you later execute the select method on the selector,
it returns a set of SelectionKeys,
from which you can extract the information passed to the register call
when the SelectionKey was created.
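The following self-contained sketch (not part of the server code) walks through those steps with a Pipe instead of a socket, so it needs no network: register a non-blocking channel with a Selector, combine interest bits with |, and read the registration back from the SelectionKey that select reports.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, SelectionKey, Selector}

val selector = Selector.open()
val pipe = Pipe.open()

// Only a SelectableChannel in non-blocking mode can be registered.
val source = pipe.source()
source.configureBlocking(false)

// Registering creates the SelectionKey. A pipe's source end supports only
// OP_READ; a SocketChannel could be registered with a combined mask instead.
val key: SelectionKey = source.register(selector, SelectionKey.OP_READ)
val readOrWrite = SelectionKey.OP_READ | SelectionKey.OP_WRITE // bits ORed together

// No data yet, so selectNow (which never blocks) reports no ready channels.
val readyBefore = selector.selectNow()

// Write into the other end of the pipe; the source end becomes readable.
pipe.sink().write(ByteBuffer.wrap("hi".getBytes("UTF-8")))

// select blocks (here for at most one second) until a channel is ready.
val readyAfter = selector.select(1000)
val sourceIsReadable = selector.selectedKeys().contains(key) && key.isReadable
```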
NioSelector
The register call to
java.nio.channels.SelectableChannel
includes an "attachment" parameter that
allows the application to attach an arbitrary object to that registration
and retrieve it later when the associated channel becomes selected.
In our server, we attach a callback function to be executed
when the channel becomes selected.
We package this up in our own register method
in our NioSelector class.
import java.nio.channels.SelectableChannel
import java.nio.channels.spi.SelectorProvider
/** Handle nio channel selection. */
class NioSelector {
  val selector = SelectorProvider.provider.openSelector()
  def register(channel:SelectableChannel, op:Int, body: => Unit) {
    val callback:Function0[Unit] = { () => { body }}
    channel.register(selector, op, callback)
  }
}
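The key trick in register is that body is a by-name parameter, so wrapping it in a function literal stores the code without running it. This sketch uses a hypothetical register that keeps callbacks in a list instead of passing them to a Selector, so the deferred evaluation is easy to observe.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for NioSelector's callback storage.
val registered = ArrayBuffer[() => Unit]()

def register(body: => Unit): Unit = {
  // Wrapping a by-name parameter in a function literal defers its evaluation:
  // `body` runs each time the function is called, not now.
  val callback: () => Unit = () => body
  registered += callback
}

var count = 0
register { count += 1 }
val countAfterRegister = count // the block has not run yet
registered.foreach(cb => cb()) // what selectOnce does for selected keys
val countAfterCallback = count
```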
We add a selectOnce method that waits for one or
more of the registered channels to be selected,
or until the specified time has elapsed if no channels
are selected during that time.
The underlying call to
selectedKeys
in java.nio.channels.Selector returns to us a
java.util.Set containing the information about
which channels are selected.
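Before we process each channel,
we clear the selector's set of selected keys
(the selector never removes keys from that set on its own)
and set each key's interest set to zero,
so that a channel is not reported again until its callback
re-registers interest in it.
Doing this before running any callbacks matters:
a callback may re-register interest in its channel,
and clearing afterwards would wipe that out.
So we first copy the keys for all selected channels into a Scala list
and then clear that state before processing any channels.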
import scala.collection.JavaConversions
import java.nio.channels.SelectionKey
//In class NioSelector
def selectOnce(timeout:Long) {
  selector.select(timeout)
  val jKeys:java.util.Set[SelectionKey] = selector.selectedKeys
  val keys = JavaConversions.asScalaSet(jKeys).toList //copy before clearing
  selector.selectedKeys.clear()  //the selector does not remove keys itself
  keys foreach { _.interestOps(0) }  //callbacks re-register to hear more
  val callbacks = keys map { _.attachment.asInstanceOf[()=>Unit] }
  executeCallbacks(callbacks) //Execute callbacks for all selected keys
}
The executeCallbacks method is responsible for executing
the callbacks for the selected channels.
If we assume that at least some of these callbacks may take a relatively
long time to complete, then
in a real server we might want to throw all of these callbacks into
a thread pool or farm them out to actors
so that we can avoid having one long-running callback
block out the rest of the channels.
For now, though, we will ignore that issue and use the simple
implementation of just executing each callback sequentially in
the same thread.
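For reference, here is a trimmed-down, runnable version of the selector pieces with the simple sequential executeCallbacks filled in. MiniSelector is a hypothetical name for this sketch; it uses scala.jdk.CollectionConverters (Scala 2.13 and later) in place of JavaConversions, and a Pipe stands in for a client socket.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, SelectableChannel, SelectionKey, Selector}
import scala.jdk.CollectionConverters._

// Same register/selectOnce shape as NioSelector, without continuations.
class MiniSelector {
  val selector: Selector = Selector.open()

  def register(channel: SelectableChannel, op: Int, body: => Unit): Unit = {
    val callback: () => Unit = () => body
    channel.register(selector, op, callback)
  }

  def selectOnce(timeout: Long): Unit = {
    selector.select(timeout)
    val keys = selector.selectedKeys().asScala.toList
    selector.selectedKeys().clear()
    keys.foreach(_.interestOps(0)) // callbacks must re-register to hear more
    executeCallbacks(keys.map(_.attachment().asInstanceOf[() => Unit]))
  }

  // Sequential execution: a slow callback delays every other ready channel.
  def executeCallbacks(callbacks: List[() => Unit]): Unit =
    callbacks.foreach(cb => cb())
}

val mini = new MiniSelector
val pipe = Pipe.open()
val source = pipe.source()
source.configureBlocking(false)
val received = new StringBuilder

// Register a one-shot read callback, make data available, run one select.
mini.register(source, SelectionKey.OP_READ, {
  val buf = ByteBuffer.allocate(16)
  val n = source.read(buf)
  received.append(new String(buf.array(), 0, n, "UTF-8"))
})
pipe.sink().write(ByteBuffer.wrap("ping".getBytes("UTF-8")))
mini.selectOnce(1000)
```

After selectOnce the key's interest set is zero, because the callback did not re-register; in the server, the loop inside the continuation takes care of that by calling read again.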
For convenience, we add a selectLoop method that will
continue to call selectOnce until we are ready to stop
processing, and we make our class implement the Runnable
interface.
//In NioSelector class, declared as: class NioSelector extends Runnable
def run() {
  selectLoop(true)
}
def selectLoop(continueProcessing: => Boolean) {
  while (continueProcessing) {
    selectOnce(0)  //a timeout of 0 means wait indefinitely
  }
}
Our main method will create an instance of NioSelector,
do some other initialization, and eventually call
NioSelector.run.
NioListener
The above NioSelector class gives us our main event loop,
but how do we get things hooked up to start generating events?
The basic socket flow on the server side for reading data follows these steps:
Listen for new connections on a socket.
When a new connection is made, initialize that connection and wait
for input data on that connection.
When input data is received, process that input data.
Eventually, close the connection.
Our NioListener class takes care of creating the socket on
which we listen, accepting new connections, and passing them off to
another class for per-connection initialization.
We start with the code that creates the listener socket.
Since we will be passing this socket to our selector, we must create a
ServerSocketChannel
rather than a regular socket.
We don't want to block waiting for this socket, so we call
configureBlocking(false).
import java.net.{InetAddress,InetSocketAddress}
import java.nio.channels.ServerSocketChannel
class NioListener(selector:NioSelector, hostAddr:InetAddress, port:Int) {
  val serverChannel = ServerSocketChannel.open()
  serverChannel.configureBlocking(false)
  val isa = new InetSocketAddress(hostAddr,port)
  serverChannel.socket.bind(isa)
}
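A quick way to see why the listener must wait for the selector: in non-blocking mode, ServerSocketChannel.accept returns null immediately when no connection is pending. This sketch (not from the post) binds to the loopback address with port 0, which lets the operating system choose a free port; that is also one way to sidestep an "Address already in use" error when experimenting.

```scala
import java.net.{InetAddress, InetSocketAddress}
import java.nio.channels.ServerSocketChannel

val serverChannel = ServerSocketChannel.open()
serverChannel.configureBlocking(false)
// Port 0: let the OS pick any free port. Loopback keeps the socket local.
serverChannel.socket().bind(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
val boundPort = serverChannel.socket().getLocalPort

// Non-blocking accept does not wait: with nobody connecting, it returns null.
val noConnectionYet = serverChannel.accept()
serverChannel.close()
```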
Now that we have created our socket, we must register it with our selector.
In that call, we will tell the selector that we are interested in the
ACCEPT operation on our socket channel.
The third argument to our register method is the callback;
this is where continuations come in.
Blocking code is generally simpler and thus easier to write and understand than
asynchronous (non-blocking) code.
By using continuations we can capture the current state of the code,
explicitly save it for later execution, and continue running the thread
at a different point.
The code within our delimited continuation is blocked, but the thread does
not get blocked.
Rather than explicitly creating and passing in a callback
with application state,
we arrange
things such that the callback passed to our register method
includes a call to our continuation, so that when that callback is called,
it executes our continuation and makes our code continue from where we
left off.
We implement our own accept method that looks to the calling
code like the blocking
ServerSocket.accept method,
but that uses continuations as described above to do the blocking.
We then call our accept method from within a reset
block that delimits the continuation and determines how much code is
saved in the continuation (everything within the reset)
and the point at which execution continues (just past the reset)
in the thread when the accept call blocks.
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import scala.util.continuations._
//In NioListener class
def start(continueListening: =>Boolean):Unit = {
  reset {
    while (continueListening) {
      val socket = accept()
      NioConnection.newConnection(selector,socket)
    }
  }
}
private def accept():SocketChannel @suspendable = {
  shift { k =>
    selector.register(serverChannel,SelectionKey.OP_ACCEPT, {
      val conn = serverChannel.accept()
      conn.configureBlocking(false)
      k(conn)
    })
  }
}
When we call our start method, it enters the
reset block and calls accept, which
registers our interest in the ACCEPT operation,
saves the continuation as part of our callback
in the register method
and immediately returns control to the end of the
reset block, thus the start method returns quickly.
When a new connection arrives, the select code in NioSelector
calls our callback from its executeCallbacks method.
Our callback accepts the new connection and sets it to non-blocking,
then executes our continuation, which will "return" from
our blocked accept method into the loop in our start
method. The code processes the new connection, then loops and calls
accept again, which reregisters interest in an ACCEPT
operation on our socket and
returns execution to the caller of the continuation, which returns control to
the executeCallbacks method in NioSelector.
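The continuations plugin is not needed to see the shape of this control flow. The sketch below writes the same pattern by hand in continuation-passing style: accept takes the rest of the loop as an explicit function k, registers it, and returns at once. It illustrates the idea only; it is not the code the plugin actually generates, and the queue of pending events is a hypothetical stand-in for NioSelector.

```scala
import scala.collection.mutable

// Hypothetical event source: registered callbacks wait here until "the event
// happens" (we fire them by hand below, as executeCallbacks would).
val pendingEvents = mutable.Queue[() => Unit]()
var nextConnectionId = 0

// Continuation-passing accept: instead of returning a connection, it takes
// `k`, the rest of the computation, and registers it as a callback.
def accept(k: Int => Unit): Unit =
  pendingEvents.enqueue { () =>
    nextConnectionId += 1
    k(nextConnectionId) // "return" the new connection into the waiting loop
  }

val handled = mutable.ListBuffer[Int]()

// The listening loop as a function that re-registers itself, roughly the
// effect of `while (...) { val socket = accept(); ... }` inside reset.
def listen(remaining: Int): Unit =
  if (remaining > 0) accept { conn =>
    handled += conn       // NioConnection.newConnection would go here
    listen(remaining - 1) // loop: accept registers again and returns
  }

listen(3)
val handledAfterStart = handled.toList // listen returned before any event

// The "event loop": fire one pending event at a time.
while (pendingEvents.nonEmpty) pendingEvents.dequeue()()
```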
NioConnection
Our NioConnection class is responsible for creating our
connection object, registering it with the selector,
reading input data when available and processing that data.
The processing that we do in this example is to echo the data back to
the client.
In order to keep this example simple,
we are not using NIO and continuations on the writing side,
rather we use a simple write
call to write our data directly to the socket.
As with NioListener,
we create our own read call that uses continuations to
block, but otherwise looks similar to the standard Java
InputStream.read(byte[]) call,
except that we read into a
ByteBuffer
rather than a byte array.
As with NioListener,
our start method that calls our read method
(via our intermediate method readWait)
contains a reset block that delimits the continuation.
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.SocketChannel
import scala.util.continuations._
object NioConnection {
  def newConnection(selector:NioSelector, socket:SocketChannel) {
    val conn = new NioConnection(selector,socket)
    conn.start()
  }
}
class NioConnection(selector:NioSelector, socket:SocketChannel) {
  private val buffer = ByteBuffer.allocateDirect(2000)
  def start():Unit = {
    reset {
      while (socket.isOpen)
        readWait
    }
  }
  private def readWait = {
    buffer.clear()
    val count = read(buffer)
    if (count<1)
      socket.close()
    else
      readAction(buffer)
  }
  private def read(b:ByteBuffer):Int @suspendable = {
    if (!socket.isOpen)
      -1 //indicate EOF
    else shift { k =>
      selector.register(socket, SelectionKey.OP_READ, {
        val n = socket.read(b)
        k(n)
      })
    }
  }
  private def readAction(b:ByteBuffer) {
    b.flip()
    socket.write(b)
    b.clear()
  }
}
The program flow is similar to the flow in NioListener:
When we call our start method, it enters the reset
block and (through the call to readWait) calls read,
which registers our interest in the READ operation,
saves the continuation as part of our callback
in the register method
and immediately returns control to the end of the
reset block, thus the start method returns quickly.
When data is available to be read from the socket,
the select code in NioSelector
calls our callback from its executeCallbacks method.
Our callback reads the data from the connection
then executes our continuation, which will "return" from
our blocked read method into the readWait
method. The code processes the input data in readAction
(which just echoes the data back to the client),
then (if the socket is still open)
loops and ends up calling
read again, which reregisters interest in a READ
operation on our socket and
returns execution to the caller of the continuation,
which returns control to
the executeCallbacks method in NioSelector.
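The clear and flip calls in readWait and readAction follow ByteBuffer's position/limit protocol. This sketch (not from the post) records the buffer state at each step, with a Pipe standing in for the socket and a put standing in for socket.read.

```scala
import java.nio.ByteBuffer
import java.nio.channels.Pipe

// Same allocation as NioConnection.
val buffer = ByteBuffer.allocateDirect(2000)
val pipe = Pipe.open() // stand-in for the socket: we "echo" into the pipe

// readWait: clear() before reading, so position = 0 and limit = capacity.
buffer.clear()
buffer.put("hello".getBytes("UTF-8"))               // pretend socket.read stored 5 bytes
val afterRead = (buffer.position(), buffer.limit()) // (5, 2000)

// readAction: flip() exposes exactly the bytes just read for writing out.
buffer.flip()
val afterFlip = (buffer.position(), buffer.limit()) // (0, 5)

pipe.sink().write(buffer)                            // the echo
val afterWrite = (buffer.position(), buffer.limit()) // (5, 5): nothing left

// Read back what was "echoed" to confirm the bytes went through.
val echoed = ByteBuffer.allocate(16)
pipe.source().read(echoed)
val echoedText = new String(echoed.array(), 0, echoed.position(), "UTF-8")
```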
NioServer
Here is a short main program to test the above modules.
import java.net.InetAddress
object NioServer {
  def main(args:Array[String]) {
    val hostAddr:InetAddress = null //null selects the wildcard address: listen on all local interfaces
    val port = 1234
    val server = new NioServer(hostAddr,port)
    server.start()
  }
}
class NioServer(hostAddr:InetAddress, port:Int) {
  val selector = new NioSelector()
  val listener = new NioListener(selector, hostAddr, port)
  def start() {
    listener.start(true)
    selector.run()
  }
}
Create a directory and place into that directory the above four files
(NioSelector.scala, NioListener.scala,
NioConnection.scala and NioServer.scala),
cd into that directory, then execute the following commands to
compile the files and execute the test program:
If you get an "Address already in use" error and traceback,
edit NioServer.scala and pick another port number.
After starting the server, connect to it using telnet:
telnet localhost 1234
In the telnet client, each time you enter a line of text
the server should echo that line of text back to you.
Limitations
The above example demonstrates a very basic echo server
in a bit over 100 lines of code
that uses NIO and continuations for reading data,
but it has some limitations:
The processing of the input data is buried in the code that reads the data.
A well-factored architecture would allow that processing code to be
separately maintained.
We are using one thread for all execution.
In the above example we are just echoing clients' input back to them,
but if we want to do some heavier processing we will need to move that
work off of the select thread onto one or more worker threads
so as not to block everyone while handling a request for one client.
Communication between the select thread and the worker threads will
require some kind of buffering, which introduces more points at which
execution can be blocked, thus more opportunities to use continuations.
If we wish to treat the data as characters rather than bytes then we
need to do that conversion.
We are not using NIO and continuations when writing data to the client,
so if for some reason our socket is not ready for writing
the application will not work properly
(either it will block waiting for output,
or we will lose some output data).
We are not storing any significant amount of state in our continuations.
If all we are doing is a simple echo server, we don't really need
continuations.
A more sophisticated example would better illustrate the convenience
of using continuations to store program state.
MINA,
Apache's network application framework that uses NIO.
Loft,
a single-threaded web server in Scala using NIO and continuations
(under development).
A post
from July 2009
on the Scala mailing list with an (uncompiled) example of a simple
single-threaded server using NIO.
The post demonstrates the approach of creating versions of the standard
calls (accept, read, write) that block using continuations.
Unless otherwise specified in individual blog entries, all source code in this blog is Copyright by Jim McBeath, as of the posting date, under the GNU Lesser General Public License (LGPL), Version 3.