Created
March 6, 2019 21:39
-
-
Save tomekit/676e93e67c0fe45d2d7f576bc0672542 to your computer and use it in GitHub Desktop.
My implementation of an SQS-like queue on Redis, with delay / visibilityTimeout support and automatic retry of unacknowledged messages
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php
/**
 * SQS-like queue stored in Redis sorted sets, with delayed visibility and retry.
 *
 * Two sorted sets are used:
 *  - "<queueName>"            holds items waiting to be processed; the score is the
 *                             unix timestamp from which the item may be popped.
 *  - "<queueName>-processing" holds items handed out by pop(); the score is the
 *                             unix timestamp at which the item is considered
 *                             abandoned and may be requeued by requeueExpired().
 *
 * All scores are computed with the local time(), so workers sharing a queue must
 * have synchronized clocks.
 *
 * Delivery is at-least-once: moving an item between the two sets is done with two
 * separate commands and is not atomic, so consumers must tolerate duplicates.
 */
class RedisQueueStorage
{
    /**
     * Visibility timeout (seconds) used when setVisibilityTimeout() was never called.
     */
    public const DEFAULT_VISIBILITY_TIMEOUT = 30;

    /**
     * Default TTL (seconds) of separately stored message bodies: 3 * 24 * 3600 = 3 days.
     */
    public const DEFAULT_BODY_TTL = 259200;

    /**
     * @var EasyRedis|Redis
     */
    protected $redis;

    /**
     * @var string
     */
    protected $queueName;

    /**
     * How long (seconds) a popped item stays hidden before it may be requeued.
     *
     * @var int
     */
    protected $visibilityTimeout = self::DEFAULT_VISIBILITY_TIMEOUT;

    /**
     * When true, the sorted sets only hold the md5 hash of the JSON-encoded message
     * and the message itself is stored as a separate value under that hash.
     *
     * @var boolean
     */
    protected $separateBody;

    /**
     * TTL (seconds) applied to separately stored message bodies.
     *
     * @var int
     */
    protected $bodyTtl;

    /**
     * @param EasyRedis $redis
     * @param string    $queueName    Key of the main sorted set; "-processing" is appended for the in-flight set
     * @param boolean   $separateBody Store message bodies under their hash instead of inside the sorted set
     * @param int       $bodyTtl      TTL in seconds for separately stored bodies
     */
    public function __construct(EasyRedis $redis, $queueName, $separateBody = true, $bodyTtl = self::DEFAULT_BODY_TTL) {
        $this->redis = $redis;
        $this->queueName = $queueName;
        $this->separateBody = $separateBody;
        $this->bodyTtl = (int) $bodyTtl;
    }

    /**
     * @param int $visibilityTimeout Seconds a popped item stays in the processing set before it counts as expired
     */
    public function setVisibilityTimeout(int $visibilityTimeout): void {
        $this->visibilityTimeout = $visibilityTimeout;
    }

    /**
     * Enqueue an item so that it is available for pop() immediately.
     *
     * With separateBody enabled, identical messages produce the same hash and
     * therefore collapse into a single queue entry.
     *
     * @param mixed $item
     */
    public function push($item) {
        if (!$this->separateBody) {
            $this->redis->zAdd($this->queueName, time(), $item);

            return;
        }

        $hash = md5(json_encode($item));
        // Store the body first so that a consumer can never pop a hash whose body is not there yet.
        $this->redis->setex($hash, $this->bodyTtl, $item);
        $this->redis->zAdd($this->queueName, time(), $hash);
    }

    /**
     * Claim up to $limit items that are due now and move them to the processing set.
     *
     * NOTE: the item is added to the processing set before it is removed from the
     * main set, so a crash in between cannot lose it. The flip side is that a
     * worker which loses the race for an item still refreshes (or re-creates) its
     * processing entry, which can later lead to a duplicate delivery.
     *
     * @param int $limit
     * @return array List (0-indexed, no gaps) of items claimed by this call
     */
    public function pop(int $limit = 10) {
        // Only items whose score is <= now; future (delayed) items stay untouched.
        $items = $this->redis->zRangeByScore($this->queueName, 0, time(), ['limit' => [0, $limit]]);
        if (!is_array($items)) {
            // Presumably the client signalled a failure with a non-array value; nothing was claimed.
            return [];
        }

        $processingQueue = $this->processingQueueName();
        $claimed = [];
        foreach ($items as $item) {
            $this->redis->zAdd($processingQueue, time() + $this->visibilityTimeout, $item);
            // A falsy zRem result means another worker removed the item first.
            if ($this->redis->zRem($this->queueName, $item)) {
                $claimed[] = $item;
            }
        }

        return $claimed;
    }

    /**
     * Move items whose visibility timeout has elapsed back to the main queue.
     *
     * Items are re-added with a 1 second delay.
     * @TODO fix the race condition; Solution 1: use the <hash>-<retry-count> format as a key; Solution 2: Use ZADD with current timestamp + few seconds (for delayed processing).
     *
     * @param int $limit Maximum number of expired items handled per call
     * @return boolean True when at least one item was requeued; false when there was
     *                 nothing to requeue or a RedisException was caught (and logged)
     */
    public function requeueExpired($limit = 10) {
        try {
            $processingQueue = $this->processingQueueName();
            $items = $this->redis->zRangeByScore($processingQueue, 0, time(), ['limit' => [0, $limit]]);
            if (!is_array($items)) {
                return false;
            }

            foreach ($items as $item) {
                $this->redis->zAdd($this->queueName, time() + 1, $item); // Back to the main queue, visible in 1s
                $this->redis->zRem($processingQueue, $item);
            }

            return count($items) >= 1;
        } catch (RedisException $e) {
            LoggerProvider::getLogger()->error('Redis error in RedisQueueStorage: '.$e->getMessage());

            return false;
        }
    }

    /**
     * Resolve a queue item (as returned by pop()) to its message body.
     *
     * @param mixed $item
     * @return mixed|null The item itself when bodies are stored inline; otherwise the
     *                    stored body, or null when no value exists under the key
     */
    public function getMessageBodyByItem($item) {
        if (!$this->separateBody) {
            return $item;
        }

        $content = $this->redis->get($item);
        // Compare strictly: legitimate bodies such as "0" or "" are falsy but present.
        // Assumes the client reports a missing key as false (phpredis behaviour) or null.
        if ($content === false || $content === null) {
            LoggerProvider::getLogger()->warning("Content is empty", ['queueName' => $this->queueName, 'item' => $item, 'content' => $content]);

            return null;
        }

        return $content;
    }

    /**
     * Acknowledge an item: drop it from both sorted sets and delete its stored body.
     *
     * @param mixed $item Item as returned by pop()
     */
    public function remove($item) {
        $this->redis->zRem($this->processingQueueName(), $item);
        $this->redis->zRem($this->queueName, $item);

        if ($this->separateBody) {
            $this->redis->del($item);
        }
    }

    /**
     * @return string Key of the sorted set holding in-flight items
     */
    protected function processingQueueName() {
        return $this->queueName.'-processing';
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment