-
-
Save alle/ce936884188dc0ba7910aef42c0d8f3e to your computer and use it in GitHub Desktop.
Prooph RedisPlugin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Command; | |
use Prooph\Bundle\EventStore\Projection\ReadModelProjection; | |
use Prooph\EventStore\Projection\ProjectionManager; | |
use Prooph\EventStore\Projection\ReadModel; | |
use Prooph\EventStore\Projection\ReadModelProjector; | |
use Psr\Container\ContainerInterface; | |
use Superbalist\PubSub\PubSubAdapterInterface; | |
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; | |
use Symfony\Component\Console\Input\InputInterface; | |
use Symfony\Component\Console\Output\OutputInterface; | |
use Symfony\Component\Console\Style\SymfonyStyle; | |
/**
 * Console command that builds a read-model projector for every projection name
 * reported by the first configured projection manager, then subscribes to the
 * "event_stored" pub/sub channel and runs the matching projectors whenever an
 * event they handle is announced.
 */
class ProjectionRunCommand extends ContainerAwareCommand
{
    /**
     * Maximum number of projection names requested from the projection manager.
     */
    private const PROJECTION_NAMES_LIMIT = 100;

    /**
     * Pub/sub channel on which stored events are announced.
     */
    private const EVENT_CHANNEL = 'event_stored';

    /**
     * @var ContainerInterface
     */
    private $projectionManagerLocator;

    /**
     * @var ContainerInterface
     */
    private $projectionsLocator;

    /**
     * @var ContainerInterface
     */
    private $projectionReadModelLocator;

    /**
     * @var ReadModelProjector[]
     */
    private $readModelProjectors = [];

    /**
     * @var PubSubAdapterInterface
     */
    private $pubSubAdapter;

    /**
     * Handled event names per projector name, stored as a key set
     * (event name => true) so membership is a single isset() check.
     *
     * @var array<string, array<string, bool>>
     */
    private $eventCache = [];

    /**
     * @param ContainerInterface     $projectionManagerLocator   Locator resolving projection managers by name.
     * @param ContainerInterface     $projectionsLocator         Locator resolving projections by projection name.
     * @param ContainerInterface     $projectionReadModelLocator Locator resolving read models by projection name.
     * @param PubSubAdapterInterface $pubSubAdapter              Adapter used to subscribe to stored-event notifications.
     */
    public function __construct(
        ContainerInterface $projectionManagerLocator,
        ContainerInterface $projectionsLocator,
        ContainerInterface $projectionReadModelLocator,
        PubSubAdapterInterface $pubSubAdapter
    ) {
        parent::__construct();

        $this->projectionManagerLocator = $projectionManagerLocator;
        $this->projectionsLocator = $projectionsLocator;
        $this->projectionReadModelLocator = $projectionReadModelLocator;
        $this->pubSubAdapter = $pubSubAdapter;
    }

    /**
     * Registers the command name and description.
     */
    protected function configure(): void
    {
        $this
            ->setName('app:projection:run')
            ->setDescription('Runs all projections');
    }

    /**
     * Builds the projectors and subscribes to the stored-event channel.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return int 0 on success, 1 when no projection manager is configured
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);
        $io->title('Starting projection');

        $projectionManagers = $this->getContainer()->getParameter('prooph_event_store.projection_managers');

        // Only the first configured manager is used. Without this guard an empty
        // configuration would hand `false` to the locator and fail obscurely.
        $managerName = \is_array($projectionManagers) ? (\array_keys($projectionManagers)[0] ?? null) : null;
        if (null === $managerName) {
            $io->error('No projection manager is configured.');

            return 1;
        }

        /** @var ProjectionManager $projectionManager */
        $projectionManager = $this->projectionManagerLocator->get((string) $managerName);

        foreach ($projectionManager->fetchProjectionNames(null, self::PROJECTION_NAMES_LIMIT) as $name) {
            /** @var ReadModel $readModel */
            $readModel = $this->projectionReadModelLocator->get($name);
            /** @var ReadModelProjection $projection */
            $projection = $this->projectionsLocator->get($name);

            $projector = $projectionManager->createReadModelProjection($name, $readModel);
            $this->readModelProjectors[] = $projection->project($projector);
        }

        $this->pubSubAdapter->subscribe(self::EVENT_CHANNEL, function ($value): void {
            // Ignore payloads that do not carry an event name instead of
            // raising an undefined-index notice on foreign/malformed messages.
            if (!\is_array($value) || !isset($value['event']) || !\is_string($value['event'])) {
                return;
            }

            $eventName = $value['event'];
            foreach ($this->readModelProjectors as $projector) {
                if (isset($this->managedEvents($projector)[$eventName])) {
                    // run(false): process pending events once rather than keep running.
                    $projector->run(false);
                }
            }
        });

        // NOTE(review): presumably only reached with adapters whose subscribe()
        // does not block indefinitely — confirm against the adapter in use.
        return 0;
    }

    /**
     * Returns the set of event names the given projector has handlers for.
     *
     * The names are read via reflection from the projector's non-public
     * `handlers` property and cached per projector name.
     * NOTE(review): assumes `handlers` is declared on the projector's concrete
     * class and keyed by event name; projectors configured with a catch-all
     * handler would presumably yield an empty set — verify.
     *
     * @param ReadModelProjector $projector
     *
     * @return array<string, bool> event name => true
     *
     * @throws \ReflectionException when the projector class has no `handlers` property
     */
    private function managedEvents(ReadModelProjector $projector): array
    {
        $projectorName = $projector->getName();

        if (isset($this->eventCache[$projectorName])) {
            return $this->eventCache[$projectorName];
        }

        $handlersProperty = new \ReflectionProperty(\get_class($projector), 'handlers');
        // Required on PHP < 8.1 to read a non-public property.
        $handlersProperty->setAccessible(true);

        $handlers = $handlersProperty->getValue($projector);
        $eventNames = \array_fill_keys(\array_keys(\is_array($handlers) ? $handlers : []), true);

        $this->eventCache[$projectorName] = $eventNames;

        return $eventNames;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace App\Service; | |
use Prooph\Common\Event\ActionEvent; | |
use Prooph\EventSourcing\AggregateChanged; | |
use Prooph\EventStore\ActionEventEmitterEventStore; | |
use Prooph\EventStore\Plugin\AbstractPlugin; | |
use Prooph\EventStore\Stream; | |
use Superbalist\PubSub\PubSubAdapterInterface; | |
/**
 * Event-store plugin that announces every recorded event on the
 * "event_stored" pub/sub channel after a stream is created or appended to.
 */
class RedisPlugin extends AbstractPlugin
{
    /**
     * Pub/sub channel on which stored events are announced.
     */
    private const CHANNEL = 'event_stored';

    /**
     * @var PubSubAdapterInterface
     */
    private $pubSub;

    /**
     * @param PubSubAdapterInterface $pubSub Adapter used to publish stored-event notifications.
     */
    public function __construct(PubSubAdapterInterface $pubSub)
    {
        $this->pubSub = $pubSub;
    }

    /**
     * Attaches low-priority (-1) listeners to the append and create actions so
     * they run after the event store's own handling of those actions.
     *
     * @param ActionEventEmitterEventStore $eventStore
     */
    public function attachToEventStore(ActionEventEmitterEventStore $eventStore): void
    {
        $this->listenerHandlers[] = $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_APPEND_TO,
            function (ActionEvent $event): void {
                // NOTE(review): per the prooph event-store implementation, a failed
                // append is flagged on the action event and the exception is thrown
                // only after all listeners ran — skip so unpersisted events are not
                // announced. Confirm against the installed library version.
                if ($event->getParam('streamNotFound', false) || $event->getParam('concurrencyException', false)) {
                    return;
                }

                $this->publishAll($event->getParam('streamEvents', new \ArrayIterator()));
            },
            -1
        );

        $this->listenerHandlers[] = $eventStore->attach(
            ActionEventEmitterEventStore::EVENT_CREATE,
            function (ActionEvent $event): void {
                // Same reasoning as above for a stream that could not be created.
                if ($event->getParam('streamExistsAlready', false)) {
                    return;
                }

                $stream = $event->getParam('stream');
                if (!$stream instanceof Stream) {
                    return;
                }

                $this->publishAll($stream->streamEvents());
            },
            -1
        );
    }

    /**
     * Publishes one message per recorded event, carrying the event class name
     * under "event" and the event's array representation under "data".
     *
     * @param iterable|AggregateChanged[] $recordedEvents
     */
    private function publishAll(iterable $recordedEvents): void
    {
        foreach ($recordedEvents as $recordedEvent) {
            $this->pubSub->publish(self::CHANNEL, [
                'event' => \get_class($recordedEvent),
                'data' => $recordedEvent->toArray(),
            ]);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment