Created
January 18, 2016 14:24
-
-
Save wodCZ/334a469825d98746db1d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
use Kdyby; | |
use Kdyby\RabbitMq\Connection; | |
use Kdyby\RabbitMq\DI\RabbitMqExtension; | |
use Nette; | |
use Nette\Reflection\ClassType; | |
use PhpAmqpLib\Message\AMQPMessage; | |
use Tester; | |
class RabbitProducerMock extends Kdyby\RabbitMq\Producer | |
{ | |
/** | |
* @var array | |
*/ | |
public $messages = []; | |
/** | |
* @var Nette\DI\Container | |
*/ | |
private $serviceLocator; | |
public function __construct(Connection $conn, $consumerTag = NULL, Nette\DI\Container $serviceLocator = NULL) | |
{ | |
parent::__construct($conn, $consumerTag); | |
$this->serviceLocator = $serviceLocator; | |
} | |
public function publish($msgBody, $routingKey = '', $additionalProperties = []) | |
{ | |
$this->messages[$routingKey][] = $msgBody; | |
} | |
/** | |
* @return array | |
*/ | |
public function processMessages() | |
{ | |
$results = []; | |
while (!empty($this->messages)) { | |
$results[] = $this->processMessage(); | |
} | |
return $results; | |
} | |
/** | |
* @return bool|mixed | |
*/ | |
public function processMessage() | |
{ | |
if (!$next = $this->nextMessage()) { | |
return FALSE; | |
} | |
list($routingKey, $msgBody) = $next; | |
if (!$consumer = $this->findConsumer($routingKey)) { | |
throw new InvalidStateException(sprintf('No consumer for exchange "%s" with routing key "%s" found.', $this->exchangeOptions['name'], $routingKey)); | |
} | |
$callbackRefl = new Nette\Reflection\Property($consumer, 'callback'); | |
$callbackRefl->setAccessible(TRUE); | |
$callback = $callbackRefl->getValue($consumer); | |
return call_user_func($callback, new AMQPMessage($msgBody, [])); | |
} | |
/** | |
* @param string $routingKey | |
* @return Kdyby\RabbitMq\Consumer | |
*/ | |
private function findConsumer($routingKey) | |
{ | |
foreach ($this->serviceLocator->findByTag(RabbitMqExtension::TAG_CONSUMER) as $consumerService => $_) { | |
/** @var Kdyby\RabbitMq\Consumer $consumer */ | |
$consumer = $this->serviceLocator->getService($consumerService); | |
if ($consumer instanceof Kdyby\RabbitMq\MultipleConsumer) { | |
continue; // todo: not yet implemented | |
} | |
if ($consumer->exchangeOptions['name'] !== $this->exchangeOptions['name']) { | |
continue; // nope | |
} | |
if (empty($routingKey)) { | |
return $consumer; | |
} | |
continue; // todo: not yet implemented | |
} | |
return NULL; | |
} | |
private function nextMessage() | |
{ | |
foreach ($this->messages as $routingKey => $messages) { | |
foreach ($messages as $i => $message) { | |
unset($this->messages[$routingKey][$i]); | |
if (empty($this->messages[$routingKey])) { | |
unset($this->messages[$routingKey]); | |
} | |
return [$routingKey, $message]; | |
} | |
} | |
return NULL; | |
} | |
/** | |
* @param string $msgBody | |
* @return mixed | |
*/ | |
private function unserializeMessage($msgBody) | |
{ | |
return preg_match('~^(O|C)\\:\\d+\\:.+\\{.+\\}\\z~is', $msgBody) ? unserialize($msgBody) : $msgBody; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
rabbitmq: | |
producers: | |
email: | |
class: RabbitProducerMock | |
sms: | |
class: RabbitProducerMock | |
searchSynch: | |
class: RabbitProducerMock |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment