Created
June 18, 2016 08:33
-
-
Save AnatolyRugalev/aa06d05269ee34f7e27a0769cea5161e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace common\queue; | |
use PhpAmqpLib\Channel\AMQPChannel; | |
use PhpAmqpLib\Connection\AMQPStreamConnection; | |
use PhpAmqpLib\Message\AMQPMessage; | |
use Yii; | |
use yii\base\Component; | |
use yii\base\Exception; | |
use yii\base\InvalidParamException; | |
use yii\helpers\Json; | |
use yii\queue\QueueInterface; | |
/** | |
* Class RabbitQueue TODO: WRITE CLASS DESCRIPTION | |
* @author Anatoly Rugalev <[email protected]> | |
*/ | |
class RabbitQueue extends Component implements QueueInterface | |
{ | |
public $host; | |
public $port = 5672; | |
public $user; | |
public $password; | |
public $vhost = '/'; | |
/** | |
* @var AMQPStreamConnection | |
*/ | |
private $_connection; | |
/** | |
* @var AMQPChannel | |
*/ | |
private $_channel; | |
private $_queues = []; | |
public $exchangeSettings = []; | |
public $queueSettings = []; | |
public function close() | |
{ | |
if ($this->_channel !== null) { | |
$this->_channel->close(); | |
$this->_channel = null; | |
} | |
if ($this->_connection !== null) { | |
$this->_connection->close(); | |
$this->_connection = null; | |
} | |
} | |
/** | |
* @return AMQPStreamConnection | |
*/ | |
private function getConnection() | |
{ | |
if (!isset($this->_connection)) { | |
$this->_connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, $this->vhost); | |
} | |
return $this->_connection; | |
} | |
/** | |
* @return \PhpAmqpLib\Channel\AMQPChannel | |
*/ | |
private function getChannel() | |
{ | |
if (!isset($this->_channel)) { | |
$this->_channel = $this->getConnection()->channel(); | |
} | |
return $this->_channel; | |
} | |
private function declareQueue($queue) | |
{ | |
$settings = isset($this->queueSettings[$queue]) ? $this->queueSettings[$queue] : []; | |
$passive = isset($settings['passive']) ? $settings['passive'] : false; | |
$durable = isset($settings['durable']) ? $settings['durable'] : true; | |
$exclusive = isset($settings['exclusive']) ? $settings['exclusive'] : false; | |
$autoDelete = isset($settings['autoDelete']) ? $settings['autoDelete'] : false; | |
$this->getChannel()->queue_declare($queue, $passive, $durable, $exclusive, $autoDelete); | |
} | |
private function declareExchange($name) | |
{ | |
$settings = isset($this->exchangeSettings[$name]) ? $this->exchangeSettings[$name] : []; | |
$type = isset($settings['type']) ? $settings['type'] : 'direct'; | |
$passive = isset($settings['passive']) ? $settings['passive'] : false; | |
$durable = isset($settings['durable']) ? $settings['durable'] : true; | |
$autoDelete = isset($settings['autoDelete']) ? $settings['autoDelete'] : false; | |
$this->getChannel()->exchange_declare($name, $type, $passive, $durable, $autoDelete); | |
} | |
private function bindQueue($queue, $exchange) | |
{ | |
$this->getChannel()->queue_bind($queue, $exchange); | |
} | |
private function prepareQueue($queue) | |
{ | |
if (!isset($this->_queues[$queue])) { | |
$this->declareQueue($queue); | |
$this->declareExchange($queue); | |
$this->bindQueue($queue, $queue); | |
$this->_queues[$queue] = true; | |
} | |
} | |
/** | |
* Pushs payload to the queue. | |
* | |
* @param mixed $payload | |
* @param integer $delay | |
* @param string $queue | |
* @return string | |
*/ | |
public function push($payload, $queue, $delay = 0) | |
{ | |
$payload = Json::encode(['id' => $id = md5(uniqid('', true)), 'body' => serialize($payload)]); | |
$this->prepareQueue($queue); | |
$message = new AMQPMessage($payload); | |
$this->getChannel()->basic_publish($message, $queue); | |
} | |
/** | |
* Pops message from the queue. | |
* | |
* @param string $queue | |
* @return array|false | |
*/ | |
public function pop($queue) | |
{ | |
$this->prepareQueue($queue); | |
$message = $this->getChannel()->basic_get($queue); | |
if (!$message) { | |
return false; | |
} | |
$payload = Json::decode($message->body); | |
return [ | |
'id' => $payload['id'], | |
'body' => unserialize($payload['body']), | |
'queue' => $queue, | |
'delivery_tag' => $message->delivery_info['delivery_tag'], | |
]; | |
} | |
/** | |
* Purges the queue. | |
* | |
* @param string $queue | |
*/ | |
public function purge($queue) | |
{ | |
$this->getChannel()->queue_delete($queue); | |
$this->getChannel()->exchange_delete($queue); | |
} | |
/** | |
* Releases the message. | |
* | |
* @param array $message | |
* @param integer $delay | |
* @throws Exception | |
*/ | |
public function release(array $message, $delay = 0) | |
{ | |
throw new Exception("Not implemented"); | |
} | |
/** | |
* Deletes the message. | |
* | |
* @param array $message | |
*/ | |
public function delete(array $message) | |
{ | |
$this->getChannel()->basic_ack($message['delivery_tag']); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment