Last active
June 7, 2016 15:37
-
-
Save alsma/a59f90d9b46345b2c02f to your computer and use it in GitHub Desktop.
queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* global require, module */ | |
var _ = require('lodash') | |
, Q = require('q') | |
, uuid = require('node-uuid') | |
, GET_TIMEOUT = 1e3 | |
; | |
/** | |
* @param {RedisClient} redis | |
* @param {Logger} logger | |
* @param {object} config | |
*/ | |
exports.create = function (redis, logger, config) { | |
return new QueueManager(redis, logger, config) | |
}; | |
/** | |
* | |
* @param {RedisClient} redis | |
* @param {Logger} logger | |
* @param {object} config | |
* @param {string} config.name List name where tasks are stored | |
* @param {string?} config.data Hash name where tasks' data are stored | |
* @param {string?} config.inProgress Hash name where tasks in progress are stored | |
* @param {string?} config.retry Hash name where number of retries per task stored | |
* @param {number?} config.requeueInterval Timeout to mark task as stuck and requeue it | |
* @param {number?} config.maxRetries Max tasks retries num | |
* | |
* @constructor | |
*/ | |
function QueueManager(redis, logger, config) { | |
this.redis = redis; | |
this.logger = logger; | |
this._queueName = config.name; | |
this._tasksDataHashName = config.data || this._queueName + ':data'; | |
this._tasksInProgressHashName = config.inProgress || this._queueName + ':in_progress'; | |
this._tasksRetryHashName = config.retry || this._queueName + ':retry'; | |
this._requeueInterval = config.requeueInterval || 30 * 1e3; | |
this._maxRetries = config.maxRetries || 3; | |
setInterval(_.bind(this._requeueStuckTasks, this), this._requeueInterval); | |
} | |
QueueManager.prototype = {}; | |
QueueManager.prototype.enqueue = function (data) { | |
var uuid4 = uuid.v4(); | |
data = data || {}; | |
this.redis.hset(this._tasksDataHashName, uuid4, JSON.stringify(data)); | |
this.redis.rpush(this._queueName, uuid4); | |
}; | |
QueueManager.prototype.requeue = function (taskId) { | |
this.logger.warn('Task was requeued. UUID: ', taskId); | |
this.redis.hdel(this._tasksInProgressHashName, taskId); | |
this.redis.rpush(this._queueName, taskId); | |
this.redis.hincrby(this._tasksRetryHashName, taskId, 1); | |
}; | |
QueueManager.prototype.unqueue = function (taskId) { | |
this.logger.warn('Task was unqueued. UUID: ', taskId); | |
this.redis.hdel(this._tasksInProgressHashName, taskId); | |
this.redis.hdel(this._tasksRetryHashName, taskId); | |
this.redis.hdel(this._tasksDataHashName, taskId); | |
this.redis.lrem(this._queueName, 0, taskId); | |
}; | |
////////////////////////// | |
// private members | |
QueueManager.prototype._requeueStuckTasks = function () { | |
var _self = this, now; | |
now = _.now(); | |
var _defInProgress = Q.defer(); | |
this.redis.hgetall(this._tasksInProgressHashName, function (err, data) { | |
if (err) return _defInProgress.reject(err); | |
_defInProgress.resolve(data || {}); | |
}); | |
var _defRetries = Q.defer(); | |
this.redis.hgetall(this._tasksRetryHashName, function (err, data) { | |
if (err) return _defInProgress.reject(err); | |
_defRetries.resolve(data || {}); | |
}); | |
Q.all([_defInProgress.promise, _defRetries.promise]) | |
.timeout(GET_TIMEOUT) | |
.spread(function (inProgress, retriesNum) { | |
var stuck = _.chain(inProgress) | |
.pick(function (startTimestamp) { return (now - startTimestamp * 1e3) >= _self._requeueInterval; }) | |
.keys() | |
.partition(function (taskId) { return (retriesNum[taskId] || 0) < _self._maxRetries; }) | |
.value(); | |
_.each(stuck[0], _self.requeue, _self); | |
_.each(stuck[1], _self.unqueue, _self); | |
}); | |
}; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
use Ramsey\Uuid\Uuid; | |
class Manager | |
{ | |
const DEFAULT_CONSUME_TIMEOUT = 12 * 60 * 60; | |
const DEFAULT_BLPOP_INTERVAL = 30; | |
/** @var \Redis */ | |
protected $redis; | |
/** @var string */ | |
protected $queueName; | |
/** @var string */ | |
protected $tasksDataHashName; | |
/** @var string */ | |
protected $tasksInProgressHashName; | |
/** @var string */ | |
protected $tasksRetryHashName; | |
/** | |
* @param \Redis $redis | |
* @param array $config Config reference: [ | |
* 'name' => 'tasks' // list name where tasks are stored | |
* 'data' => 'tasks:data' // hash name where tasks' data are stored, | |
* 'in_progress' => 'tasks:in_progress' // hash name where tasks in progress are stored | |
* 'retry' => 'tasks:retry' // hash name where number of retries per task stored | |
* ] | |
*/ | |
public function __construct(\Redis $redis, array $config) | |
{ | |
$this->redis = $redis; | |
$this->queueName = $config['name']; | |
$this->tasksDataHashName = $this->getOr($config, 'data', $this->queueName); | |
$this->tasksInProgressHashName = $this->getOr($config, 'in_progress', $this->queueName); | |
$this->tasksRetryHashName = $this->getOr($config, 'retry', $this->queueName); | |
} | |
/** | |
* @param array $data | |
*/ | |
public function enqueue(array $data) | |
{ | |
$uuid4 = Uuid::uuid4(); | |
$taskId = $uuid4->toString(); | |
$this->redis->hSet($this->tasksDataHashName, $taskId, json_encode($data)); | |
$this->redis->rPush($this->queueName, $taskId); | |
} | |
/** | |
* @return Task|null | |
*/ | |
public function dequeue() | |
{ | |
$taskId = $this->redis->lPop($this->queueName); | |
if (false === $taskId) { | |
return null; | |
} | |
return $this->createTask($taskId); | |
} | |
/** | |
* @param callable $consumer | |
* @param int $timeout | |
*/ | |
public function consume(callable $consumer, $timeout = self::DEFAULT_CONSUME_TIMEOUT) | |
{ | |
$start = time(); | |
$continue = true; | |
do { | |
$listItem = $this->redis->blPop($this->queueName, self::DEFAULT_BLPOP_INTERVAL); | |
if (!empty($listItem)) { | |
$res = $consumer($this->createTask(end($listItem))); | |
$continue = $res !== false; | |
} | |
$continue &= ((time() - $start) < $timeout); | |
} while ($continue); | |
} | |
/** | |
* @param Task $task | |
*/ | |
public function compete(Task $task) | |
{ | |
$this->redis->hDel($this->tasksInProgressHashName, $task->getId()); | |
$this->redis->hDel($this->tasksRetryHashName, $task->getId()); | |
$this->redis->hDel($this->tasksDataHashName, $task->getId()); | |
} | |
/** | |
* @param string $taskId UUID | |
* | |
* @return Task | |
*/ | |
protected function createTask($taskId) | |
{ | |
$data = $this->redis->hGet($this->tasksDataHashName, $taskId); | |
$data = false === $data ? [] : json_decode($data, true); | |
$this->redis->hSet($this->tasksInProgressHashName, $taskId, time()); | |
return new Task($taskId, $data); | |
} | |
/** | |
* @param array $array | |
* @param string $key | |
* @param string $valueToGenerateDefault | |
* | |
* @return string | |
*/ | |
protected function getOr(array $array, $key, $valueToGenerateDefault) | |
{ | |
return isset($array[$key]) ? $array[$key] : sprintf('%s:%s', $valueToGenerateDefault, $key); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package alsma.aggregator.queue | |
import java.util.UUID | |
import java.io.StringWriter | |
import com.twitter.util.Future | |
import com.twitter.finagle.redis.Client | |
import org.jboss.netty.buffer.ChannelBuffer | |
import com.fasterxml.jackson.databind.ObjectMapper | |
import com.twitter.finagle.redis.util.{CBToString, StringToChannelBuffer} | |
class Queue(private val mapper: ObjectMapper, private val client: Client, private val queueName: String) { | |
import alsma.aggregator.queue.Queue._ | |
private[queue] val QueueName = queueName | |
private[queue] val DataHashName = s"$queueName:data" | |
private[queue] val InProgressHashName = s"$queueName:in_progress" | |
private[queue] val RetryHashName = s"$queueName:retry" | |
setupRequeue(this) | |
def enqueue(msg: AnyRef): Future[String] = { | |
val uid = UUID.randomUUID.toString.substring(0, 8) | |
val writer = new StringWriter | |
mapper.writeValue(writer, msg) | |
val ops = client.hSet(DataHashName, uid, writer.toString) :: client.rPush(QueueName, uid :: Nil) :: Nil | |
Future join ops map {_ => uid} | |
} | |
def unqueue(uid: String) = { | |
client.hDel(InProgressHashName, Seq(uid)) | |
client.hDel(RetryHashName, Seq(uid)) | |
client.hDel(DataHashName, Seq(uid)) | |
} | |
def requeue(uid: String) = { | |
client.hDel(InProgressHashName, Seq(uid)) | |
client.rPush(QueueName, uid :: Nil) | |
client.hIncrBy(RetryHashName, uid, 1L) | |
} | |
} | |
object Queue { | |
import java.util.concurrent._ | |
val RetryInterval = 3 | |
val MaxRetries = 5 | |
lazy val executor = createExecutor() | |
implicit def string2ChannelBuffer(value: String): ChannelBuffer = StringToChannelBuffer(value) | |
implicit def stringSeq2SeqChannelBuffer(value: Seq[String]): Seq[ChannelBuffer] = value.map {StringToChannelBuffer(_)} | |
def setupRequeue(queue: Queue): Unit = { | |
executor.scheduleAtFixedRate(createRequeueTask(queue), 1, 1, TimeUnit.SECONDS) | |
} | |
private[this] def createRequeueTask(queue: Queue) = { | |
new Runnable { | |
def run(): Unit = { | |
val now: Long = System.currentTimeMillis / 1000 | |
import com.twitter.util.Future | |
Future.join(queue.client.hGetAll(queue.InProgressHashName), queue.client.hGetAll(queue.RetryHashName)). | |
map({ case (inProgress, retries) => (CBToString.fromTuples(inProgress), CBToString.fromTuples(retries))}). | |
map({ case (inProgress, retries) => | |
val retryMap = retries.groupBy(_._1).mapValues(_.head._2.toInt) | |
val (toRequeue, toUnqueue) = inProgress. | |
map(element => (element._1, element._2.toInt)). | |
filter(now - _._2 > RetryInterval). | |
map(_._1). | |
partition(uid => retryMap.getOrElse(uid, 0) < MaxRetries) | |
toRequeue.foreach(queue.requeue) | |
toUnqueue.foreach(queue.unqueue) | |
}) | |
} | |
} | |
} | |
private[this] def createExecutor(): ScheduledThreadPoolExecutor = { | |
new ScheduledThreadPoolExecutor(1, new ThreadFactory { | |
def newThread(r: Runnable): Thread = { | |
val thread = new Thread(r) | |
thread.setDaemon(true) | |
thread | |
} | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment