Friday, June 19, 2009

Eliding Actor Messages

One of my Scala applications is a motion control system that I use to move around a robotic arm. I had a minor timing issue which I handled in a generic way using an additional actor.


The Situation

The user tells the application to move the arm to the desired position. The application breaks up the motion into little steps and sends them off to the robot arm controller at a steady pace so that the arm moves at a uniform speed to the target position. The arm controller sends back motion events as the arm moves. Those motion events are fed into a simulation of the robotic arm, and a 3D image of the arm is rendered to the screen so that you can watch the image of the robotic arm move as the real arm moves.

The motion events are fired (published) by a ListenerManager to each listener, one of which is the simulation of the arm. Since the ListenerManager is running in the thread of the controller, the listener should spend minimal time processing the event. The listener thus simply sends the motion event as a message to an actor that handles all of the processing to update the simulated arm and do the rendering. The listener is actually just a closure, a function that takes one argument which is a motion event (BotPartMovedEvent) and returns nothing (Unit). Given the controller instance in the variable bot, setting up the simulation listener in the above architecture looks something like this:
```scala
bot.addListener((ev: BotPartMovedEvent) => renderWorker ! ev)
```
renderWorker is an actor that might look something like this:
```scala
private val renderWorker = new Actor {
  def act() {
    loop {
      react {
        case ev: BotPartMovedEvent =>
          updateAndRender(ev) // updateAndRender does the real rendering work
      }
    }
  }
}
```
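scala.actors is no longer part of current Scala releases, so here is the same listener-to-mailbox hand-off sketched with plain Scala and java.util.concurrent. The post does not show the ListenerManager class, its fire method, or the fields of BotPartMovedEvent, so all of those are hypothetical. A LinkedBlockingQueue stands in for the render actor's mailbox. The sketch shows that the listener is just a `BotPartMovedEvent => Unit` closure that enqueues the event and returns, so the controller's thread is not held up.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical event type: the post does not show BotPartMovedEvent's fields.
final case class BotPartMovedEvent(part: String, position: Double)

// Hypothetical minimal ListenerManager: listeners are plain closures of type
// E => Unit, and fire() calls each one on the publisher's (controller's) thread.
class ListenerManager[E] {
  @volatile private var listeners = List.empty[E => Unit]

  def addListener(listener: E => Unit): Unit = synchronized {
    listeners = listeners :+ listener
  }

  def fire(ev: E): Unit = listeners.foreach(listener => listener(ev))
}

object ListenerDemo {
  // Stands in for the render actor's mailbox.
  val mailbox = new LinkedBlockingQueue[BotPartMovedEvent]()
  val bot = new ListenerManager[BotPartMovedEvent]

  // The listener only enqueues the event, the equivalent of `renderWorker ! ev`,
  // so the controller's thread gets control back almost immediately.
  bot.addListener((ev: BotPartMovedEvent) => mailbox.put(ev))
}
```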
If the events are coming in at a slow enough rate, the simple solution described above is sufficient. I can hook these pieces together and have an architecture that looks something like this:
  • The user issues a command to move to a particular point, which sends a message to the actor that processes the command.
  • That actor uses a timer to send regularly spaced commands to the robot controller to move in small steps.
  • The controller sends back motion events as it moves.
  • The listener (registered by the simulator on the controller) puts each motion event into a message and sends it to the actor that handles the rendering.
  • The rendering actor updates the simulation model and renders it to the screen.
But this doesn't work.

The Problem

In order to move the arm as smoothly as possible, the application sends commands to the robotic arm controller every 10ms. The arm controller sends back motion events when it moves, so it is sending back events at about 10ms intervals as well. The simulator listens to those events, so it knows that every 10ms the 3D scene has changed.

But rendering the 3D image takes a lot longer than 10ms; more on the order of 100ms. I can't just do the rendering every time I get a motion event, or I would quickly build up a large backlog of motion events and the simulated image would lag far behind reality.
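The rough model below puts numbers on that backlog. It uses the figures above: one event every 10ms, about 100ms per render, and one render per event. The object and method names are made up for illustration, and the model ignores start-up effects, so treat its outputs as approximations.

```scala
// Rough model using the approximate numbers from the text: one motion event
// every 10 ms, one render per event, each render taking about 100 ms.
object BacklogModel {
  val eventIntervalMillis = 10
  val renderMillis = 100

  /** Events received but not yet rendered after `elapsedMillis` of arm motion. */
  def backlogAfter(elapsedMillis: Int): Int =
    math.max(0, elapsedMillis / eventIntervalMillis - elapsedMillis / renderMillis)

  /** Approximate age of the arm position on screen. The renderer has finished
    * elapsedMillis / renderMillis frames, so it has reached the event that was
    * sent at (elapsedMillis / renderMillis) * eventIntervalMillis. */
  def imageAgeMillisAt(elapsedMillis: Int): Int =
    elapsedMillis - (elapsedMillis / renderMillis) * eventIntervalMillis
}
```

With these numbers, after one second of arm motion about 90 events are waiting, and the screen shows the arm as it was roughly 900ms earlier. After ten seconds the image is about 9 seconds behind, and it keeps falling further behind for as long as the arm moves.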

I need to slow down the rendering so that I am not attempting to render every 10ms. In other words, I have to ignore most of those motion events, or at least not use most of them to trigger rendering.

The Solution

Fortunately, I don't really need to render the screen every 10ms; I can render at a slower rate. In this case, I chose to render every 250ms. That rate, 4 frames per second, is fast enough to provide reasonable visual matching with the real arm, but slow enough not to use up an inordinate amount of processing time.
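Here is the arithmetic behind the 250ms choice, using the approximate figures from the text (one event every 10ms, about 100ms per render):

```latex
\frac{250\ \text{ms per frame}}{10\ \text{ms per event}} = 25\ \text{events per frame},
\qquad \frac{25 - 1}{25} = 96\%\ \text{of events elided}

\frac{100\ \text{ms per render}}{250\ \text{ms per frame}} = 40\%\ \text{of the rendering thread's time}
```

So roughly 24 of every 25 motion events never trigger a render, and the renderer is busy well under half the time.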

I don't want to clutter my motion event publishing or my render processing with a bunch of code to handle timing, and I don't want to worry about motion events piling up in the inbox of the rendering actor, so I factored the timing concerns out into a separate actor with just one task: throwing away most of the requests. I can do this because I don't really need the information in each motion event, as I can directly query the controller to get its position information.

With this approach my publisher (the controller) can continue to publish all motion events, and my rendering thread doesn't have to make any decisions about whether or not to render, it simply renders every time it gets an event. The new actor decides which of the motion events get discarded and which get sent on to the renderer.

The last step in the basic architecture above (where the listener sends the message to the actor handling the rendering) is thus replaced by two steps:
  • The listener (registered by the simulator on the controller) puts the motion event into a message and sends it to the actor that handles the timing issues.
  • The timing actor decides when to forward a message and when to discard it, and forwards the selected messages to the actor doing the rendering.

Implementation - Basics

We start with the assumption that our worker actor that is doing the real work (the rendering actor in the example above) accepts messages of a particular type, and we help the developer of that actor remember that assumption by defining a trait (ActionWorker) that must be implemented by the worker actor in order for the timing actor (the manager) to recognize it:
```scala
trait ActionWorker[T] extends Actor
```
Our timing actor (ActionManager) is an Actor that is parameterized by the same type and accepts a constructor argument that is the actor to be controlled:
```scala
class ActionManager[T](worker: ActionWorker[T]) extends Actor ...
```
The ActionManager class is described in more detail below.

Lastly, we have four message types, all extending ActionMessage:
  • ActionRequest wraps the real message and is sent to the ActionManager.
  • ActionCommand is sent from the ActionManager to the ActionWorker.
  • ActionCompleted is sent from the ActionWorker back to the ActionManager when the worker has completed its task.
  • ActionTimerExpired is used as part of the timeout mechanism that determines which messages to forward to the worker.
```scala
abstract class ActionMessage

/** Clients send us an action request when they want the worker
  * to do something for them.
  * @param request The message to be passed to our worker.
  * @param delayMillis Wait at least this long after the previously
  *   forwarded request before executing this request.
  */
case class ActionRequest[T](request: T, delayMillis: Long) extends ActionMessage {
  def this(request: T) = this(request, 0)
}

/** The manager sends the worker an ActionCommand
  * to tell it to work on something.
  */
case class ActionCommand[T](manager: ActionManager[T], request: T) extends ActionMessage

/** The worker sends an ActionCompleted to the manager when it is done with a
  * command and is ready to receive another command.
  */
case class ActionCompleted[T](request: T) extends ActionMessage

/** The manager sends itself an ActionTimerExpired
  * when its timer time is up.
  */
case class ActionTimerExpired() extends ActionMessage
```
You can think of the ActionMessage classes as wrappers for the original event class, and of the ActionManager class as a wrapper for the original worker actor (the renderer), since each wrapper takes an instance of the wrapped object as a constructor argument. Where before our listener sent the published event (BotPartMovedEvent) to the worker actor (renderWorker), now we send a wrapped event (a BotPartMovedEvent in an ActionRequest) to our wrapped actor (an ActionWorker in an ActionManager).

Implementation - Application

We need to specify somewhere the desired delay between the events that we pass along to the action worker. I chose to include this in the ActionRequest rather than in the ActionManager. Here is the updated code to register the listener. The changes are the new renderDelayMillis value and the event wrapped in an ActionRequest, which now goes to renderManager instead of renderWorker.
```scala
val renderDelayMillis = 250
bot.addListener((ev: BotPartMovedEvent) => renderManager ! ActionRequest(ev, renderDelayMillis))
```
where renderManager is defined like this:
```scala
private val renderManager = new ActionManager[BotPartMovedEvent](renderActor)
```
We modify our renderWorker actor (now named renderActor) to deal with our new classes:
```scala
private val renderActor = new ActionWorker[BotPartMovedEvent] {
  def act() {
    loop {
      react {
        // The type pattern gives ev the static type BotPartMovedEvent.
        case ActionCommand(mgr, ev: BotPartMovedEvent) =>
          updateAndRender(ev) // updateAndRender does the real rendering work
      }
    }
  }
}
```
We will make a couple of other changes to renderActor below.

With these classes, here is the general flow for the motion events:
  1. The controller generates a motion event.
  2. The listener wraps the motion event in an ActionRequest and sends it to the ActionManager.
  3. If the ActionManager decides to forward the event, it sends the original motion event in an ActionCommand to the ActionWorker.
  4. When the ActionWorker is done with its task, it sends an ActionCompleted message back to the ActionManager.
To satisfy the above flow, we modify our worker actor to send an ActionCompleted message back to the ActionManager when we are done with our task:
```scala
private val renderActor = new ActionWorker[BotPartMovedEvent] {
  def act() {
    loop {
      react {
        case ActionCommand(mgr, ev: BotPartMovedEvent) =>
          updateAndRender(ev)
          mgr ! new ActionCompleted(ev) // tell the manager we are ready for more
      }
    }
  }
}
```
There is one more change to make to the worker actor: make it run at a lower priority, so that the motion controller can have priority in order to keep the robot motion smooth. I did this by using receive instead of react, then explicitly changing the thread priority before entering the loop:
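```scala
private val renderActor = new ActionWorker[BotPartMovedEvent] {
  def act() {
    // Run below the controller's priority. Clamp so that setPriority never
    // receives a value under Thread.MIN_PRIORITY, which would throw.
    val th = Thread.currentThread
    th.setPriority(math.max(Thread.MIN_PRIORITY, th.getPriority - 3))
    loop {
      receive {
        case ActionCommand(mgr, ev: BotPartMovedEvent) =>
          updateAndRender(ev)
          mgr ! new ActionCompleted(ev)
      }
    }
  }
}
```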
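Current Scala versions no longer include scala.actors, so readers on those versions can approximate the lowered-priority worker with a dedicated thread that blocks on a queue. That is roughly what receive does. This is a sketch under that assumption. LowPriorityWorker, send, stop and the onCompleted callback, which stands in for the ActionCompleted message, are hypothetical names. The clamp to Thread.MIN_PRIORITY matters because Thread.setPriority throws IllegalArgumentException for values outside MIN_PRIORITY to MAX_PRIORITY.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for renderActor: a dedicated thread that blocks on its
// inbox (as receive does) and runs below the priority of its creating thread.
final class LowPriorityWorker[T](work: T => Unit, onCompleted: T => Unit) {
  private val inbox = new LinkedBlockingQueue[T]()

  private val thread = new Thread(new Runnable {
    def run(): Unit =
      try {
        while (true) {
          val cmd = inbox.take() // blocks until a command arrives
          work(cmd)              // e.g. updateAndRender(ev)
          onCompleted(cmd)       // plays the role of mgr ! ActionCompleted(ev)
        }
      } catch {
        case _: InterruptedException => () // stop() was called
      }
  })

  // Drop three priority levels, but never below Thread.MIN_PRIORITY.
  thread.setPriority(math.max(Thread.MIN_PRIORITY, thread.getPriority - 3))
  thread.setDaemon(true)
  thread.start()

  def send(cmd: T): Unit = inbox.put(cmd)
  def priority: Int = thread.getPriority
  def stop(): Unit = thread.interrupt()
}
```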

Implementation - Manager

Finally, we get to the meat: the ActionManager, which actually decides which events to forward and which to throw away. The requirement given above, not forwarding a message more often than a specified interval, was incomplete. That requirement is refined here, and two other requirements are added:
  • A message should not be forwarded to the worker until a specified minimum interval has passed since the previous message was forwarded.
  • A message should not be forwarded to the worker while the worker is busy.
  • The last message received while messages are not being forwarded should be forwarded to the worker at the next available time.
These requirements ensure that the last message in any sequence of messages is always forwarded to the worker. If we receive a message that we can't forward immediately because of either of the first two requirements, we hold on to it as the most recently received message until neither restriction applies (i.e. the minimum interval has elapsed and the worker is idle). If another message arrives while we are waiting, we discard the saved message and replace it with the new one. Whatever message we hold when the minimum interval is up and the worker is idle is forwarded to the worker. The three pieces of state the manager uses are thus a timer, a worker-busy flag, and the most recently received message, plus the time of the last forwarded message, which is used to set the timer.

With this approach, consider a sequence of messages sent to the manager. Here a sequence is a run of messages in which each one follows the previous one by less than the minimum interval or the worker processing time, whichever is greater. For any such sequence, we are guaranteed that at least the first and last messages are forwarded to the worker. The only messages we discard are in the middle of the sequence; thus we are eliding messages from the sequence.
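A small single-threaded model can check the three rules and the first-and-last guarantee. This is a hypothetical sketch, not the ActionManager itself. Time is passed in explicitly, a tick call stands in for the timer firing, and ElidingModel and ElidingSimulation are illustrative names. The simulation feeds numbered messages to the model at a fixed rate and records which ones reach a worker that needs a fixed time per message.

```scala
// Hypothetical single-threaded model of the three forwarding rules. Time is
// passed in explicitly and tick() stands in for the timer, so runs are repeatable.
final class ElidingModel[T](minIntervalMillis: Long) {
  private var pending: Option[T] = None // most recent message not yet forwarded
  private var workerBusy = false
  private var lastForward: Option[Long] = None

  /** Forwards the pending message if the worker is idle and the interval has elapsed. */
  private def tryForward(now: Long): Option[T] = {
    val intervalOk = lastForward.forall(last => now - last >= minIntervalMillis)
    if (pending.isDefined && !workerBusy && intervalOk) {
      val msg = pending
      pending = None
      workerBusy = true
      lastForward = Some(now)
      msg
    } else None
  }

  def request(now: Long, msg: T): Option[T] = { pending = Some(msg); tryForward(now) } // newest replaces older
  def completed(now: Long): Option[T] = { workerBusy = false; tryForward(now) }
  def tick(now: Long): Option[T] = tryForward(now)
}

object ElidingSimulation {
  /** Sends `events` numbered messages, one every `eventEvery` ms, and returns the
    * (time, message) pairs that reach a worker needing `workMillis` per message. */
  def run(events: Int, eventEvery: Long, minInterval: Long, workMillis: Long): Vector[(Long, Int)] = {
    val model = new ElidingModel[Int](minInterval)
    var forwarded = Vector.empty[(Long, Int)]
    var busyUntil = -1L // time at which the worker finishes its current message
    val endTime = events * eventEvery + minInterval + workMillis
    for (t <- 0L to endTime) {
      val done = if (t == busyUntil) model.completed(t) else None
      val arrived =
        if (t % eventEvery == 0 && t / eventEvery < events) model.request(t, (t / eventEvery).toInt)
        else None
      val timer = model.tick(t)
      // At most one of these is defined, because forwarding marks the worker busy.
      for (msg <- done.orElse(arrived).orElse(timer)) {
        forwarded = forwarded :+ (t -> msg)
        busyUntil = t + workMillis
      }
    }
    forwarded
  }
}
```

The run `ElidingSimulation.run(100, 10, 250, 100)` uses this post's numbers: a message every 10ms, a 250ms minimum interval, and 100ms of work per message. It forwards messages 0, 25, 50, 75 and 99 at 0, 250, 500, 750 and 1000ms. So 5 of the 100 messages reach the worker, including the first and the last.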

Here is the basic structure of the manager class, including the declarations of the variables described above and the basic actor flow with the set of messages we recognize.
```scala
import java.util.Timer
import java.util.TimerTask
import scala.actors.Actor
import scala.actors.Actor.loop

class ActionManager[T](worker: ActionWorker[T]) extends Actor {
  // The next command to be sent to the worker; None when no request is pending
  private var nextRequest: Option[T] = None
  // True when our worker is busy processing a command from us
  private var workerIsActive: Boolean = false
  // The time at which we last sent an action to our worker
  private var lastActionTime: Long = 0
  private val timer = new Timer()
  private var timerTask: Option[TimerTask] = None

  def act() {
    loop {
      react {
        case ActionRequest(req, delay) => ...
        case ActionCompleted(req) => ...
        case ActionTimerExpired() => ...
      }
    }
  }
}
```
We know we will need to start and stop a timer, and that when the timer runs out we will need to send ourselves a signal. Here are the timer support methods that do that.
```scala
private def stopTimer() {
  if (timerTask.isDefined) {
    timerTask.get.cancel()
    timerTask = None
  }
}

private val timerExpired = new ActionTimerExpired()

private def startTimer(waitMillis: Long) {
  stopTimer() // never run more than one timer at a time
  val task = new TimerTask() {
    override def run() {
      ActionManager.this ! timerExpired // runs on the Timer's thread
    }
  }
  timer.schedule(task, waitMillis)
  timerTask = Some(task)
}
```
When our timer expires, we will want to forward the next message to the worker if we have one; but if we don't have one, we will want to note that the worker is no longer busy and return to a quiescent state. Here are a few simple methods to handle that:
```scala
private def sendNextOrDone() {
  if (nextRequest.isDefined)
    sendNextToTarget()
  else
    sequenceDone()
}

/** Send the pending request along to the worker and update our vars. */
private def sendNextToTarget() {
  val cmd = new ActionCommand(this, nextRequest.get)
  worker ! cmd // send the command to the worker
  nextRequest = None
  workerIsActive = true
  lastActionTime = System.currentTimeMillis() // the next minimum interval starts now
}

/** Here when the worker has become inactive.
  * This is a good place to put in debugging hooks.
  */
private def sequenceDone() {
  workerIsActive = false // no longer active
}
```
Now we can examine the cases in the main act loop.

The client sends an ActionRequest to the manager for each request, as implemented in our listener above. In our example, the req is a BotPartMovedEvent. The delay is the minimum time to wait since the previously forwarded message before forwarding this message. The manager saves only the most recent message, then decides whether to send on that message based on the two factors we discussed above (minimum interval and worker idle). If it has not been long enough since the last message was forwarded, we start a timer that will tell us when that time is up. If there was already a timer from the previous message, we stop that timer, so that we are always doing our calculations using the current delay parameter. Here's the code for that part of our manager:
```scala
def act() {
  ...
  case ActionRequest(req, delay) =>
    // Save only the latest message.
    nextRequest = Some(req.asInstanceOf[T])
    // See how long it has been since the last message was forwarded.
    val now = System.currentTimeMillis()
    val thisActionTime = lastActionTime + delay
    if (thisActionTime > now) {
      // Need to wait a while before executing this action
      startTimer(thisActionTime - now) // wait this long
    } else {
      // We are past the delay time for this action
      if (!workerIsActive) {
        // If the worker is not active, we can send along
        // the action request immediately.
        sendNextToTarget()
      }
    }
  ...
}
```
When the worker finishes processing a message, it sends an ActionCompleted message back to the manager. The manager then forwards the next message if there is one and the minimum time has elapsed, that is, if no timer is pending.
```scala
def act() {
  ...
  case ActionCompleted(req) =>
    workerIsActive = false
    if (timerTask.isEmpty) {
      // No timer pending, so the minimum interval has already elapsed.
      sendNextOrDone()
    }
  ...
}
```
Finally, when the timer expires, it sends the manager an ActionTimerExpired message. If the worker is not busy, the manager forwards its pending message, or, if it has none, marks the sequence as done. Either way, it clears the timer.
```scala
def act() {
  ...
  case ActionTimerExpired() =>
    if (!workerIsActive) {
      sendNextOrDone()
    }
    timerTask = None
  ...
}
```
That's it. Put all the pieces together and you have an actor that you can slip into a stream of messages of any type, with just a couple of minor changes to the original message sender and receiver. It throttles the message stream by discarding messages down to a specified rate, and it only sends a message to the receiver when the receiver is not busy processing a previous message.

My actual implementation includes a few other details, such as counters to see how many messages were received, discarded and forwarded, and some messages for the purpose of retrieving and printing those stats at the desired times, none of which make a significant impact on the above code.
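To try the same design on a current Scala version, here is a hypothetical sketch that follows the manager logic above but replaces scala.actors with java.util.concurrent. A single-threaded ScheduledExecutorService serves as both the manager's mailbox and its timer, and a second single-threaded executor plays the ActionWorker. ElidingManager, request and shutdown are illustrative names, and the worker is passed in as a plain `T => Unit` function. Unlike the post, the minimum interval is a constructor argument rather than part of each request. The sketch also records lastActionTime on every forward, since the interval is measured from the previous forwarded message.

```scala
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Hypothetical modern sketch (not the author's code) of the ActionManager logic.
final class ElidingManager[T](minIntervalMillis: Long, work: T => Unit) {
  private val managerThread = Executors.newSingleThreadScheduledExecutor()
  private val workerThread = Executors.newSingleThreadExecutor()

  // Touched only on managerThread, like an actor's private vars.
  private var nextRequest: Option[T] = None
  private var workerIsActive = false
  private var lastActionTime = 0L
  private var timerTask: Option[ScheduledFuture[_]] = None

  private def runnable(body: => Unit): Runnable = new Runnable { def run(): Unit = body }

  /** Equivalent of `manager ! ActionRequest(req, delay)`. */
  def request(req: T): Unit = managerThread.execute(runnable(onRequest(req)))

  def shutdown(): Unit = { managerThread.shutdown(); workerThread.shutdown() }

  private def onRequest(req: T): Unit = {
    nextRequest = Some(req) // keep only the latest message
    val now = System.currentTimeMillis()
    val thisActionTime = lastActionTime + minIntervalMillis
    if (thisActionTime > now) startTimer(thisActionTime - now)
    else if (!workerIsActive) sendNextToWorker()
  }

  private def onCompleted(): Unit = {
    workerIsActive = false
    if (timerTask.isEmpty) sendNextOrDone()
  }

  private def onTimerExpired(): Unit = {
    timerTask = None
    if (!workerIsActive) sendNextOrDone()
  }

  // The timer body runs on managerThread itself, so a cancelled timer can never
  // leave a stale expiry message behind.
  private def startTimer(waitMillis: Long): Unit = {
    timerTask.foreach(_.cancel(false))
    timerTask = Some(managerThread.schedule(runnable(onTimerExpired()), waitMillis, TimeUnit.MILLISECONDS))
  }

  private def sendNextOrDone(): Unit =
    if (nextRequest.isDefined) sendNextToWorker() else { workerIsActive = false }

  private def sendNextToWorker(): Unit = {
    val req = nextRequest.get
    nextRequest = None
    workerIsActive = true
    lastActionTime = System.currentTimeMillis()
    // Do the work off the manager thread, then report back (the ActionCompleted message).
    workerThread.execute(runnable {
      work(req)
      managerThread.execute(runnable(onCompleted()))
    })
  }
}
```

In the post's example, the wiring would be `val renderManager = new ElidingManager[BotPartMovedEvent](250, updateAndRender)`, with the listener registered as `bot.addListener((ev: BotPartMovedEvent) => renderManager.request(ev))`.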