Monday, July 28, 2008

Refactoring Scala Actors

As part of converting Mimprint from Java to Scala, I wanted to switch the concurrency model from using Java Threads to using Scala Actors. At the same time, I wanted to implement a publish/subscribe model for some of those Actors. I decided to combine these two by making an ActorPublisher trait.

In this model, the publisher is an Actor and all subscribers are Actors. The publisher will publish events of type E, and the subscribers all accept events of type E in their receive or react methods. The publisher, in turn, accepts messages to subscribe or unsubscribe a subscriber.

The ActorPublisher class file starts with the boilerplate:
import scala.actors.Actor

I defined a marker trait that I require every subscriber to implement. This trait has no methods, and it doesn't actually enforce that the implementing class can handle the event, but it at least ensures that the subscriber is an Actor, it helps the programmer to remember that the subscriber needs to handle that event, and it documents that that Actor is a Subscriber for that event:
trait Subscriber[E] extends Actor

To allow the subscriber to subscribe to and unsubscribe from the publisher, I defined some messages for that purpose:
sealed abstract class SubscriberRequest[E]
case class Subscribe[E](subscriber:Subscriber[E]) extends SubscriberRequest[E]
case class Unsubscribe[E](subscriber:Subscriber[E]) extends SubscriberRequest[E]

Finally there is the publisher trait, ActorPublisher, which maintains the list of subscribers and provides a method for publishing to that list. Here are the list of subscribers and the publish method:
trait ActorPublisher[E] {
    private var subscribers: List[Subscriber[E]] = Nil
    ...
    protected def publish(message:E) = subscribers.foreach(_ ! message)
}

Before writing those last few lines of code in ActorPublisher, let's take a look at the application class that will use it. In the example below, I have a PlayListTracker class that maintains a list of items. Whenever that list of items changes, PlayListTracker publishes change events in the form of a PlayListMessage:
sealed abstract class PlayListMessage
case class PlayListAddItem( ... ) extends PlayListMessage
case class PlayListRemoveItem( ... ) extends PlayListMessage
The tracker defines request messages to add and remove items, which are all subclasses of PlayListRequest:
sealed abstract class PlayListRequest
case class PlayListRequestAdd( ... ) extends PlayListRequest
case class PlayListRequestRemove( ... ) extends PlayListRequest

The PlayListTracker class has to process the PlayListRequest messages that change the list, but it must also accept and process subscribe and unsubscribe requests. I might have started by assuming it looks something like this:
class PlayListTracker extends ActorPublisher[PlayListMessage] {
    def act() {
        loop {
            react {
                case m:Subscribe => ...
                case m:Unsubscribe => ...
                case m:PlayListRequestAdd => ...
                case m:PlayListRequestRemove => ...
            }
        }
    }
}

It would be nice if those first two lines, the ones that handle the subscribe and unsubscribe, were packaged up in the ActorPublisher trait so I didn't have to worry about them.

Looking at the signature to the react method, note that it accepts as an argument a PartialFunction[Any,Unit]. I can thus modify the code to make the original case statement an explicit PartialFunction:
class PlayListTracker extends ActorPublisher[PlayListMessage] {
    def act() {
        loop {
            react (handleMessage)
        }
    }
    private val handleMessage : PartialFunction[Any,Unit] = {
        case m:Subscribe => ...
        case m:Unsubscribe => ...
        case m:PlayListRequestAdd => ...
        case m:PlayListRequestRemove => ...
    }
}
I split that PartialFunction into two PartialFunctions, and chain them together using the convenient orElse method in PartialFunction:
class PlayListTracker extends ActorPublisher[PlayListMessage] {
    def act() {
        loop {
            react (handleSubscribe orElse handleOther)
        }
    }
    private val handleSubscribe : PartialFunction[Any,Unit] = {
        case m:Subscribe => ...
        case m:Unsubscribe => ...
    }
    private val handleOther : PartialFunction[Any,Unit] = {
        case m:PlayListRequestAdd => ...
        case m:PlayListRequestRemove => ...
    }
}
Now that the handling of the Subscribe and Unsubscribe messages has been split out into a separate method, I can move that method into the ActorPublisher trait, where it looks like this:
trait ActorPublisher[E] {
    private var subscribers: List[Subscriber[E]] = Nil

    protected val handleSubscribe: PartialFunction[Any,Unit] = {
        case m:Subscribe[E] =>
            if (!isSubscriber(m.subscriber))
                subscribers = m.subscriber :: subscribers
        case m:Unsubscribe[E] =>
            subscribers = subscribers.filter(_!=m.subscriber)
    }

    private def isSubscriber(subscriber:Subscriber[E]) =
            subscribers.exists(_==subscriber)

    protected def publish(message:E) = subscribers.foreach(_ ! message)
}

That's it, an ActorPublisher trait that encapsulates the management of subscriptions for a publisher that can be used just by defining a message class, extending ActorPublisher on your publisher class, and invoking handleSubscribe orElse your own partial function to process your requests.

The ActorPublisher trait and the rest of the example code above is available in the source code to Mimprint, which is distributed under the GPL.

No comments: