Skip to content

Instantly share code, notes, and snippets.

@AnatolyRugalev
Created June 18, 2016 08:33
Show Gist options
  • Save AnatolyRugalev/aa06d05269ee34f7e27a0769cea5161e to your computer and use it in GitHub Desktop.
Save AnatolyRugalev/aa06d05269ee34f7e27a0769cea5161e to your computer and use it in GitHub Desktop.
<?php
namespace common\queue;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Yii;
use yii\base\Component;
use yii\base\Exception;
use yii\base\InvalidParamException;
use yii\helpers\Json;
use yii\queue\QueueInterface;
/**
 * RabbitMQ-backed queue component built on php-amqplib.
 *
 * For every logical queue name the component declares a queue and an exchange
 * that share that name and binds them together with an empty routing key, so a
 * message published to exchange `$queue` is routed into queue `$queue`.
 * The connection and the channel are opened lazily on first use and can be
 * released explicitly with {@see close()}.
 *
 * Delayed delivery and {@see release()} are not implemented.
 *
 * @author Anatoly Rugalev <[email protected]>
 */
class RabbitQueue extends Component implements QueueInterface
{
    /**
     * @var string broker host name passed to AMQPStreamConnection
     */
    public $host;
    /**
     * @var integer broker port passed to AMQPStreamConnection
     */
    public $port = 5672;
    /**
     * @var string user name passed to AMQPStreamConnection
     */
    public $user;
    /**
     * @var string password passed to AMQPStreamConnection
     */
    public $password;
    /**
     * @var string virtual host passed to AMQPStreamConnection
     */
    public $vhost = '/';
    /**
     * @var AMQPStreamConnection|null lazily created connection, null while closed
     */
    private $_connection;
    /**
     * @var AMQPChannel|null lazily created channel, null while closed
     */
    private $_channel;
    /**
     * @var array names of queues already declared and bound through the current channel (name => true)
     */
    private $_queues = [];
    /**
     * @var array per-exchange overrides indexed by exchange name.
     * Recognised keys: `type` (default 'direct'), `passive` (default false),
     * `durable` (default true), `autoDelete` (default false).
     */
    public $exchangeSettings = [];
    /**
     * @var array per-queue overrides indexed by queue name.
     * Recognised keys: `passive` (default false), `durable` (default true),
     * `exclusive` (default false), `autoDelete` (default false).
     */
    public $queueSettings = [];

    /**
     * Closes the channel and the connection if they are open.
     *
     * The cache of declared queues is reset as well, so that queues are
     * declared again after the next (lazy) reconnect.
     */
    public function close()
    {
        if ($this->_channel !== null) {
            $this->_channel->close();
            $this->_channel = null;
        }
        if ($this->_connection !== null) {
            $this->_connection->close();
            $this->_connection = null;
        }
        // Declarations were made through the channel that has just been closed;
        // forget them so prepareQueue() re-declares on the next use.
        $this->_queues = [];
    }

    /**
     * Returns the broker connection, opening it on first use.
     *
     * @return AMQPStreamConnection
     */
    private function getConnection()
    {
        if ($this->_connection === null) {
            $this->_connection = new AMQPStreamConnection(
                $this->host,
                $this->port,
                $this->user,
                $this->password,
                $this->vhost
            );
        }

        return $this->_connection;
    }

    /**
     * Returns the channel, opening it (and the connection) on first use.
     *
     * @return \PhpAmqpLib\Channel\AMQPChannel
     */
    private function getChannel()
    {
        if ($this->_channel === null) {
            $this->_channel = $this->getConnection()->channel();
        }

        return $this->_channel;
    }

    /**
     * Reads one option from a settings array, falling back to a default.
     *
     * Uses isset(), so a key that is present with a null value also yields the default.
     *
     * @param mixed $settings settings array for a single queue or exchange
     * @param string $key option name
     * @param mixed $default value used when the option is not set
     * @return mixed
     */
    private function settingOrDefault($settings, $key, $default)
    {
        return isset($settings[$key]) ? $settings[$key] : $default;
    }

    /**
     * Declares a queue using the overrides from {@see $queueSettings}.
     *
     * @param string $queue queue name
     */
    private function declareQueue($queue)
    {
        $settings = $this->settingOrDefault($this->queueSettings, $queue, []);

        $this->getChannel()->queue_declare(
            $queue,
            $this->settingOrDefault($settings, 'passive', false),
            $this->settingOrDefault($settings, 'durable', true),
            $this->settingOrDefault($settings, 'exclusive', false),
            $this->settingOrDefault($settings, 'autoDelete', false)
        );
    }

    /**
     * Declares an exchange using the overrides from {@see $exchangeSettings}.
     *
     * @param string $name exchange name
     */
    private function declareExchange($name)
    {
        $settings = $this->settingOrDefault($this->exchangeSettings, $name, []);

        $this->getChannel()->exchange_declare(
            $name,
            $this->settingOrDefault($settings, 'type', 'direct'),
            $this->settingOrDefault($settings, 'passive', false),
            $this->settingOrDefault($settings, 'durable', true),
            $this->settingOrDefault($settings, 'autoDelete', false)
        );
    }

    /**
     * Binds a queue to an exchange with the default (empty) routing key.
     *
     * @param string $queue queue name
     * @param string $exchange exchange name
     */
    private function bindQueue($queue, $exchange)
    {
        $this->getChannel()->queue_bind($queue, $exchange);
    }

    /**
     * Makes sure the queue, the same-named exchange and their binding exist.
     *
     * The work is done once per queue name; later calls are no-ops until the
     * cache entry is cleared by {@see purge()} or {@see close()}.
     *
     * @param string $queue queue name
     */
    private function prepareQueue($queue)
    {
        if (isset($this->_queues[$queue])) {
            return;
        }

        $this->declareQueue($queue);
        $this->declareExchange($queue);
        $this->bindQueue($queue, $queue);
        $this->_queues[$queue] = true;
    }

    /**
     * Pushes a payload to the queue.
     *
     * The payload is serialized and wrapped, together with a generated id,
     * into a JSON envelope that is published to the exchange named `$queue`.
     *
     * @param mixed $payload any value accepted by serialize()
     * @param string $queue queue name
     * @param integer $delay currently ignored: delayed delivery is not implemented
     * @return string id generated for the message
     */
    public function push($payload, $queue, $delay = 0)
    {
        $id = md5(uniqid('', true));
        $envelope = Json::encode(['id' => $id, 'body' => serialize($payload)]);

        $this->prepareQueue($queue);

        // delivery_mode 2 = persistent. Queues are durable by default, but a
        // transient message would still be dropped when the broker restarts.
        $message = new AMQPMessage($envelope, ['delivery_mode' => 2]);
        $this->getChannel()->basic_publish($message, $queue);

        return $id;
    }

    /**
     * Pops a message from the queue.
     *
     * The message is fetched without being acknowledged; pass the returned
     * array to {@see delete()} to acknowledge it.
     *
     * @param string $queue queue name
     * @return array|false array with the keys `id`, `body`, `queue` and
     * `delivery_tag`, or false when no message was fetched
     */
    public function pop($queue)
    {
        $this->prepareQueue($queue);

        $message = $this->getChannel()->basic_get($queue);
        if (!$message) {
            return false;
        }

        $envelope = Json::decode($message->body);

        return [
            'id' => $envelope['id'],
            // NOTE(security): unserialize() may instantiate arbitrary classes.
            // This is only safe while every publisher to the broker is trusted.
            'body' => unserialize($envelope['body']),
            'queue' => $queue,
            'delivery_tag' => $message->delivery_info['delivery_tag'],
        ];
    }

    /**
     * Purges the queue by deleting the queue and its same-named exchange.
     *
     * The cached "declared" flag is cleared so that the next push() or pop()
     * declares both again instead of addressing entities that no longer exist.
     *
     * @param string $queue queue name
     */
    public function purge($queue)
    {
        $channel = $this->getChannel();
        $channel->queue_delete($queue);
        $channel->exchange_delete($queue);

        unset($this->_queues[$queue]);
    }

    /**
     * Releases the message. Not implemented: always throws.
     *
     * @param array $message
     * @param integer $delay
     * @throws Exception always
     */
    public function release(array $message, $delay = 0)
    {
        throw new Exception("Not implemented");
    }

    /**
     * Deletes the message by acknowledging it on the channel.
     *
     * @param array $message message array as returned by pop(); its
     * `delivery_tag` element is used for the acknowledgement
     */
    public function delete(array $message)
    {
        $this->getChannel()->basic_ack($message['delivery_tag']);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment