Created
March 17, 2011 17:40
-
-
Save casualjim/874766 to your computer and use it in GitHub Desktop.
A Resque implementation in Scala with Akka actors and scala-redis.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mojolly.backchat | |
package redis | |
package resque | |
import com.mojolly.backchat.redis.resque.Resque.{ResqueWorkerActor} | |
import net.liftweb.json._ | |
import JsonAST._ | |
import JsonDSL._ | |
import java.net.InetAddress | |
import com.redis.ds.{ RedisDeque, RedisDequeClient } | |
import com.redis.RedisClient | |
import java.util.{ Date, GregorianCalendar, Calendar } | |
import akka.dispatch.{ Dispatchers, HawtDispatcher } | |
import akka.actor._ | |
import Actor._ | |
import com.mojolly.backchat.redis.RedisNamespace | |
import actor.{ScheduledTask, Supervising, Supervised} | |
import java.util.concurrent.{TimeUnit, ScheduledFuture, ConcurrentHashMap} | |
object Resque { | |
/**
 * Helpers for turning plain Scala/Java values into lift-json values when a
 * job payload is built.
 */
private object Meta {

  object Reflection {

    /*
     * Classes whose instances map directly onto a JSON scalar.
     */
    val primitives = Set[Class[_]](classOf[String], classOf[Int], classOf[Long], classOf[Double],
      classOf[Float], classOf[Byte], classOf[BigInt], classOf[Boolean],
      classOf[Short], classOf[java.lang.Integer], classOf[java.lang.Long],
      classOf[java.lang.Double], classOf[java.lang.Float],
      classOf[java.lang.Byte], classOf[java.lang.Boolean],
      classOf[java.lang.Short])

    /** True when `clazz` is one of the classes listed in `primitives`. */
    def primitive_?(clazz: Class[_]) = primitives contains clazz

    /*
     * Converts a primitive or date value into its JSON representation.
     *
     * `null` becomes JNull. Values whose exact class is listed in `datetypes`
     * are delegated to `datetype2jvalue`. Anything else aborts with an error
     * naming the offending class.
     *
     * Because the argument is typed `Any`, the value-type patterns below also
     * match the boxed java.lang.* wrappers, so no separate wrapper cases are
     * needed (they could never be reached).
     */
    def primitive2jvalue(a: Any) = a match {
      case null => JNull
      case x: String => JString(x)
      case x: Int => JInt(x)
      case x: Long => JInt(x)
      case x: Double => JDouble(x)
      case x: Float => JDouble(x)
      case x: Byte => JInt(BigInt(x))
      case x: BigInt => JInt(x)
      case x: Boolean => JBool(x)
      case x: Short => JInt(BigInt(x))
      case x if datetype_?(x.asInstanceOf[AnyRef].getClass) => datetype2jvalue(x)
      case _ => error("not a primitive " + a.asInstanceOf[AnyRef].getClass)
    }

    /*
     * Date types require formatting
     */
    val datetypes = Set[Class[_]](classOf[Calendar], classOf[Date], classOf[GregorianCalendar])

    /** True when `clazz` is exactly one of the classes in `datetypes` (subclasses do not count). */
    def datetype_?(clazz: Class[_]) = datetypes contains clazz

    /*
     * Formats a Calendar or Date using the implicit formats. Any other
     * argument type results in a MatchError, so callers are expected to
     * check `datetype_?` first.
     */
    def datetype2jvalue(a: Any)(implicit formats: Formats) = a match {
      case x: Calendar => dateAsJValue(x.getTime)
      case x: Date => dateAsJValue(x)
    }

    /** Wraps the formatted date in a single-field object keyed by "$dt". */
    def dateAsJValue(d: Date)(implicit formats: Formats) =
      JObject(JField("$dt", JString(formats.dateFormat.format(d))) :: Nil)
  }
}
/** Implicit view that lets a RedisNamespace be passed where a String is expected. */
implicit def namespaceToString(ns: RedisNamespace): String = {
  ns.toString
}
import messages._ | |
// Shared actor references; both start out unset and are filled in on first use.
private var _resque: ActorRef = null
private var _resqueSupervisor: ActorRef = null

/**
 * Returns the shared Resque actor. On first access an actor backed by the
 * default RedisConfig is created and handed to `addToSupervisor`.
 */
private def resque: ActorRef = {
  if (_resque.isNull) _resque = addToSupervisor(actorOf(new Resque(RedisConfig())))
  _resque
}
/**
 * Starts `actor`, links it to the shared supervisor and returns it.
 *
 * The supervisor is created the first time this is called. At that moment a
 * JVM shutdown hook is also installed which sends a StopWorker message to the
 * Resque actor (if one has been set) for every registered ResqueListener.
 */
private def addToSupervisor(actor: ActorRef) = {
  if (_resqueSupervisor.isNull) {
    _resqueSupervisor = actorOf(new Actor with Supervising { }).start
    val stopListenersOnExit = new Thread {
      override def run = {
        for {
          worker <- Actor.registry.actorsFor(classOf[ResqueListener])
          target <- Option(_resque)
        } target ! StopWorker(worker)
      }
    }
    Runtime.getRuntime.addShutdownHook(stopListenersOnExit)
  }
  actor.start
  _resqueSupervisor ! Link(actor)
  actor
}
/** Replaces the shared Resque actor with `res`. */
def resque_=(res: ActorRef) = {
  _resque = res
}
/**
 * Wraps `worker` in an actor and asks the Resque actor to reserve the
 * worker's queue for it.
 */
def register[Worker <: ResqueWorker](worker: => Worker)(implicit mf: Manifest[Worker]): Unit = {
  getForQueue(worker) match {
    case (job, q) => resque ! Reserve(q.name, job)
  }
}
/**
 * Creates an actor for `worker` and looks up the queue registered for the
 * worker's class (keyed by its simple class name) in `queueJobMap`.
 *
 * @return the new actor reference paired with the queue symbol
 * @throws IllegalArgumentException when no queue has been registered for the
 *         worker class; previously this surfaced later as an unexplained
 *         NullPointerException in the caller
 */
private def getForQueue[Worker <: ResqueWorker](worker: => Worker)(implicit mf: Manifest[Worker]) = {
  val a = actorOf(worker)
  val jobName = mf.erasure.getSimpleName
  // ConcurrentHashMap.get answers null for a missing key, so lift it into an Option.
  val q = Option(queueJobMap.get(Symbol(jobName))) getOrElse {
    throw new IllegalArgumentException("No queue registered for worker [" + jobName + "]")
  }
  (a, q)
}
/**
 * Sends an Enqueue message to the Resque actor, using the queue registered
 * for `worker`, the id of the actor created for it and `args` as a list.
 */
def enqueue[Worker <: ResqueWorker](worker: => Worker, args: Any*)(implicit mf: Manifest[Worker]) = {
  getForQueue(worker) match {
    case (job, q) => resque ! Enqueue(q.name, job.id, args.toList)
  }
}
/**
 * Builders for the Redis keys used by this implementation. Every key is a
 * list of segments joined by `keySeparator`.
 */
object Naming {

  // Joins the given segments with the shared key separator.
  private def joined(segments: String*) = segments mkString keySeparator

  /** `<namespace>:queue:<name>` */
  object QueueName {
    def apply(name: String)(implicit resqueConfig: ResqueConfig) =
      joined(resqueConfig.namespace, queue, name)
  }

  /** `<namespace>:stat:<statistic>`, optionally followed by `:<workerId>` */
  object StatKey {
    def apply(statistic: String)(implicit resqueConfig: ResqueConfig) =
      joined(resqueConfig.namespace, stat, statistic)

    def apply(statistic: String, workerId: String)(implicit resqueConfig: ResqueConfig) =
      joined(resqueConfig.namespace, stat, statistic, workerId)
  }

  /** `<namespace>:worker:<workerId>` */
  object WorkerKey {
    def apply(workerId: String)(implicit resqueConfig: ResqueConfig) =
      joined(resqueConfig.namespace, worker, workerId)
  }

  /** `<hostName>:<pid>:<queueName>` */
  object WorkerId {
    def apply(pid: String, queueName: String) =
      joined(hostName, pid, queueName)
  }

  /** `<namespace>:workers` */
  object WorkerSet {
    def apply()(implicit resqueConfig: ResqueConfig) =
      joined(resqueConfig.namespace, workers)
  }
}
/** Mix-in that puts the default ResqueConfig into implicit scope. */
trait DefaultResqueConfig {
  implicit val resqueConfig: ResqueConfig = ResqueConfig()
}
/**
 * Messages exchanged between the Resque actor, its queue listeners and the
 * workers.
 */
private object messages {

  /** Commands understood by the Resque actor and its listeners. */
  sealed trait ResqueMessage {
    def toJson = Serialization.write(this)
  }
  case object Poll extends ResqueMessage
  case class Perform(data: String) extends ResqueMessage
  case class Reserve(queue: String, worker: ActorRef) extends ResqueMessage
  case class StopWorker(worker: ActorRef) extends ResqueMessage
  case object StartResque extends ResqueMessage
  case class StartedWorker(worker: ActorRef) extends ResqueMessage
  case object Stop extends ResqueMessage
  case class Enqueue(queue: String, klass: String, args: List[Any]) extends ResqueMessage

  /** Outcome of running a job. */
  sealed trait JobResult

  case class JobFailure(
    payload: String, queue: String, worker: ActorRef,
    error: String, backtrace: List[String], failed_at: DateTime = DateTime.now) extends JobResult {
    def toJson = Serialization.write(this)
  }

  object JobFailure {
    /**
     * Builds a failure from a throwable, using its stack trace as backtrace.
     *
     * Throwable.getMessage may be null (for example for a bare
     * NullPointerException); in that case the throwable's string form is used
     * so the failure record always carries a description.
     */
    def apply(payload: String, queue: String, worker: ActorRef, exception: Throwable): JobFailure = {
      val reason = Option(exception.getMessage) getOrElse exception.toString
      apply(payload, queue, worker, reason, exception.getStackTrace.map(_.toString).toList)
    }
  }

  object JobSuccess extends JobResult
  case class Success(worker: ActorRef) extends JobResult

  /** Progress notifications about a job that is currently being handled. */
  sealed trait JobProcessingMessage {
    def toJson = Serialization.write(this)
  }
  case class WorkingOn(worker: ActorRef, queue: String, payload: JValue, run_at: DateTime = DateTime.now) extends JobProcessingMessage
}
// Dispatcher assigned to the Resque actor and to every ResqueListener.
private val hawt: HawtDispatcher = new HawtDispatcher()
/**
 * Coordinating actor. It creates a ResqueListener per reserved queue and
 * forwards all bookkeeping messages to a linked child actor that owns the
 * RedisClient.
 *
 * @param config host and port of the Redis server
 */
class Resque(config: RedisConfig) extends Actor with DefaultResqueConfig with Supervising {

  self.dispatcher = hawt

  import Resque.Naming._

  protected val resqueClient = new RedisDequeClient(config.host, config.port)

  // Child actor performing the Redis writes; created and linked on first use.
  private lazy val redisActor = {
    val a = actorOf(new Actor {
      self.dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher

      protected val redis = new RedisClient(config.host, config.port)

      protected def receive = {
        case m: JobFailure => {
          val msg: JValue = ("failed_at" -> m.failed_at.toString(ISO8601_DATE)) ~
            ("payload" -> m.payload) ~
            ("error" -> m.error) ~
            ("backtrace" -> m.backtrace) ~
            ("worker" -> m.worker.id) ~
            ("queue" -> m.queue)
          // Build the key from the configured namespace like every other key;
          // with the default namespace this is still "resque:failed".
          val failedList = (resqueConfig.namespace :: "failed" :: Nil) mkString keySeparator
          redis.rpush(failedList, Printer.compact(render(msg)))
          redis.incr(StatKey("failed", m.worker.id))
          redis.incr(StatKey("failed"))
          // The job is over, so drop the "working on" record written for it;
          // otherwise it would linger until the worker picks up another job.
          redis.del(WorkerKey(m.worker.id))
        }
        case Success(worker) => {
          redis.incr(StatKey("processed", worker.id))
          redis.incr(StatKey("processed"))
          redis.del(WorkerKey(worker.id))
        }
        case WorkingOn(worker, queue, payload, runAt) => {
          val msg: JValue = ("queue" -> QueueName(queue)) ~ ("payload" -> payload) ~ ("run_at" -> runAt.toString(ISO8601_DATE))
          redis.set(WorkerKey(worker.id), Printer.compact(render(msg)))
        }
        case StopWorker(worker) => {
          redis.srem(WorkerSet(), worker.id)
          redis.del(WorkerKey(worker.id) + ":started")
        }
        case StartedWorker(worker) => {
          redis.sadd(WorkerSet(), worker.id)
          redis.set(WorkerKey(worker.id + ":started"), DateTime.now.toString(ISO8601_DATE))
        }
        case Enqueue(queueName, job, data) => {
          val json: JValue = ("class" -> job) ~ ("args" -> data.map(Meta.Reflection.primitive2jvalue _))
          log debug ("Enqueuing [%s] on [%s] with key [%s]:\n%s", job, queueName, QueueName(queueName), json.toPrettyJson)
          redis.sadd(ResqueNamespace("queues").toString, queueName)
          redis.rpush(QueueName(queueName), Printer.compact(render(json)))
        }
      }
    })
    self startLink a
    a
  }

  override protected def receive = {
    case Reserve(queueName, worker) => {
      // One listener per reservation; it starts polling immediately.
      val queueListener = actorOf(new ResqueListener(queueName, resqueClient.getDeque(QueueName(queueName))))
      self startLink queueListener
      self.startLink(worker)
      queueListener ! Poll
      redisActor ! StartedWorker(queueListener)
    }
    case m: JobFailure => redisActor ! m
    case m: Success => redisActor ! m
    case m: WorkingOn => redisActor ! m
    case m @ StopWorker(worker) => {
      redisActor ! m
      self ! UnlinkAndStop(worker)
    }
    case m: StartedWorker => redisActor ! m
    case m: Enqueue => redisActor ! m
    case Stop => {
      self.shutdownLinkedActors
      self.supervisor foreach { _ ! UnlinkAndStop(self) }
    }
  }
}
/** Maps a worker's id (as a Symbol) to the queue it was declared with; filled by ResqueWorker. */
val queueJobMap: ConcurrentHashMap[Symbol, Symbol] = new ConcurrentHashMap[Symbol, Symbol]()
/**
 * Polls one Redis deque for job payloads and hands each payload to the actors
 * registered under the job's "class" name. Results coming back from a worker
 * are forwarded to the supervisor with this listener as `worker`, after which
 * the next poll is triggered.
 *
 * @param queue      name of the queue being served
 * @param redisDeque deque the payloads are taken from
 */
private class ResqueListener(queue: String, redisDeque: RedisDeque[String]) extends Actor with Supervised {

  self.dispatcher = hawt

  // Delay before polling again after finding the queue empty.
  protected val timeout = akka.util.duration.pairIntToDuration(250 -> TimeUnit.MILLISECONDS)

  import Naming.{ WorkerId }

  self.id = WorkerId(self.uuid.toString, queue)

  /** Schedules a single Poll after `timeout`, remembering the handle for `postStop`. */
  protected def schedulePoll = {
    currentScheduler = Scheduler.scheduleOnce(() => self ! Poll, timeout.length, timeout.unit)
  }

  protected var currentScheduler: ScheduledFuture[AnyRef] = null

  protected def receive = {
    case Poll => {
      val polled = redisDeque.pollFirst
      polled foreach { x =>
        val json = JsonParser.parse(x)
        val jn = (json \ "class").extract[String]
        val actors = Actor.registry.actorsFor(jn)
        if (actors.isEmpty) {
          // Nobody can run this job, so no Success/JobFailure reply would ever
          // arrive and polling would stop for good. Record the failure and
          // carry on with the next job instead.
          val cause = new IllegalStateException("No worker registered for job class [" + jn + "]")
          self.supervisor foreach { _ ! JobFailure(x, queue, self, cause) }
          self ! Poll
        } else {
          val args = Perform(x)
          actors foreach { _ ! args }
          self.supervisor foreach { _ ! WorkingOn(self, queue, json) }
        }
      }
      if (polled.isEmpty) schedulePoll
    }
    case f: JobFailure => {
      self.supervisor foreach { _ ! f.copy(worker = self) }
      self ! Poll
    }
    case s: Success => {
      self.supervisor foreach { _ ! s.copy(worker = self) }
      self ! Poll
    }
  }

  /** Cancels a pending scheduled poll, if one is still outstanding. */
  override def postStop = {
    if (currentScheduler != null && !(currentScheduler.isCancelled || currentScheduler.isDone)) currentScheduler.cancel(false)
  }
}
/** Name of the local host, resolved on first use. */
lazy val hostName = InetAddress.getLocalHost.getHostName

// Fixed key segments. String literals are already interned by the JVM, so
// these are the same instances an explicit intern call would return.
private val queue = "queue"
private val stat = "stat"
private val worker = "worker"
private val keySeparator = ":"
private val queueSeparator = "$"
private val workers = "workers"
private val payload = "payload"
/**
 * Base actor for job implementations. It reacts to Perform messages by
 * parsing the payload, invoking `perform` with the extracted arguments and
 * reporting Success or JobFailure to the sender.
 */
private[resque] trait ResqueWorkerActor extends Actor with Supervised {

  /** Queue this worker belongs to; supplied by the concrete class. */
  protected val queue: Symbol

  self.id = getClass.getSimpleName

  final protected def receive = {
    case Perform(data) =>
      log info ("Worker [%s] received:\n%s", self.id, data)
      try {
        val parsed = JsonParser.parse(data)
        val arguments = (parsed \ "args").children.head.values.asInstanceOf[List[Any]]
        perform(arguments: _*)
        notifyOther(Success(self))
      } catch {
        // Any throwable raised while parsing or performing is reported as a failed job.
        case e => {
          notifyOther(JobFailure(data, queue.name, self, e))
          log.error(e, "There was an error in job: %s", self.id)
        }
      }
  }

  // Sends msg to the sender of the current message, if there is one.
  private def notifyOther[T](msg: T): Unit = {
    for (recipient <- self.sender) recipient ! msg
  }

  /** Runs the job with the arguments taken from the payload. */
  protected def perform(data: Any*): Unit
}
} | |
// RedisNamespace instance constructed with "resque"; applied to "queues" when a job is enqueued.
// NOTE(review): the exact key format depends on RedisNamespace, which is defined elsewhere - confirm.
object ResqueNamespace extends RedisNamespace("resque") | |
/**
 * Settings for key generation.
 *
 * The default is expressed as a default argument (as RedisConfig does), so
 * both `ResqueConfig()` and `ResqueConfig("other")` keep working without a
 * hand-written `apply` overload.
 *
 * @param namespace first segment of every generated Redis key; "resque" unless overridden
 */
case class ResqueConfig(namespace: String = "resque")
/**
 * Base class for jobs. Constructing an instance records the worker's queue
 * in `Resque.queueJobMap` under the worker's actor id.
 *
 * @param queue queue this worker is declared for
 */
abstract class ResqueWorker(protected val queue: Symbol) extends ResqueWorkerActor {
  // Runs as part of construction, so the mapping exists as soon as the worker does.
  Resque.queueJobMap.put(
    Symbol(self.id),
    queue
  )
}
/**
 * Connection settings for the Redis server.
 *
 * @param host server host name, "localhost" by default
 * @param port server port, 6379 by default
 */
case class RedisConfig(
  host: String = "localhost",
  port: Int = 6379
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment