Skip to content

Instantly share code, notes, and snippets.

@ornicar
Created February 5, 2014 12:21
Show Gist options
  • Save ornicar/8822526 to your computer and use it in GitHub Desktop.
Save ornicar/8822526 to your computer and use it in GitHub Desktop.
package utils
import scala.util.{ Try, Success, Failure }
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._
// This actor will perform asynchronous side effects sequentially.
// It ensures two effects are not performed at the same time, without blocking.
// This actor can be used to simulate database transactions.
trait SequentialActor extends Actor {
import SequentialActor._
type ReceiveAsync = PartialFunction[Any, Future[_]]
def process: ReceiveAsync
def idle: Receive = {
case msg ⇒ {
context become busy
processThenDone(Job(msg, sender))
}
}
def busy: Receive = {
case Done ⇒ dequeue match {
case None ⇒ context become idle
case Some(job) ⇒ processThenDone(job)
}
case msg ⇒ queue enqueue Job(msg, sender)
}
def receive = idle
private val queue = collection.mutable.Queue[Job]()
private def dequeue: Option[Job] = Try(queue.dequeue).toOption
private def fallback: ReceiveAsync = {
case _ ⇒ Future successful ()
}
private def processThenDone(job: Job) {
job match {
// we don't want to send Done after actor death
case Job(Terminate, _) ⇒ self ! PoisonPill
case Job(msg, replyTo) ⇒ (process orElse fallback)(msg) andThen {
case Success(reply) => replyTo ! reply
case Failure(error) => replyTo ! Status.Failure(error)
} onComplete {
case _ => (self ! Done)
}
}
}
}
object SequentialActor {
private case object Done
private case object Terminate
private case class Job(msg: Any, replyTo: ActorRef)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment