Background
One of the limitations pointed out in the Limitations section of the original post in this series was that we were still writing our output data directly to the socket rather than using non-blocking IO and continuations as we were doing when reading our input data. If a client stops reading its input (or if there is sufficient network congestion that it looks that way from our end) then our socket output buffer may fill up. If that happens, then one of two things will happen when we try to write our data to that socket: either the call will block, or the data will not all be written. If the call blocks, then we have a blocked thread that we cannot use for processing other clients until it is unblocked. If there are many clients who are not reading their input, we could have many blocked threads. Since one of the goals of this exercise is to be able to run many clients on a relatively small number of threads, having blocked threads is bad. To avoid this problem, we use non-blocking output and continuations for writing to the output, just as we did for reading the input.
The complete source for this series of posts is available on github in my nioserver project, with the specific version after the changes specified in this post tagged as blog-write.
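To make the "data will not all be written" case concrete, here is a minimal sketch that is not part of the nioserver project. TrickleChannel is a hypothetical stand-in for a congested socket that accepts only a few bytes per call, and PartialWriteDemo.writeAll is an illustrative helper that keeps calling write until the buffer is drained.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a congested socket: accepts at most `limit` bytes per call.
final class TrickleChannel(limit: Int) extends WritableByteChannel {
  val received = new ByteArrayOutputStream()
  private var open = true

  // Consumes up to `limit` bytes from src and reports how many were taken.
  def write(src: ByteBuffer): Int = {
    val n = math.min(limit, src.remaining)
    var i = 0
    while (i < n) {
      received.write(src.get().toInt)
      i += 1
    }
    n
  }

  def isOpen: Boolean = open

  def close(): Unit = { open = false }
}

object PartialWriteDemo {
  // Calls write until the buffer is empty; returns the number of calls that took.
  def writeAll(ch: WritableByteChannel, buf: ByteBuffer): Int = {
    var calls = 0
    while (buf.hasRemaining && ch.isOpen) {
      ch.write(buf)
      calls += 1
    }
    calls
  }
}
```

A single write call is not enough here: the caller has to check the buffer and try again. On a real non-blocking socket a plain loop like this would spin while the output buffer is full, which is why the approach in this post waits for the socket to become writable before each attempt instead.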
Implementation
We model the output code on the input code by making these changes:
- We write a suspending write method that registers our interest in writing to the output socket connection.
- We add an output queue to receive data from the application.
- We modify the writeLine method to add a line to the output queue rather than writing directly to the output socket.
- We run a separate control loop that reads from the output queue and writes to the output socket.

//In class NioConnection
private val outQ = new CoQueue[String](sched, 10)

def start():Unit = {
    startReader
    startWriter
    startApp
}

private def startWriter() {
    reset {
        while (socket.isOpen)
            writeWait
    }
}

private def write(b:ByteBuffer):Int @suspendable = {
    if (!socket.isOpen)
        -1  //indicate EOF
    else shift { k =>
        selector.register(socket, SelectionKey.OP_WRITE, {
            val n = socket.write(b)
            k(n)
        })
    }
}

private def writeBuffer(b:ByteBuffer):Unit @suspendable = {
    write(b)
    if (b.remaining>0 && socket.isOpen)
        writeBuffer(b)
    else
        shiftUnit[Unit,Unit,Unit]()
}

private def writeWait:Unit @suspendable = {
    val str = outQ.blockingDequeue
    writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
}

def writeLine(s:String):Unit @suspendable = write(s+"\n")
def write(s:String):Unit @suspendable = outQ.blockingEnqueue(s)

This seems pretty straightforward, but unfortunately it doesn't work. The problem is that we have attempted to register our channel twice (once for read and once for write) with the same selector. The documentation for SelectableChannel says, "A channel may be registered at most once with any particular selector." If we call register for our channel for write when it is already registered for read, the read registration is overwritten by the write registration and is lost.
In his Rox Java NIO Tutorial James Greenfield explicitly recommends that you "Use a single selecting thread" and "Modify the selector from the selecting thread only." We could take this approach, adding some code to combine the read and write interest flags when we are in that position, but unlike in James' case we would also need to add some code to demultiplex the separate callbacks for read and write. Instead, we use a different approach: we use separate selectors for reading and writing, and we give each of them its own thread.
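The overwriting behavior, and the combined-flags alternative mentioned above, can be seen with the plain java.nio classes. This is a minimal sketch, not code from the nioserver project: it uses java.nio.channels.Selector directly rather than this series' NioSelector wrapper, and SingleSelectorDemo is an illustrative name. The channel is never connected, so no network traffic is involved.

```scala
import java.nio.channels.{SelectionKey, Selector, SocketChannel}

object SingleSelectorDemo {
  // Returns (sameKey, interestOpsAfterSecondRegister, interestOpsAfterCombining).
  def run(): (Boolean, Int, Int) = {
    val selector = Selector.open()
    val channel = SocketChannel.open()
    try {
      channel.configureBlocking(false)
      // First registration: interested in reads.
      val readKey = channel.register(selector, SelectionKey.OP_READ)
      // Second registration with the same selector: the existing key is
      // returned with its interest set replaced, so the read interest is lost.
      val writeKey = channel.register(selector, SelectionKey.OP_WRITE)
      val sameKey = readKey eq writeKey
      val afterSecond = writeKey.interestOps()
      // The single-selector approach keeps both interests by combining the flags.
      writeKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE)
      (sameKey, afterSecond, writeKey.interestOps())
    } finally {
      channel.close()
      selector.close()
    }
  }
}
```

With combined flags there is still only one key per channel, so the selecting code has to inspect the ready set and dispatch to the right read or write callback itself. That dispatching is the demultiplexing work the two-selector approach avoids.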
Two Selectors
Depending on the implementation, using two selectors and two threads this way could cause problems. However, based on my understanding of the documentation, the code in the Sun implementation and the operation of the POSIX select operation, I believe this approach should work (at least on POSIX systems). This would need to be tested on all supported platforms for a production system.
To use separate read and write selectors, we replace the current selector parameter in NioConnection with two parameters, readSelector and writeSelector, of the same type.

//In object NioConnection:
def newConnection(sched:CoScheduler, readSelector:NioSelector,
        writeSelector:NioSelector, socket:SocketChannel) {
    val conn = new NioConnection(sched,readSelector, writeSelector,socket)
    conn.start()
}

class NioConnection(sched:CoScheduler, readSelector:NioSelector,
        writeSelector:NioSelector, socket:SocketChannel) {
    ...
    private def read(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            readSelector.register(socket, SelectionKey.OP_READ, {
                val n = socket.read(b)
                k(n)
            })
        }
    }

    private def write(b:ByteBuffer):Int @suspendable = {
        if (!socket.isOpen)
            -1  //indicate EOF
        else shift { k =>
            writeSelector.register(socket, SelectionKey.OP_WRITE, {
                val n = socket.write(b)
                k(n)
            })
        }
    }
    ...
}

We also change NioListener to pass through those two arguments, and we choose to use the readSelector to handle our accept calls.

//In NioListener
class NioListener(sched:CoScheduler, readSelector:NioSelector,
        writeSelector:NioSelector, hostAddr:InetAddress, port:Int) {
    ...
    def start(continueListening: =>Boolean):Unit = {
        reset {
            while (continueListening) {
                val socket = accept()
                NioConnection.newConnection(sched,
                    readSelector,writeSelector,socket)
            }
        }
    }

    private def accept():SocketChannel @suspendable = {
        shift { k =>
            readSelector.register(serverChannel,SelectionKey.OP_ACCEPT, {
                val conn = serverChannel.accept()
                conn.configureBlocking(false)
                k(conn)
            })
        }
    }
}

Finally, we instantiate the new write selector in NioServer, pass it in to NioListener, and start it running in a new thread.

//In NioServer
class NioServer(hostAddr:InetAddress, port:Int) {
    val readSelector = new NioSelector()
    val writeSelector = new NioSelector()
    val sched = new DefaultCoScheduler
    val listener = new NioListener(sched,
        readSelector, writeSelector, hostAddr, port)

    def start() {
        listener.start(true)
        //run the NIO read and write selectors each on its own thread
        (new Thread(writeSelector,"WriteSelector")).start
        (new Thread(readSelector,"ReadSelector")).start
        Thread.currentThread.setName("CoScheduler")
        sched.run    //run the coroutine scheduler on our thread, renamed
    }
}

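The reason two selectors sidestep the "at most once" rule is that the limit applies per selector: the same channel can hold one registration with each. Here is a minimal sketch of that using the plain java.nio classes instead of this series' NioSelector wrapper. TwoSelectorDemo is an illustrative name, and the channel is left unconnected so nothing touches the network.

```scala
import java.nio.channels.{SelectionKey, Selector, SocketChannel}

object TwoSelectorDemo {
  // Returns (keysAreDistinct, interestOpsOnReadSelector, interestOpsOnWriteSelector).
  def run(): (Boolean, Int, Int) = {
    val readSel = Selector.open()
    val writeSel = Selector.open()
    val channel = SocketChannel.open()
    try {
      channel.configureBlocking(false)
      // One registration per selector, each with its own key and interest set.
      val readKey = channel.register(readSel, SelectionKey.OP_READ)
      val writeKey = channel.register(writeSel, SelectionKey.OP_WRITE)
      (readKey ne writeKey,
        channel.keyFor(readSel).interestOps(),
        channel.keyFor(writeSel).interestOps())
    } finally {
      channel.close()
      readSel.close()
      writeSel.close()
    }
  }
}
```

This only shows that the two registrations coexist. It says nothing about how the two selecting threads behave under load on a given platform, which is the part that would still need the testing mentioned earlier.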
Close
Our current example has no terminating condition, so it never attempts to close the connection. Looking ahead, we expect to have applications that will want to do that, so we add a close method to NioConnection, and an isOpen method that allows us to see when it is closed.
We can't just add a close method that directly closes the socket, because there may still be output data waiting to be written. Thus we need an implementation that somehow waits until all of the queued output data has been written to the output before closing the socket.
One easy way to do this is to have a special marker string that we put into the output queue when the application requests to close the socket. When our socket output code sees that marker, we know it has already written out all of the data that came before that marker in the output queue, so we can close the socket. By doing the socket close in the same method that does the writes to the socket, and by ensuring that that method is called on the (write) selection thread, we also ensure that the close happens on the selection thread.
The compiler shares constant strings, so to make sure we have a unique string for our marker that can't be passed in by any code outside of our close method, we use new String().
In writeWait, where we check for that marker, we use the identity comparison eq, and we add a call to shiftUnit so that both branches of the if statement are CPS.
A call to our close method will return right away, but the socket will not get closed until after all of the data in the output queue has been written to the output socket. The application can tell when the socket has actually been closed by calling the isOpen method.

//In NioConnection
private def writeWait():Unit @suspendable = {
    val str = outQ.blockingDequeue
    if (str eq closeMarker) {
        socket.close
        shiftUnit[Unit,Unit,Unit]()
    } else
        writeBuffer(ByteBuffer.wrap(str.getBytes("UTF-8")))
}

def isOpen = socket.isOpen

private val closeMarker = new String("")

def close():Unit @suspendable = write(closeMarker)

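The marker technique can be tried out without continuations or sockets. The sketch below is a simplified analogue and not the nioserver code: it assumes a java.util.concurrent.LinkedBlockingQueue in place of CoQueue, and CloseMarkerDemo, drain and requestClose are illustrative names. It shows why the check has to be eq rather than ==.

```scala
import java.util.concurrent.LinkedBlockingQueue

object CloseMarkerDemo {
  // A fresh String instance: equal to "" by value, but a distinct object
  // from any string literal, so callers cannot produce it by accident.
  private val closeMarker = new String("")

  // Value equality cannot tell the marker apart from an ordinary empty string.
  def markerEqualsEmpty: Boolean = closeMarker == ""

  // Identity comparison can.
  def markerIsEmptyLiteral: Boolean = closeMarker eq ""

  // Queues the marker behind whatever output is already waiting.
  def requestClose(q: LinkedBlockingQueue[String]): Unit = q.put(closeMarker)

  // Takes items in order and stops at the marker; returns what was "written".
  def drain(q: LinkedBlockingQueue[String]): Vector[String] = {
    val written = Vector.newBuilder[String]
    var closed = false
    while (!closed) {
      val s = q.take()
      if (s eq closeMarker) closed = true
      else written += s
    }
    written.result()
  }
}
```

Everything queued before the marker is written, including a genuinely empty string, and anything queued after it is left behind. That matches the behavior described above, where close returns right away but the socket is closed only after the earlier output has been flushed.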