Skip to content

Instantly share code, notes, and snippets.

@oqq
Created January 31, 2018 15:31
Show Gist options
  • Save oqq/539de0022ef6f75f7cf4fbc112721b42 to your computer and use it in GitHub Desktop.
Save oqq/539de0022ef6f75f7cf4fbc112721b42 to your computer and use it in GitHub Desktop.
prooph event bus process manager plugin
<?php
declare(strict_types=1);
namespace Prooph\ServiceBus\Plugin;
use Prooph\Common\Event\ActionEvent;
use Prooph\ServiceBus\CommandBus;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\MessageBus;
final class ProcessManagerPlugin extends AbstractPlugin
{
private $commandBus;
public function __construct(CommandBus $commandBus)
{
$this->commandBus = $commandBus;
}
public function attachToMessageBus(MessageBus $messageBus): void
{
$this->listenerHandlers[] = $messageBus->attach(
MessageBus::EVENT_DISPATCH,
function (ActionEvent $actionEvent): void {
if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) {
return;
}
$message = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE);
$handlers = $actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, []);
foreach ($handlers as $handler) {
$commands = $handler->handle($message);
foreach ($commands as $command) {
$this->commandBus->dispatch($command);
}
}
$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, true);
},
MessageBus::PRIORITY_INVOKE_HANDLER
);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment