Skip to content

Instantly share code, notes, and snippets.

@mpskovvang
Created October 9, 2017 11:52
Show Gist options
  • Save mpskovvang/6f48b60338d08781b476785455436080 to your computer and use it in GitHub Desktop.
Save mpskovvang/6f48b60338d08781b476785455436080 to your computer and use it in GitHub Desktop.
An RabbitMQ exponential backoff helper for failed messages.
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS);
$channel = $connection->channel();
$channel->exchange_declare('example.exchange', 'direct', false, false, false);
$channel->exchange_declare('example.error.exchange', 'direct', false, false, false, false, false);
$channel->queue_declare('example', false, true, false, false, false, [
'x-dead-letter-exchange' => ['S', 'example.error.exchange'],
'x-dead-letter-routing-key' => ['S', 'example']
]);
$channel->queue_bind('example', 'example.exchange', 'example');
$channel->queue_declare('example.error', false, true, false, false, false);
$channel->queue_bind('example.error', 'example.error.exchange', 'example');
$helper = new ExponentialBackoffHelper($channel, 'example', 'example.exchange', 'example', [
'max_attempts' => 3
]);
$channel->basic_qos(null, 1, null);
$channel->basic_consume('example', 'example', false, false, false, false, function ($message) use ($helper) {
echo $message->body;
$helper->error($message);
});
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
<?php
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
class ExponentialBackoffHelper
{
/**
* @var AMQPChannel
*/
private $channel;
/**
* @var string
*/
private $queue;
/**
* @var string
*/
private $exchange;
/**
* @var string
*/
private $routing_key;
/**
* @var integer $max_delay The max time (in seconds) to wait before a retry.
*/
private $max_delay = 60;
/**
* @var integer $factor The base number for the exponential back off.
*/
private $factor = 2;
/**
* @var integer $max_attempts The max number of attempts allowed.
*/
private $max_attempts = 10;
public function __construct(AMQPChannel $channel, $queue, $exchange, $routing_key, array $options = [])
{
$this->channel = $channel;
$this->queue = $queue;
$this->exchange = $exchange;
$this->routing_key = $routing_key;
if (isset($options['max_delay'])) {
if ($options['max_delay'] <= 0) {
throw new \Exception("Option 'max_delay' must be greater than 0.");
}
$this->max_delay = (int) $options['max_delay'];
}
if (isset($options['factor'])) {
if ($options['factor'] <= 0) {
throw new \Exception("Option 'factor' must be greater than 0.");
}
$this->factor = (int) $options['factor'];
}
if (isset($options['max_attempts'])) {
if ($options['max_attempts'] < 0) {
throw new \Exception("Option 'max_attempts' must not be negative.");
}
$this->max_attempts = (int) $options['max_attempts'];
}
}
public function acknowledge(AMQPMessage $message)
{
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag'], false);
}
public function reject(AMQPMessage $message, $requeue = false)
{
$this->retry($message);
}
public function error(AMQPMessage $message)
{
$this->retry($message);
}
public function timeout(AMQPMessage $message)
{
$this->retry($message);
}
private function retry(AMQPMessage $message)
{
$attempts = $this->deaths($message);
if ($attempts < $this->max_attempts) {
$delay = $this->delay($attempts);
$routing_key = $this->queue . '.' . $delay;
$queue = $this->createRetryQueue($delay);
$this->channel->queue_bind($queue, $this->exchange, $routing_key);
$this->channel->basic_publish($message, $this->exchange, $routing_key);
$this->acknowledge($message);
} else {
$message->delivery_info['channel']->basic_reject($message->delivery_info['delivery_tag'], false);
}
}
private function deaths(AMQPMessage $message)
{
$headers = $message->has('application_headers') ? $message->get('application_headers')->getNativeData() : null;
if (is_null($headers) || !isset($headers['x-death'])) {
return 0;
}
$count = 0;
foreach ($headers['x-death'] as $death) {
if (strpos($death['queue'], $this->queue) === 0) {
$count += $death['count'];
}
}
return $count;
}
private function createRetryQueue($delay)
{
$queue = $this->queue . '.retry.' . $delay;
$this->channel->queue_declare($queue, false, true, false, false, false, [
'x-dead-letter-exchange' => ['S', $this->exchange],
'x-dead-letter-routing-key' => ['S', $this->queue],
'x-message-ttl' => ['I', $delay * 1000],
'x-expires' => ['I', $delay * 1000 * 2]
]);
return $queue;
}
public function delay($attempts)
{
return min($this->max_delay, ($attempts + 1) ** $this->factor);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment