@jctebbal
Created September 18, 2014 11:58
Consume RabbitMQ cluster messages behind a load balancer using Pecl AMQP
<?php
/**
 * When using a RabbitMQ cluster, we usually access it through a load balancer (in this case Amazon ELB).
 * Load balancers enforce a maximum idle time on the connections they manage;
 * when it expires, the client sees a connection drop. This script shows how to ignore these drops
 * by reconnecting, while still detecting an actual connectivity problem.
 */
define('MINIMUM_LIFE', 30); // seconds; needs to be lower than the load balancer idle timeout
$last_connection = 0;
$i = 0;
function processMessage($envelope, $queue) {
    global $i;
    echo "Message $i: " . $envelope->getBody() . "\n";
    $queue->ack($envelope->getDeliveryTag());
    $i++;
    return true; // return false instead if we want to stop consuming gracefully
}
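// Reconnect only if the previous connection lived longer than MINIMUM_LIFE seconds.
// A connection that dies sooner is treated as a real connectivity problem and ends the loop.
while (time() - $last_connection > MINIMUM_LIFE) {
    $last_connection = time();
    echo "connected at $last_connection \n";
    try {
        $connection = new AMQPConnection(array(
            'host' => 'load.balancer.ip',
            'vhost' => '/',
            'login' => 'user',
            'password' => 'passwd'
        ));
        $connection->connect();
        $channel = new AMQPChannel($connection);
        $queue = new AMQPQueue($channel);
        $queue->setFlags(AMQP_DURABLE);
        $queue->setName('uploaded_queue');
        $queue->declareQueue();
        // Consume messages on the queue; this blocks until the callback
        // returns false or the connection fails
        $queue->consume("processMessage");
    } catch (AMQPConnectionException $e) {
        // Either an idle timeout from the load balancer or a real failure;
        // the loop condition above decides which
        echo "Connection drop\n";
    }
}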
?>
@jctebbal
Author

The output looks like this:
connected at 1411039897
Message 0: testing
Message 1: testing
Message 2: testing
Message 3: testing
Connection drop
connected at 1411039973
Connection drop
connected at 1411040031
Connection drop
connected at 1411040089
Message 4: testing
Message 5: testing
Message 6: testing
Connection drop
connected at 1411040187
Connection drop
connected at 1411040245
....
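
To make the reconnect rule easier to follow, here is a minimal sketch that replays the "connected at" timestamps from the output above through the same comparison the script's `while` condition uses. The helper name `isIdleDrop` is hypothetical and not part of the gist. It also assumes that each reconnect happens immediately after the previous drop, so the gap between two timestamps approximates how long a connection lived.

```php
<?php
// Hypothetical helper (not in the gist): mirrors the gist's loop condition
// time() - $last_connection > MINIMUM_LIFE.
const MINIMUM_LIFE = 30; // seconds, same value as in the gist

/**
 * Returns true when the connection lived long enough for its drop to be
 * treated as a load balancer idle timeout (so we reconnect), false when
 * it died too quickly (so we treat it as a real connectivity problem).
 */
function isIdleDrop(int $connectedAt, int $droppedAt, int $minimumLife = MINIMUM_LIFE): bool
{
    return ($droppedAt - $connectedAt) > $minimumLife;
}

// Consecutive "connected at" timestamps copied from the sample output.
$connects = [1411039897, 1411039973, 1411040031, 1411040089, 1411040187, 1411040245];

// Assumption: the next connect follows the drop immediately, so the
// difference between neighbours approximates each connection's lifetime.
$lifetimes = [];
for ($k = 1; $k < count($connects); $k++) {
    $lifetimes[] = $connects[$k] - $connects[$k - 1];
}

foreach ($lifetimes as $seconds) {
    $verdict = isIdleDrop(0, $seconds) ? "reconnect" : "stop";
    echo $seconds . "s -> " . $verdict . "\n";
}
```

Every lifetime in the sample is at least 58 seconds, so each drop is classified as an idle timeout and the script reconnects. The 58-second gaps would be consistent with an idle timeout of roughly 60 seconds, although the configured value is not stated here. A connection that failed within 30 seconds of being opened would instead end the loop.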
