Created
September 23, 2014 19:01
-
-
Save bluescreen/9ef790501211d09f2bb3 to your computer and use it in GitHub Desktop.
Broadway Projection Rebuild
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
/** | |
* Projection Rebuilder | |
* @author Markus Muschol <[email protected]> | |
*/ | |
use Broadway\Domain\DateTime; | |
use Broadway\Domain\DomainEventStream; | |
use Broadway\Domain\DomainMessage; | |
use Broadway\Domain\Metadata; | |
use Broadway\EventHandling\EventBusInterface; | |
use Broadway\Serializer\SerializerInterface; | |
use Elkuku\Console\Helper\ConsoleProgressBar; | |
use EventStore\EventStore; | |
use EventStore\StreamFeed\Entry; | |
use EventStore\StreamFeed\EntryEmbedMode; | |
use EventStore\StreamFeed\LinkRelation; | |
use EventStore\StreamFeed\StreamFeed; | |
/** | |
* Class BroadwayProjectionRebuilder | |
* @package CQRS | |
*/ | |
class BroadwayProjectionRebuilder {
    /** Event Store url */
    const URL = "http://127.0.0.1:2113";

    /** @var bool Not read or written anywhere in this class. */
    private $isConnected = false;

    /** @var EventStore Client used for every feed and event request */
    private $eventStore;

    /** @var SerializerInterface Turns stored event payloads back into domain events */
    private $serializer;

    /**
     * @param SerializerInterface $serializer used to deserialize event payloads
     */
    public function __construct(SerializerInterface $serializer){
        $this->serializer = $serializer;
        $this->eventStore = new EventStore(self::URL);
    }

    /**
     * Replay all event streams and republish all messages to rebuild projection.
     *
     * Walks the `$streams` feed page by page, loads the events of every listed
     * stream and publishes them on the given bus in chunks.
     *
     * @param EventBusInterface  $eventBus    bus the reconstructed messages are published on
     * @param int                $chunkSize   number of buffered events above which a chunk is published
     * @param ConsoleProgressBar $progressBar optional progress display, updated once per stream
     */
    public function replayStreams(EventBusInterface $eventBus, $chunkSize = 500, ConsoleProgressBar $progressBar = null){
        /** @var StreamFeed $feed */
        $feed = $this->eventStore->openStreamFeed('$streams', EntryEmbedMode::BODY());
        $totalStreams = $this->getEventCount($feed);

        $events = [];
        $streamCount = 0;

        echo "Rebuilding $totalStreams projections ...\n";

        // Optional show Progressbar
        if($progressBar){
            $progressBar->reset('- %fraction% [%bar%] %percent% Elapsed Time: %elapsed%', '=>', '-', 78, $totalStreams);
        }

        do {
            /** @var Entry $entry */
            foreach ($feed->getEntries() as $entry) {
                foreach ($this->fetchEvents($entry->getAggregateId()) as $event) {
                    $events[] = $event;
                }

                // Count the stream first so the bar reports finished streams
                // and can reach the configured total.
                $streamCount++;
                if($progressBar){
                    $progressBar->update($streamCount);
                }
            }

            // Publish events in chunks to avoid frequent queries
            // on the db and running out of memory
            if(count($events) > $chunkSize || $totalStreams < $chunkSize){
                $eventBus->publish(new DomainEventStream($events));
                $events = [];
            }
        } while (null !== ($feed = $this->eventStore->navigateStreamFeed($feed, LinkRelation::NEXT())));

        // Flush whatever is still buffered; otherwise the last partial chunk
        // would never reach the event bus.
        if(count($events) > 0){
            $eventBus->publish(new DomainEventStream($events));
        }

        echo "Done";
    }

    /**
     * Fetch all domain events of one stream, oldest first.
     *
     * Per the Event Store Atom API the head page holds the newest events, the
     * `last` link points at the oldest page and `previous` leads towards newer
     * pages, so a complete forward read starts at `last` and follows `previous`.
     *
     * @param string $id stream name
     * @return DomainMessage[]
     */
    private function fetchEvents($id){
        $head = $this->eventStore->openStreamFeed($id);

        // navigateStreamFeed() yields null when the feed has no such link;
        // in that case the head page is treated as the only page.
        $feed = $this->eventStore->navigateStreamFeed($head, LinkRelation::LAST());
        if(null === $feed){
            $feed = $head;
        }

        $events = [];
        while (null !== $feed) {
            $entries = $feed->getEntries();

            // An empty page means we have walked past the newest event.
            if(empty($entries)){
                break;
            }

            // Entries inside a page are listed newest first; reverse them so
            // messages are replayed in the order they were recorded.
            /** @var Entry $entry */
            foreach (array_reverse($entries) as $entry) {
                // Exclude internal streams like $metadata
                if($this->isDomainEventStream($entry)){
                    $events[] = $this->reconstructMessage($entry);
                }
            }

            $feed = $this->eventStore->navigateStreamFeed($feed, LinkRelation::PREVIOUS());
        }

        return $events;
    }

    /**
     * Reconstruct Broadway domain message from event.
     *
     * The entry title is expected to look like "<playhead>@<stream id>".
     *
     * @param Entry $entry
     * @return DomainMessage
     * @throws \UnexpectedValueException when the title has no "@" separator
     */
    private function reconstructMessage(Entry $entry){
        $title = $entry->getTitle();

        // Limit to two parts so a stream id that itself contains "@" stays intact.
        $parts = explode('@', $title, 2);
        if(count($parts) !== 2){
            throw new \UnexpectedValueException('Unexpected entry title: ' . $title);
        }
        list($playhead, $uuid) = $parts;

        $event = $this->eventStore->readEvent($entry->getEventUrl());

        return new DomainMessage(
            $uuid,
            intval($playhead),
            new Metadata([]),
            $this->serializer->deserialize($event->getData()),
            DateTime::fromString($entry->getUpdate())
        );
    }

    /**
     * Is a valid Domain stream? Entries whose type starts with "$" are rejected.
     *
     * @param $entry object exposing getType()
     * @return bool
     */
    private function isDomainEventStream($entry){
        return strpos((string) $entry->getType(), '$') !== 0;
    }

    /**
     * Derive the entry total from the feed's eTag.
     *
     * Takes the leading integer of the eTag (the part before ";") and adds one.
     *
     * @param StreamFeed $feed
     * @return int
     */
    private function getEventCount(StreamFeed $feed){
        $json = $feed->getJson();
        $eTag = isset($json['eTag']) ? $json['eTag'] : '';

        // intval() stops at the first non-digit, so the ";" suffix is ignored
        // and an eTag without ";" no longer collapses to zero.
        return intval($eTag) + 1;
    }
}
// Extension in Entry class | |
class Entry{
    /**
     * Title of the feed entry as found in the decoded feed JSON.
     *
     * @return string
     */
    public function getTitle(){
        return $this->json['title'];
    }

    /**
     * Decoded payload of the entry.
     *
     * @return array the JSON-decoded `data` field, or an empty array when the entry carries none
     */
    public function getData(){
        if(!isset($this->json['data'])){
            return [];
        }

        return json_decode($this->json['data'], true);
    }

    /**
     * Stream id taken from the part of the title after the first "@".
     *
     * @return string|null null when the title has no "@" separator
     */
    public function getAggregateId(){
        // Split once only, so an id that itself contains "@" is returned whole.
        $parts = explode('@', $this->getTitle(), 2);

        return isset($parts[1]) ? $parts[1] : null;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I see some code smells, and the style is not PSR-compliant. If you send us a pull request, we can review it together.