Contents
- The Situation
- The Problem
- The Solution
- Implementation - Basics
- Implementation - Application
- Implementation - Manager
The Situation
The user tells the application to move the arm to the desired position. The application breaks up the motion into little steps and sends them off to the robot arm controller at a steady pace so that the arm moves at a uniform speed to the target position. The arm controller sends back motion events as the arm moves. Those motion events are fed into a simulation of the robotic arm, and a 3D image of the arm is rendered to the screen so that you can watch the image of the robotic arm move as the real arm moves.The motion events are fired (published) by a ListenerManager to each listener, one of which is the simulation of the arm. Since the ListenerManager is running in the thread of the controller, the listener should spend minimal time processing the event. The listener thus simply sends the motion event as a message to an actor that handles all of the processing to update the simulated arm and do the rendering. The listener is actually just a closure, a function that takes one argument which is a motion event (
BotPartMovedEvent
)
and returns nothing (Unit
).
Given the controller instance in the variable bot
,
setting up the simulation listener
in the above architecture looks something like this:
bot.addListener((ev:BotPartMovedEvent)=> renderWorker ! ev)
renderWorker
is an actor that might look something like this:
If the events are coming in at a slow enough rate, the simple solution described above is sufficient. I can hook these pieces together and have an architecture that looks something like this:private val renderWorker = new Actor { def act() { loop { react { case ev:BotPartMovedEvent => updateAndRender(ev) //updateAndRender does the real rendering work } } } }
- The user issues a command to move to a particular point, which sends a message to the actor that processes the command.
- That actor uses a timer to send regularly spaced commands to the robot controller to move in small steps.
- The controller sends back motion events as it moves.
- The listener (registered by the simulator on the controller) puts each motion event into a message and sends it to the actor that handles the rendering.
- The rendering actor updates the simulation model and renders it to the screen.
The Problem
In order to move the arm as smoothly as possible, the application sends commands to the robotic arm controller every 10ms. The arm controller sends back motion events when it moves, so it is sending back events at about 10ms intervals as well. The simulator listens to those events, so it knows that every 10ms the 3D scene has changed.But rendering the 3D image takes a lot longer than 10ms; more on the order of 100ms. I can't just do the rendering every time I get a motion event, or I would quickly build up a large backlog of motion events and the simulated image would lag far behind reality.
I need to slow down the rendering so that I am not attempting to render every 10ms. In other words, I have to ignore most of those motion events, or at least not use most of them to trigger rendering.
The Solution
Fortunately, I don't really need to render the screen every 10ms, I can render at a slower rate. In this case, I chose to render every 250ms. That rate, 4 frames per second, is fast enough to provide reasonable visual matching with the real arm, but slow enough so as not to use up an inordinate amount of processing time.I don't want to clutter my motion event publishing or my render processing with a bunch of code to handle timing, and I don't want to worry about motion events piling up in the inbox of the rendering actor, so I factored the timing concerns out into a separate actor with just one task: throwing away most of the requests. I can do this because I don't really need the information in each motion event, as I can directly query the controller to get its position information.
With this approach my publisher (the controller) can continue to publish all motion events, and my rendering thread doesn't have to make any decisions about whether or not to render, it simply renders every time it gets an event. The new actor decides which of the motion events get discarded and which get sent on to the renderer.
The last step in the basic architecture above (where the listener sends the message to the actor handling the rendering) is thus replaced by two steps:
- The listener (registered by the simulator on the controller) puts the motion event into a message and sends it to the actor that handles the timing issues.
- The timing actor decides when to forward a message and when to discard it, and forwards the selected messages to the actor doing the rendering.
Implementation - Basics
We start with the assumption that our worker actor that is doing the real work (the rendering actor in the example above) accepts messages of a particular type, and we help the developer of that actor remember that assumption by defining a trait (ActionWorker
)
that must be implemented by the worker
actor in order for the timing actor (the manager) to recognize it:
Our timing actor (trait ActionWorker[T] extends Actor
ActionManager
)
is an Actor
that is parameterized by the same type and accepts a
constructor argument that is the actor to be controlled:
Theclass ActionManager[T](worker:ActionWorker[T]) extends Actor ...
ActionManager
class is described in more detail
below.
Lastly we have four message types:
ActionRequest
wraps the real message and is sent to theActionManager
.ActionCommand
is sent from the
ActionManager
to theActionWorker
.ActionCompleted
is sent from theActionWorker
back to theActionManager
when the worker has completed its task.ActionTimerExpired
is used as part of the timeout mechanism that determines which messages to forward to the worker.
You can think of theabstract case class ActionMessage /** Clients send us an action request when they want the worker * to do something for them. * @param request The message to be passed to our worker. * @param delay Wait at least this long after the previous request * before executing this request. */ case class ActionRequest[T](request:T, delayMillis:long) extends ActionMessage { def this(request:T) = this(request, 0) } /** The manager sends the worker an ActionCommand * to tell it to work on something. */ case class ActionCommand[T](manager:ActionManager[T],request:T) extends ActionMessage /** The worker sends an ActionComplete to the manager when it is done with a * command and is ready to receive another command. */ case class ActionCompleted[T](request:T) extends ActionMessage /** The manager sends itself an ActionTimerExpired * when its timer time is up. */ case class ActionTimerExpired() extends ActionMessage
ActionMessage
classes as being a wrapper
for the original event class,
and you can think of the ActionManager
class as being a
wrapper for the original worker actor (the renderer),
as each wrapper takes as a constructor argument an instance of
the wrapped object.
Where before our listener sent the published event
(BotPartMovedEvent
) on to the worker actor
(renderWorker
),
now we send a wrapped event
(BotPartMovedEvent
in an ActionRequest
)
to our wrapped actor
(an ActionWorker
in a ActionManager
).
Implementation - Application
We need to specify somewhere the desired delay between the events that we pass along to the action worker. I chose to include this in theActionRequest
rather than
in the ActionManager
.
Here is the updated code to register the listener,
with changes in bold.
whereval renderDelayMillis = 250 bot.addListener((ev:BotPartMovedEvent)=> renderManager ! ActionRequest(ev,renderDelayMillis))
renderManager
is defined like this:
We modify ourprivate val renderManager = new ActionManager[BotPartMovedEvent](renderActor)
renderWorker
actor to deal with our new classes:
We have a couple of other changes we will be making toprivate val renderActor = new ActionWorker[BotPartMovedEvent] { def act() { loop { react { case ActionCommand(mgr,ev) => updateAndRender(ev) //updateAndRender does the real rendering work } } } }
renderActor
below.
With these classes, here is the general flow for the motion events:
- The controller generates a motion event.
- The listener wraps the motion event in an
ActionRequest
and sends it to theActionManager
. - If the
ActionManager
decides to forward the event, it sends the original motion event in anActionCommand
to theActionWorker
. - When the
ActionWorker
is done with its task, it sends anActionComplete
message back to theActionManager
.
ActionCompleted
message back to the ActionManager
when we are done with our task:
There is one more change to make to the worker actor: make it run at a lower priority, so that the motion controller can have priority in order to keep the robot motion smooth. I did this by usingprivate val renderActor = new ActionWorker[BotPartMovedEvent] { def act() { loop { react { case ActionCommand(mgr,ev) => updateAndRender(ev) mgr ! new ActionCompleted(ev) } } } }
receive
instead of react
,
then explicitly changing the thread priority before entering the loop:
private val renderActor = new ActionWorker[BotPartMovedEvent] { def act() { val th = Thread.currentThread th.setPriority(th.getPriority - 3) loop { receive { case ActionCommand(mgr,ev) => updateAndRender(ev) mgr ! new ActionCompleted(ev) } } } }
Implementation - Manager
Finally we get to the meat, theActionManager
that actually
does the work of deciding which events to forward and which to throw away.
The requirement given above of not forwarding a message more often than
a specified interval was incomplete.
That requirement is refined here and two other requirements are added:
- A message should not be forwarded to the worker until a specified minimum interval has passed since the previous message was forwarded.
- A message should not be forwarded to the worker while the worker is busy.
- The last message received while messages are not being forwarded should be forwarded to the worker at the next available time.
With this approach, given a sequence of messages sent to the manager (where a sequence in this case is defined as a set of messages all of which are separated in time by less than either the minimum interval or the worker processing time, whichever is greater) we are guaranteed that, at a minimum, the first and last messages of the sequence will be forwarded to the worker. The only messages we will be discarding are messages in the middle of the sequence - thus we are eliding messages from the sequence.
Here is the basic structure of the manager class, including the declarations of the variables described above and the basic actor flow with the set of messages we recognize.
We know we will need to start and stop a timer, and that when the timer runs out we will need to send ourselves a signal. Here are the timer support methods that do that.import java.util.Timer import java.util.TimerTask import scala.actors.Actor import scala.actors.Actor.loop class ActionManager[T](worker:ActionWorker[T]) extends Actor { //The next command to be sent to the worker; None when no request pending private var nextRequest:Option[T] = None //True when our worker is busy processing a command from us private var workerIsActive:boolean = false //The time that we last send on an action to our worker private var lastActionTime:long = 0 private val timer = new Timer() private var timerTask:Option[TimerTask] = None def act() { loop { react { case ActionRequest(req,delay) => ... case ActionCompleted(req) => ... case ActionTimerExpired() => ... } } } }
When our timer expires, we will want to forward the next message to the worker if we have one; but if we don't have one, we will want to note that the worker is no longer busy and return to a quiescent state. Here are a few simple methods to handle that:private def stopTimer() { if (timerTask.isDefined) { timerTask.get.cancel() timerTask = None } } private val timerExpired = new ActionTimerExpired() private def startTimer(waitMillis:long) { stopTimer val task = new TimerTask() { override def run() { ActionManager.this ! timerExpired } } timer.schedule(task,waitMillis) timerTask = Some(task) }
Now we can examine the cases in the mainprivate def sendNextOrDone() { if (nextRequest.isDefined) sendNextToTarget() else sequenceDone() } /** Send the pending request along to the target and update our vars. */ private def sendNextToTarget() { val cmd = new ActionCommand(this,nextRequest.get) target ! cmd //send the command to the target nextRequest = None targetIsActive = true } /** Here when the target has become inactive. * This is a good place to put in debugging hooks. */ private def sequenceDone() { targetIsActive = false //no longer active }
act
loop.
The client sends an
ActionRequest
to the manager for each
request, as implemented in our listener above.
In our example, the req
is a BotPartMovedEvent
.
The delay is the minimum time to wait since the previously forwarded
message before forwarding this message.
The manager saves only the most recent message, then decides whether
to send on that message based on the two factors we discussed above
(minimum interval and worker idle).
If it has not been long enough since the last message was forwarded,
we start a timer that will tell us when that time is up.
If there was already a timer from the previous message, we stop that timer,
so that we are always doing our calculations using the current delay
parameter.
Here's the code for that part of our manager:
When the worker finishes processing a message it sends andef act() { ... case ActionRequest(req,delay) => //Save only the latest message. nextRequest = Some(req.asInstanceOf[T]) //See how long it has been since the last message forwarded. val now = System.currentTimeMillis() val thisActionTime = lastActionTime + delay if (thisActionTime>now) { //Need to wait a while before executing this action startTimer(thisActionTime - now) //wait this long } else { //We are past the delay time for this action if (!targetIsActive) { //If the target is not active, we can send along // the action request immediately. sendNextToTarget() } } ... }
ActionComplete
message back to the manager,
which then sends on the next message if there is one and if
the minimum time has elapsed.
Finally, when the timer expires it sends the manager andef act() { ... case ActionCompleted(req) => targetIsActive = false if (timerTask.isEmpty) { sendNextOrDone() } ... }
ActionTimerExpired
message.
If the manager has a message to forward to the worker, and
the worker is not busy, it forward it.
That's it. Put all the pieces together and you have an actor that you can slip into a stream of messages of any type, with just a couple of minor changes to the original message sender and receiver, to throttle the message stream by discarding messages down to a specified rate, and only send a message to the receiver when it is not busy processing a previous message.def act() { ... case ActionTimerExpired() => if (!targetIsActive) { sendNextOrDone() } timerTask = None ... }
My actual implementation includes a few other details, such as counters to see how many messages were received, discarded and forwarded, and some messages for the purpose of retrieving and printing those stats at the desired times, none of which make a significant impact on the above code.