Contents
- Motivation
- Java Nonblocking IO
- NioSelector
- NioListener
- NioConnection
- NioServer
- Limitations
- Resources
Motivation
When writing a high-volume server, you want it to be able to handle a large number of clients in a timely manner. You want the server to be able to respond quickly to each request even when there are a lot of clients. In particular, when you have a large number of clients for whom you are storing state but are not at this moment making a request, you want those clients to have a negligible effect on your response time, and when you have other clients currently making requests that may take some time to execute, you don't want those requests significantly increasing your response time to other requests.
From the above, we thus have these requirements:
- The size of the state stored for each active client should be minimized.
- The time to notice that a client has made a request should not depend significantly on the number of active clients not making a request.
- A client request should never wait on a request from another client.
An implementation that stores the state for each client manually could store that state much more compactly than one that dedicates a thread to each client, and could thus handle many more clients. It is more complicated to implement, however, and the application must now include a mechanism to multiplex many active clients onto a smaller number of threads without letting any client be blocked by another.
Scala's delimited continuations give us another option for storing the state of a client that is simpler to implement than manual state storage and that uses far less memory than using a thread for each client (see slide 19 of Philipp Haller's presentation on Actors, where he says an app can handle up to 5,000 threads, but up to 1,200,000 actors using closures).
That leaves us to solve the issue of how to multiplex many clients without having them block each other. If using the standard java.io package, the only option for reading from multiple streams without blocking is to poll using the available method. Besides the wasted CPU time, as the number of active clients grows the polling loop takes longer to run, meaning additional latency before the application notices that any particular stream is ready for reading and thus increased response time. The Java NIO package provides an efficient solution for this problem.
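To make the cost of polling concrete, here is a minimal sketch of one polling pass using the available method. The object name PollingSketch and the method pollOnce are made up for illustration, and in-memory streams stand in for client sockets.

```scala
import java.io.{ByteArrayInputStream, InputStream}

/** Polling sketch: every pass touches every stream, idle or not. */
object PollingSketch {
  /** One pass over all streams; returns (index, text) for each stream that had data. */
  def pollOnce(streams: Seq[InputStream]): Seq[(Int, String)] =
    streams.zipWithIndex.flatMap { case (in, i) =>
      val n = in.available()            // bytes readable without blocking
      if (n > 0) {
        val buf = new Array[Byte](n)
        val count = in.read(buf)        // returns at once because data is already there
        Some(i -> new String(buf, 0, count, "UTF-8"))
      } else None
    }

  def main(args: Array[String]): Unit = {
    // 1000 idle "clients" and one client with a pending request.
    val idle: Seq[InputStream] = Seq.fill(1000)(new ByteArrayInputStream(Array.empty[Byte]))
    val busy = new ByteArrayInputStream("hello".getBytes("UTF-8"))
    pollOnce(idle :+ busy).foreach { case (i, text) => println(s"stream $i: $text") }
  }
}
```

Each pass does work proportional to the total number of streams, even though only one of them has anything to read; that is the latency problem described above.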
In the "Nio*" sections below I present a few Scala classes that break up the NIO work into chunks that I hope are easy to understand. The last piece is a simple main class to test it all.
Java Nonblocking IO
Starting with version 1.4, Java has a package called NIO (new input/output) that includes the ability to do non-blocking input and output. The non-blocking IO model in Java NIO is based on Channels and Selectors, similar to file descriptors and the select call in POSIX operating systems. A Channel represents a connection to a file, socket, or some other data source or sink. A Selector allows collecting a set of Channels together and waiting until any of them are ready for input or output.
In order to use a Channel with a Selector, you must register the Channel with the Selector (and the Channel must be a SelectableChannel). When you register a channel you also specify which operations are of interest, such as READ or WRITE. These operations are represented by bits that can be combined by ORing them together to create a value that indicates interest in multiple operations.
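As a small illustration of combining operation bits, the sketch below uses the constants from java.nio.channels.SelectionKey. The object name InterestOpsSketch and the helper includes are hypothetical names chosen for this example.

```scala
import java.nio.channels.SelectionKey

object InterestOpsSketch {
  // Interest in both reading and writing, combined with bitwise OR.
  val readWrite: Int = SelectionKey.OP_READ | SelectionKey.OP_WRITE

  /** True if the interest set `ops` contains every bit of `op`. */
  def includes(ops: Int, op: Int): Boolean = (ops & op) == op

  def main(args: Array[String]): Unit = {
    println(includes(readWrite, SelectionKey.OP_READ))    // READ is part of the set
    println(includes(readWrite, SelectionKey.OP_ACCEPT))  // ACCEPT is not
  }
}
```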
Registering a channel with a selector creates a SelectionKey. When you later execute the select method on the selector, it gives you back a set of SelectionKeys (retrieved through the selector's selectedKeys method), from which you can extract the information passed to the register call when the SelectionKey was created.
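The register and select cycle can be tried in isolation with a sketch like the following. It is not part of the server built below: the object name RegisterSketch, the string attachment "listener", the loopback address, and the 5 second timeout are all assumptions made for the example.

```scala
import java.net.{InetAddress, InetSocketAddress}
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}

object RegisterSketch {
  /** Registers a listening channel, provokes one ACCEPT event, and returns
    * the attachment of the selected key (None if select timed out). */
  def selectOneAccept(): Option[AnyRef] = {
    val loopback = InetAddress.getLoopbackAddress
    val selector = Selector.open()
    val server = ServerSocketChannel.open()
    server.configureBlocking(false)                           // a registered channel must be non-blocking
    server.socket().bind(new InetSocketAddress(loopback, 0))  // port 0: any free port
    // The third argument is the attachment; register returns the SelectionKey.
    val key: SelectionKey = server.register(selector, SelectionKey.OP_ACCEPT, "listener")
    // Connect a client so that the listening channel becomes ready to accept.
    val client = SocketChannel.open(
      new InetSocketAddress(loopback, server.socket().getLocalPort()))
    try {
      selector.select(5000)                                   // wait up to 5 seconds
      val it = selector.selectedKeys().iterator()
      if (it.hasNext) {
        val selected = it.next()
        it.remove()                                           // the caller clears selected keys
        assert(selected eq key)                               // same key that register returned
        Some(selected.attachment())
      } else None
    } finally {
      client.close(); server.close(); selector.close()
    }
  }

  def main(args: Array[String]): Unit = println(selectOneAccept())
}
```

The attachment comes back unchanged from the selected key, which is the mechanism the NioSelector class below uses to carry a callback.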
NioSelector
The register call to java.nio.channels.SelectableChannel includes an "attachment" parameter that allows the application to attach an arbitrary object to that registration and retrieve it later when the associated channel becomes selected. In our server, we attach a callback function to be executed when the channel becomes selected. We package this up in our own register method in our NioSelector class.

    import java.nio.channels.SelectableChannel
    import java.nio.channels.spi.SelectorProvider

    /** Handle nio channel selection. */
    class NioSelector {
        val selector = SelectorProvider.provider.openSelector()

        def register(channel:SelectableChannel, op:Int, body: => Unit) {
            val callback:Function0[Unit] = { () => { body }}
            channel.register(selector, op, callback)
        }
    }

We add a selectOnce method that waits for one or more of the registered channels to be selected, or until the specified time has elapsed if no channels are selected during that time. The underlying call to selectedKeys in java.nio.channels.Selector returns to us a java.util.Set containing the information about which channels are selected. Before we process each channel, we need to clear the current state so that we are able to re-register interest in that channel during that processing. We make a copy of the state for all selected channels and then clear that state before processing any channels.

    import scala.collection.JavaConversions
    import java.nio.channels.SelectionKey

    //In class NioSelector
    def selectOnce(timeout:Long) {
        selector.select(timeout)
        val jKeys:java.util.Set[SelectionKey] = selector.selectedKeys
        val keys = JavaConversions.asScalaSet(jKeys).toList
        selector.selectedKeys.clear()
        keys foreach { _.interestOps(0) }
        val callbacks = keys map { _.attachment.asInstanceOf[()=>Unit] }
        executeCallbacks(callbacks)    //Execute callbacks for all selected keys
    }

The executeCallbacks method is responsible for executing the callbacks for the selected channels. If we assume that at least some of these callbacks may take a relatively long time to complete, then in a real server we might want to throw all of these callbacks into a thread pool or farm them out to actors so that we can avoid having one long-running callback block out the rest of the channels. For now, though, we will ignore that issue and use the simple implementation of just executing each callback sequentially in the same thread.

    //In class NioSelector
    def executeCallbacks(callbacks:List[()=>Unit]) {
        callbacks foreach { _() }
    }

For convenience, we add a selectLoop method that will continue to call selectOnce until we are ready to stop processing, and we make our class implement the Runnable interface.

    //In NioSelector class
    def run() {
        selectLoop(true)
    }

    def selectLoop(continueProcessing: => Boolean) {
        while (continueProcessing) {
            selectOnce(0)    //a timeout of 0 blocks until a channel is selected
        }
    }

Here is the complete NioSelector class:

    import scala.collection.JavaConversions
    import java.nio.channels.SelectableChannel
    import java.nio.channels.SelectionKey
    import java.nio.channels.spi.SelectorProvider

    /** Handle nio channel selection. */
    class NioSelector extends Runnable {
        val selector = SelectorProvider.provider.openSelector()

        def register(channel:SelectableChannel, op:Int, body: => Unit) {
            val callback:Function0[Unit] = { () => { body }}
            channel.register(selector, op, callback)
        }

        def run() {
            selectLoop(true)
        }

        def selectLoop(continueProcessing: => Boolean) {
            while (continueProcessing) {
                selectOnce(0)
            }
        }

        def selectOnce(timeout:Long) {
            selector.select(timeout)
            val jKeys:java.util.Set[SelectionKey] = selector.selectedKeys
            val keys = JavaConversions.asScalaSet(jKeys).toList
            selector.selectedKeys.clear()
            keys foreach { _.interestOps(0) }
            val callbacks = keys map { _.attachment.asInstanceOf[()=>Unit] }
            executeCallbacks(callbacks)    //Execute callbacks for all selected keys
        }

        def executeCallbacks(callbacks:List[()=>Unit]) {
            callbacks foreach { _() }
        }
    }

Our main method will create an instance of NioSelector, do some other initialization, and eventually call NioSelector.run.
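The thread-pool variant of executeCallbacks mentioned earlier in this section could look roughly like the sketch below. It is a standalone illustration, not a drop-in replacement: the object name PooledCallbacksSketch and the pool size of 4 are assumptions, and callbacks that call register from a worker thread while the select thread is inside select would likely need extra coordination with the selector, which is not shown.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PooledCallbacksSketch {
  /** Hands each callback to the pool instead of running it on the select thread. */
  def executeCallbacks(pool: ExecutorService, callbacks: List[() => Unit]): Unit =
    callbacks.foreach { cb =>
      pool.execute(new Runnable { def run(): Unit = cb() })
    }

  /** Runs `n` counting callbacks on a pool of 4 threads and returns the final count. */
  def runCounting(n: Int): Int = {
    val pool = Executors.newFixedThreadPool(4)
    val counter = new AtomicInteger(0)
    executeCallbacks(pool, List.fill(n)(() => { counter.incrementAndGet(); () }))
    pool.shutdown()                               // accept no new work, finish queued work
    pool.awaitTermination(5, TimeUnit.SECONDS)
    counter.get()
  }

  def main(args: Array[String]): Unit = println(runCounting(10))
}
```

With this shape a slow callback occupies one worker thread rather than the select loop, at the price of the callbacks no longer running in a single thread.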
NioListener
The above NioSelector class gives us our main event loop, but how do we get things hooked up to start generating events?
The basic socket flow on the server side for reading data follows these steps:
- Listen for new connections on a socket.
- When a new connection is made, initialize that connection and wait for input data on that connection.
- When input data is received, process that input data.
- Eventually, close the connection.
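For comparison, the steps above can be sketched with the ordinary blocking java.net API, one thread per connection. This is the style that the NIO classes in the rest of this post replace; the names BlockingEchoSketch, serveOne and roundTrip are invented for the example, and the server handles a single connection only.

```scala
import java.io.ByteArrayOutputStream
import java.net.{InetAddress, ServerSocket, Socket}

object BlockingEchoSketch {
  /** Handles exactly one connection, echoing input until the client closes its side. */
  def serveOne(server: ServerSocket): Unit = {
    val conn: Socket = server.accept()          // blocks until a client connects
    try {
      val in = conn.getInputStream()
      val out = conn.getOutputStream()
      val buf = new Array[Byte](2000)
      var n = in.read(buf)                      // blocks until data arrives
      while (n > 0) {
        out.write(buf, 0, n)                    // process the input: echo it back
        out.flush()
        n = in.read(buf)
      }
    } finally conn.close()                      // eventually, close the connection
  }

  /** Starts a one-shot server on a free loopback port, sends `message`, returns the echo. */
  def roundTrip(message: String): String = {
    val loopback = InetAddress.getLoopbackAddress
    val server = new ServerSocket(0, 50, loopback)   // listen for new connections
    val worker = new Thread(new Runnable { def run(): Unit = serveOne(server) })
    worker.start()
    val client = new Socket(loopback, server.getLocalPort())
    try {
      client.getOutputStream().write(message.getBytes("UTF-8"))
      client.shutdownOutput()                   // signal end of input to the server
      val reply = new ByteArrayOutputStream()
      val buf = new Array[Byte](256)
      val in = client.getInputStream()
      var n = in.read(buf)
      while (n > 0) { reply.write(buf, 0, n); n = in.read(buf) }
      new String(reply.toByteArray, "UTF-8")
    } finally { client.close(); worker.join(); server.close() }
  }

  def main(args: Array[String]): Unit = println(roundTrip("ping"))
}
```

Both accept and read tie up a thread while they wait, which is exactly what the selector-based version avoids.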
The NioListener class takes care of creating the socket on which we listen, accepting new connections, and passing them off to another class for per-connection initialization.
We start with the code that creates the listener socket. Since we will be passing this socket to our selector, we must create a ServerSocketChannel rather than a regular socket. We don't want to block waiting for this socket, so we call configureBlocking(false).

    import java.net.{InetAddress,InetSocketAddress}
    import java.nio.channels.ServerSocketChannel

    class NioListener(selector:NioSelector, hostAddr:InetAddress, port:Int) {
        val serverChannel = ServerSocketChannel.open()
        serverChannel.configureBlocking(false);
        val isa = new InetSocketAddress(hostAddr,port)
        serverChannel.socket.bind(isa)
    }

Now that we have created our socket, we must register it with our selector. In that call, we will tell the selector that we are interested in the ACCEPT operation on our socket channel. The third argument to our register method is the callback; this is where continuations come in.
Blocking code is generally simpler and thus easier to write and understand than asynchronous (non-blocking) code. By using continuations we can capture the current state of the code, explicitly save it for later execution, and continue running the thread at a different point. The code within our delimited continuation is blocked, but the thread does not get blocked.
Rather than explicitly creating and passing in a callback with application state, we arrange things such that the callback passed to our register method includes a call to our continuation, so that when that callback is called, it executes our continuation and makes our code continue from where we left off.
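The idea of a callback that resumes the saved rest of the computation can be illustrated without the continuations compiler plugin by passing the continuation explicitly. The sketch below is only an analogy written in plain continuation-passing style: the class CpsSketch, its queue of pending callbacks (a stand-in for the selector), and the methods awaitInput and deliver are all hypothetical.

```scala
import scala.collection.mutable

/** Explicit continuation-passing sketch; no compiler plugin involved. */
class CpsSketch {
  // Stand-in for the selector: saved continuations waiting for an event.
  private val pending = mutable.Queue[String => Unit]()
  val log = mutable.ListBuffer[String]()

  /** "Blocks" by saving the rest of the computation `k` and returning at once. */
  def awaitInput(k: String => Unit): Unit = pending.enqueue(k)

  /** Delivers an event by running the saved continuation, as the select loop would. */
  def deliver(input: String): Unit =
    if (pending.nonEmpty) pending.dequeue()(input)

  /** A loop that waits for input, handles it, then waits again. */
  def start(): Unit = {
    def loop(): Unit = awaitInput { input =>
      log += s"handled $input"
      loop()                           // wait for the next event
    }
    loop()
    log += "start returned"            // reached before any input has arrived
  }
}

object CpsSketchDemo {
  def main(args: Array[String]): Unit = {
    val s = new CpsSketch
    s.start()
    s.deliver("a")
    s.deliver("b")
    s.log.foreach(println)
  }
}
```

The shift and reset operators let the compiler produce this callback structure from code that is written as an ordinary loop, which is what the accept method below relies on.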
We implement our own accept method that looks to the calling code like the blocking ServerSocket.accept method, but that uses continuations as described above to do the blocking. We then call our accept method from within a reset block that delimits the continuation and determines how much code is saved in the continuation (everything within the reset) and the point at which execution continues (just past the reset) in the thread when the accept call blocks.

    import java.nio.channels.SelectionKey
    import java.nio.channels.SocketChannel
    import scala.util.continuations._

    //In NioListener class
    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()
                NioConnection.newConnection(selector,socket)
            }
        }
    }

    private def accept():SocketChannel @suspendable = {
        shift { k =>
            selector.register(serverChannel,SelectionKey.OP_ACCEPT, {
                val conn = serverChannel.accept()
                conn.configureBlocking(false)
                k(conn)
            })
        }
    }

When we call our start method, it enters the reset block and calls accept, which registers our interest in the ACCEPT operation, saves the continuation as part of our callback in the register method, and immediately returns control to the end of the reset block; thus the start method returns quickly.
When a new connection arrives, the select code in NioSelector calls our callback from its executeCallbacks method. Our callback accepts the new connection and sets it to non-blocking, then executes our continuation, which will "return" from our blocked accept method into the loop in our start method. The code processes the new connection, then loops and calls accept again, which reregisters interest in an ACCEPT operation on our socket and returns execution to the caller of the continuation, which returns control to the executeCallbacks method in NioSelector.
NioConnection
Our NioConnection class is responsible for creating our connection object, registering it with the selector, reading input data when available and processing that data. The processing that we do in this example is to echo the data back to the client. In order to keep this example simple, we are not using NIO and continuations on the writing side; rather, we use a simple write call to write our data directly to the socket.
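One consequence of the simple write call is that a channel's write method may accept only part of the buffer. The sketch below shows the usual loop for draining a buffer, tested against a made-up channel that takes a few bytes at a time; the names WriteSketch, writeFully and ThrottledChannel are hypothetical. On a real non-blocking socket such a loop would spin while the send buffer is full, so registering interest in the WRITE operation would be the NIO approach instead.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

object WriteSketch {
  /** Calls write until the buffer is drained; returns the number of write calls made. */
  def writeFully(channel: WritableByteChannel, b: ByteBuffer): Int = {
    var calls = 0
    while (b.hasRemaining()) {
      channel.write(b)                 // may consume only part of the buffer
      calls += 1
    }
    calls
  }

  /** Test channel (hypothetical) that accepts at most `limit` bytes per write call. */
  class ThrottledChannel(limit: Int) extends WritableByteChannel {
    val received = new ByteArrayOutputStream()
    def write(src: ByteBuffer): Int = {
      val n = math.min(limit, src.remaining())
      var i = 0
      while (i < n) { received.write(src.get().toInt); i += 1 }
      n
    }
    def isOpen(): Boolean = true
    def close(): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    val ch = new ThrottledChannel(3)
    val calls = writeFully(ch, ByteBuffer.wrap("hello world".getBytes("UTF-8")))
    println(s"$calls write calls, received: ${new String(ch.received.toByteArray, "UTF-8")}")
  }
}
```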
As with NioListener, we create our own read call that uses continuations to block, but otherwise looks similar to the standard Java InputStream.read(byte[]) call, except that we read into a ByteBuffer rather than a byte array. As with NioListener, our start method that calls our read method (via our intermediate method readWait) contains a reset block that delimits the continuation.

    import java.nio.ByteBuffer
    import java.nio.channels.SelectionKey
    import java.nio.channels.SocketChannel
    import scala.util.continuations._

    object NioConnection {
        def newConnection(selector:NioSelector, socket:SocketChannel) {
            val conn = new NioConnection(selector,socket)
            conn.start()
        }
    }

    class NioConnection(selector:NioSelector, socket:SocketChannel) {

        private val buffer = ByteBuffer.allocateDirect(2000)

        def start():Unit = {
            reset {
                while (socket.isOpen)
                    readWait
            }
        }

        private def readWait = {
            buffer.clear()
            val count = read(buffer)
            if (count<1)
                socket.close()
            else
                readAction(buffer)
        }

        private def read(b:ByteBuffer):Int @suspendable = {
            if (!socket.isOpen)
                -1  //indicate EOF
            else shift { k =>
                selector.register(socket, SelectionKey.OP_READ, {
                    val n = socket.read(b)
                    k(n)
                })
            }
        }

        private def readAction(b:ByteBuffer) {
            b.flip()
            socket.write(b)
            b.clear()
        }
    }

The program flow is similar to the flow in NioListener. When we call our start method, it enters the reset block and (through the call to readWait) calls read, which registers our interest in the READ operation, saves the continuation as part of our callback in the register method, and immediately returns control to the end of the reset block; thus the start method returns quickly.
When data is available to be read from the socket, the select code in NioSelector calls our callback from its executeCallbacks method. Our callback reads the data from the connection, then executes our continuation, which will "return" from our blocked read method into the readWait method. The code processes the input data in readAction (which just echoes the data back to the client), then (if the socket is still open) loops and ends up calling read again, which reregisters interest in a READ operation on our socket and returns execution to the caller of the continuation, which returns control to the executeCallbacks method in NioSelector.
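The flip and clear calls in readAction are easier to follow when the buffer's position and limit are printed at each step. The sketch below walks a heap ByteBuffer through the same read, flip, write, clear cycle; the object name BufferSketch and the method cycle are invented for the example, and put and get stand in for the socket's read and write.

```scala
import java.nio.ByteBuffer

object BufferSketch {
  /** Returns (position, limit) after each step of a read-then-write cycle. */
  def cycle(data: Array[Byte], capacity: Int): List[(Int, Int)] = {
    val b = ByteBuffer.allocate(capacity)
    b.put(data)                                   // like socket.read(b): fills from position
    val afterRead = (b.position(), b.limit())
    b.flip()                                      // limit = old position, position = 0
    val afterFlip = (b.position(), b.limit())
    b.get(new Array[Byte](b.remaining()))         // like socket.write(b): drains up to limit
    val afterWrite = (b.position(), b.limit())
    b.clear()                                     // position = 0, limit = capacity
    val afterClear = (b.position(), b.limit())
    List(afterRead, afterFlip, afterWrite, afterClear)
  }

  def main(args: Array[String]): Unit =
    cycle("abc".getBytes("UTF-8"), 8).foreach(println)
}
```

For three bytes in a buffer of capacity 8 the steps give (3,8), (0,3), (3,3) and (0,8): flip turns the filled region into the region to be written, and clear makes the whole buffer available for the next read.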
NioServer
Here is a short main program to test the above modules.

    import java.net.InetAddress

    object NioServer {
        def main(args:Array[String]) {
            val hostAddr:InetAddress = null  //null selects the wildcard address (all local interfaces)
            val port = 1234
            val server = new NioServer(hostAddr,port)
            server.start()
        }
    }

    class NioServer(hostAddr:InetAddress, port:Int) {
        val selector = new NioSelector()
        val listener = new NioListener(selector, hostAddr, port)

        def start() {
            listener.start(true)
            selector.run()
        }
    }

Create a directory and place into that directory the above four files (NioSelector.scala, NioListener.scala, NioConnection.scala and NioServer.scala), cd into that directory, then execute the following commands to compile the files and execute the test program:

    scalac -P:continuations:enable *.scala
    scala NioServer

If you get an "Address already in use" error and traceback, edit NioServer.scala and pick another port number.
After starting the server, connect to it using telnet:

    telnet localhost 1234

In the telnet client, each time you enter a line of text the server should echo that line of text back to you.
Limitations
The above example demonstrates a very basic echo server in a bit over 100 lines of code that uses NIO and continuations for reading data, but it has some limitations:
- The processing of the input data is buried in the code that reads the data. A well-factored architecture would allow that processing code to be separately maintained.
- We are using one thread for all execution. In the above example we are just echoing clients' input back to them, but if we want to do some heavier processing we will need to move that work off of the select thread onto one or more worker threads so as not to block everyone while handling a request for one client.
- Communication between the select thread and the worker threads will require some kind of buffering, which introduces more points at which execution can be blocked, thus more opportunities to use continuations.
- If we wish to treat the data as characters rather than bytes then we need to do that conversion.
- We are not using NIO and continuations when writing data to the client, so if for some reason our socket is not ready for writing the application will not work properly (either it will block waiting for output, or we will lose some output data).
- We are not storing any significant amount of state in our continuations. If all we are doing is a simple echo server, we don't really need continuations. A more sophisticated example would better illustrate the convenience of using continuations to store program state.
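For the character-conversion limitation in the list above, one simple starting point is the decode method of java.nio.charset.Charset. The sketch below assumes the client sends UTF-8 and uses invented names (DecodeSketch, decode, readAsText). A multi-byte character split across two reads would not be handled correctly by this one-shot call; a stateful CharsetDecoder would be needed for that.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object DecodeSketch {
  /** Decodes the readable bytes of `b` (from position to limit) as UTF-8 text. */
  def decode(b: ByteBuffer): String = StandardCharsets.UTF_8.decode(b).toString

  /** Simulates a read into a buffer followed by flip and decode. */
  def readAsText(bytes: Array[Byte]): String = {
    val b = ByteBuffer.allocate(2000)
    b.put(bytes)        // stands in for socket.read(b)
    b.flip()            // make the bytes just read available for decoding
    decode(b)
  }

  def main(args: Array[String]): Unit =
    print(readAsText("hello\n".getBytes(StandardCharsets.UTF_8)))
}
```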
Resources
- My explanation of delimited continuations.
- Wikipedia article on Java NIO.
- Rox Java NIO Tutorial, in which James Greenfield builds a server in Java using NIO and gives a number of tips and warnings.
- Jenkov's Java NIO Tutorial goes over each part of the entire NIO package in detail.
- MINA, Apache's network application framework that uses NIO.
- Loft, a single-threaded web server in Scala using NIO and continuations (under development).
- A post from July 2009 on the Scala mailing list with an (uncompiled) example of a simple single-threaded server using NIO. The post demonstrates the approach of creating versions of the standard calls (accept, read, write) that block using continuations.
7 comments:
Very interesting stuff. I wonder how that could be used for MMO game server, for instance -- I've worked on an Erlang server for Worldforge some years ago..
Nice examples of how things can be solved with a high-level approach!
But how do you actually work with files, considering that real support for files and filesystems will only ship with Java 7?
Do you employ native libraries or some other trick?
We are planning to use continuation support in the Scala language through shift and reset to track the users who are logged in (we are trying to use this instead of a session or cookie to track the logged-in users). So please provide us with a sample program that uses continuations through shift and reset to track the logged-in users in Scala.
Thierry: I plan on making a few more posts in which I continue to develop this server. Perhaps at a later point it will be easier to see how it could be used for that type of server.
steve: I have not yet had a need for non-blocking file I/O in Java, so I have not attempted a solution. I don't see a way to do this in Java 6 without using a thread.
monisa: I do not have any sample code that tracks users as you suggest, but I could do some contract work for you if you'd like. If you are creating a web app, I suggest you ask on the Lift mailing list to see if it can already do what you want.
Sandeep: When you post a comment with a link to your blog and no commentary about this blog post, I consider it spam, so I have removed it.
Nice post