-
-
Save oqq/2501ae87b196cdc3e2fa5302f8d7ceaa to your computer and use it in GitHub Desktop.
prooph MongoEventStore v7
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types = 1); | |
namespace Acme\Infrastructure\MongoDb; | |
use MongoDB\Client; | |
use MongoDB\Collection; | |
/**
 * Couples a MongoDB client with the name of the database that all
 * collection look-ups of this connection are resolved against.
 */
class MongoConnection
{
    /**
     * Underlying MongoDB client.
     *
     * @var Client
     */
    private $client;

    /**
     * Name of the database collections are selected from.
     *
     * @var string
     */
    private $dbName;

    /**
     * @param Client $client MongoDB client to delegate to
     * @param string $dbName database used for every collection operation
     */
    public function __construct(Client $client, string $dbName)
    {
        $this->dbName = $dbName;
        $this->client = $client;
    }

    /**
     * @return Client the wrapped client
     */
    public function client(): Client
    {
        return $this->client;
    }

    /**
     * @return string the configured database name
     */
    public function dbName(): string
    {
        return $this->dbName;
    }

    /**
     * Selects a collection of the configured database.
     *
     * @param string $collectionName name of the collection
     * @param array  $options        passed through unchanged to Client::selectCollection()
     *
     * @return Collection
     */
    public function selectCollection(string $collectionName, array $options = []): Collection
    {
        $database = $this->dbName;

        return $this->client->selectCollection($database, $collectionName, $options);
    }

    /**
     * Renames $withCollection to $collectionName inside the configured
     * database, dropping an already existing target collection.
     *
     * The command is issued against the "admin" database.
     *
     * @param string $collectionName name the collection should end up with
     * @param string $withCollection collection that takes over that name
     *
     * @return mixed first document of the command result, or false when the result is empty
     */
    public function replaceCollection(string $collectionName, string $withCollection)
    {
        $source = $this->dbName . '.' . $withCollection;
        $target = $this->dbName . '.' . $collectionName;

        $command = [
            'renameCollection' => $source,
            'to' => $target,
            'dropTarget' => true
        ];

        $documents = $this->client->admin->command($command)->toArray();

        // reset() yields the first element (false for an empty array), just
        // like current() on a freshly created array.
        return reset($documents);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types = 1); | |
namespace Acme\Infrastructure\MongoDb; | |
use Iterator; | |
use MongoDB\Collection; | |
use MongoDB\Driver\Cursor; | |
use MongoDB\Driver\Exception\BulkWriteException; | |
use MongoDB\Operation\FindOneAndUpdate; | |
use Prooph\Common\Messaging\Message; | |
use Prooph\Common\Messaging\MessageConverter; | |
use Prooph\Common\Messaging\MessageFactory; | |
use Prooph\EventStore\EventStore; | |
use Prooph\EventStore\Exception\StreamNotFound; | |
use Prooph\EventStore\Metadata\MetadataMatcher; | |
use Prooph\EventStore\Metadata\Operator; | |
use Prooph\EventStore\Projection\Projection; | |
use Prooph\EventStore\Projection\ProjectionFactory; | |
use Prooph\EventStore\Projection\ProjectionOptions; | |
use Prooph\EventStore\Projection\Query; | |
use Prooph\EventStore\Projection\QueryFactory; | |
use Prooph\EventStore\Projection\ReadModel; | |
use Prooph\EventStore\Projection\ReadModelProjection; | |
use Prooph\EventStore\Projection\ReadModelProjectionFactory; | |
use Prooph\EventStore\Stream; | |
use Prooph\EventStore\StreamName; | |
/**
 * Event store implementation on top of MongoDB.
 *
 * Stream bookkeeping (metadata plus a sequence counter) lives in the
 * collection named by STREAM_COLLECTION; the events of a stream are stored
 * in a collection that carries the stream's name.
 *
 * NOTE(review): several methods type-hint documents as PHP arrays, so the
 * client is presumably configured with an array type map - confirm.
 */
final class MongoEventStore implements EventStore
{
    const STREAM_COLLECTION = 'streams';

    /**
     * Metadata matcher operators supported by buildQuery(), mapped to the
     * MongoDB query operator they translate to.
     */
    private const OPERATOR_MAP = [
        Operator::EQUALS => '$eq',
        Operator::GREATER_THAN => '$gt',
        Operator::GREATER_THAN_EQUALS => '$gte',
        Operator::LOWER_THAN => '$lt',
        Operator::LOWER_THAN_EQUALS => '$lte',
        Operator::NOT_EQUALS => '$ne',
    ];

    /**
     * @var MongoConnection
     */
    private $mongoConnection;

    /**
     * @var MessageFactory
     */
    private $messageFactory;

    /**
     * @var MessageConverter
     */
    private $messageConverter;

    /**
     * Names of streams that additionally get a unique index on aggregate id
     * and aggregate version.
     *
     * @var array
     */
    private $aggregateStreamNames;

    public function __construct(MongoConnection $mongoConnection, MessageFactory $messageFactory, MessageConverter $messageConverter, array $aggregateStreamNames)
    {
        $this->mongoConnection = $mongoConnection;
        $this->messageFactory = $messageFactory;
        $this->messageConverter = $messageConverter;
        $this->aggregateStreamNames = $aggregateStreamNames;
    }

    /**
     * Returns the metadata stored for the given stream.
     *
     * @throws StreamNotFound when the stream is not registered
     */
    public function fetchStreamMetadata(StreamName $streamName): array
    {
        $doc = $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
            ->findOne(['_id' => $streamName->toString()]);

        if (!$doc) {
            throw StreamNotFound::with($streamName);
        }

        return $doc['metadata'];
    }

    /**
     * Tells whether the stream is registered in the streams collection.
     */
    public function hasStream(StreamName $streamName): bool
    {
        return (bool)$this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
            ->count(['_id' => $streamName->toString()]);
    }

    /**
     * Registers a new, empty stream with its metadata and a sequence of 0.
     *
     * @throws \RuntimeException when the given stream already carries events
     */
    public function create(Stream $stream): void
    {
        if (iterator_count($stream->streamEvents()) > 0) {
            throw new \RuntimeException(__CLASS__ . ' does not support creating a stream and appending events in one operation.');
        }

        $streamDoc = ['_id' => $stream->streamName()->toString(), 'metadata' => $stream->metadata(), 'seq' => 0];

        $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)->insertOne($streamDoc);
    }

    /**
     * Appends at most one event to the given stream.
     *
     * @throws \RuntimeException when more than one event is passed
     * @throws StreamNotFound    when the stream is not registered
     */
    public function appendTo(StreamName $streamName, Iterator $streamEvents): void
    {
        // Materialise the iterator once. Counting it with iterator_count()
        // and looping over it again afterwards fails for iterators that
        // cannot be rewound (e.g. generators).
        $events = iterator_to_array($streamEvents, false);

        if (count($events) > 1) {
            throw new \RuntimeException('Due to limited ACID support you can only append one event per operation to the event stream: ' . $streamName->toString());
        }

        foreach ($events as $event) {
            $this->insertInto($streamName, $this->prepareEventData($event));
        }
    }

    /**
     * Loads events with a number >= $fromNumber in ascending order.
     *
     * @param int|null             $count           maximum number of events; a falsy value means no limit
     * @param MetadataMatcher|null $metadataMatcher additional metadata conditions
     *
     * @throws \InvalidArgumentException when the matcher uses an unsupported operator
     */
    public function load(
        StreamName $streamName,
        int $fromNumber = 1,
        int $count = null,
        MetadataMatcher $metadataMatcher = null
    ): Stream
    {
        $query = $this->buildQuery($metadataMatcher ?? new MetadataMatcher());
        $query['no'] = ['$gte' => $fromNumber];

        return $this->findEvents($streamName, $query, 1, $count);
    }

    /**
     * Loads events with a number <= $fromNumber in descending order.
     *
     * @param int|null             $count           maximum number of events; a falsy value means no limit
     * @param MetadataMatcher|null $metadataMatcher additional metadata conditions
     *
     * @throws \InvalidArgumentException when the matcher uses an unsupported operator
     */
    public function loadReverse(
        StreamName $streamName,
        int $fromNumber = PHP_INT_MAX,
        int $count = null,
        MetadataMatcher $metadataMatcher = null
    ): Stream
    {
        $query = $this->buildQuery($metadataMatcher ?? new MetadataMatcher());
        $query['no'] = ['$lte' => $fromNumber];

        return $this->findEvents($streamName, $query, -1, $count);
    }

    /**
     * Drops the stream's event collection and removes its registration.
     */
    public function delete(StreamName $streamName): void
    {
        // Note: this is not transaction safe.
        // However, delete should only be called for projection streams, and
        // MongoDB recreates an empty stream collection if it does not exist.
        // So self::hasStream can return true even if there is no stream
        // collection but only the reference in the streams collection
        // (the scenario where the first command succeeds but the second fails).
        $this->mongoConnection->selectCollection($streamName->toString())
            ->drop();

        $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
            ->deleteOne(['_id' => $streamName->toString()]);
    }

    /**
     * Replaces the metadata of a registered stream.
     *
     * @throws StreamNotFound when no stream document matches the name
     */
    public function updateStreamMetadata(StreamName $streamName, array $newMetadata): void
    {
        $result = $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)->updateOne(
            ['_id' => $streamName->toString()],
            ['$set' => ['metadata' => $newMetadata]]
        );

        if ($result->getMatchedCount() !== 1) {
            throw StreamNotFound::with($streamName);
        }
    }

    /**
     * Queries are not supported by this store.
     *
     * @throws \BadMethodCallException always
     */
    public function createQuery(QueryFactory $factory = null): Query
    {
        // An empty body would violate the declared return type and surface
        // as a TypeError; fail explicitly like the getDefault*Factory methods.
        throw new \BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * Plain (non read model) projections are not supported by this store.
     *
     * @throws \BadMethodCallException always
     */
    public function createProjection(
        string $name,
        ProjectionOptions $options = null,
        ProjectionFactory $factory = null): Projection
    {
        throw new \BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * Creates a MongoReadModelProjection bound to this store and connection.
     *
     * The $factory argument is not used.
     */
    public function createReadModelProjection(
        string $name,
        ReadModel $readModel,
        ProjectionOptions $options = null,
        ReadModelProjectionFactory $factory = null
    ): ReadModelProjection
    {
        if (null === $options) {
            $options = new ProjectionOptions();
        }

        return new MongoReadModelProjection(
            $this,
            $name,
            $readModel,
            $this->mongoConnection,
            $options->cacheSize(),
            $options->persistBlockSize()
        );
    }

    /**
     * @throws \BadMethodCallException always
     */
    public function getDefaultQueryFactory(): QueryFactory
    {
        throw new \BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * @throws \BadMethodCallException always
     */
    public function getDefaultProjectionFactory(): ProjectionFactory
    {
        throw new \BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * @throws \BadMethodCallException always
     */
    public function getDefaultReadModelProjectionFactory(): ReadModelProjectionFactory
    {
        throw new \BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * Runs an event query sorted by event number and wraps the result in a
     * Stream of messages. Shared by load() and loadReverse().
     *
     * @param array    $query         MongoDB filter
     * @param int      $sortDirection 1 for ascending, -1 for descending event numbers
     * @param int|null $count         maximum number of events; a falsy value means no limit
     */
    private function findEvents(StreamName $streamName, array $query, int $sortDirection, ?int $count): Stream
    {
        $collection = $this->getCollectionByStreamName($streamName);

        $options = [
            'sort' => ['no' => $sortDirection]
        ];

        if ($count) {
            $options['limit'] = $count;
        }

        // Detect an empty result up front and hand out a plain empty
        // iterator instead of a mapped cursor.
        // NOTE(review): presumably needed because the mapped cursor answers
        // valid() from Cursor::isDead() - confirm against the driver version.
        if (!$collection->findOne($query, $options)) {
            return new Stream($streamName, new \ArrayIterator([]));
        }

        $cursor = $collection->find($query, $options);

        $iterator = $this->mapCursor($cursor, function (array $event) {
            return $this->eventDataToMessage($event);
        });

        return new Stream($streamName, $iterator);
    }

    /**
     * Converts a message into the document stored in the stream collection.
     * The event number ('no') is added later by insertInto().
     *
     * @param Message $e
     * @return array
     */
    private function prepareEventData(Message $e): array
    {
        $eventArr = $this->messageConverter->convertToArray($e);

        return [
            '_id' => $eventArr['uuid'],
            'event_name' => $eventArr['message_name'],
            'payload' => $eventArr['payload'],
            // Stored without zone information; read back as UTC in eventDataToMessage().
            'created_at' => $eventArr['created_at']->format('Y-m-d\TH:i:s.u'),
            'metadata' => $eventArr['metadata']
        ];
    }

    /**
     * Rebuilds a message from a stored event document.
     */
    private function eventDataToMessage(array $eventData): Message
    {
        $createdAt = \DateTimeImmutable::createFromFormat(
            'Y-m-d\TH:i:s.u',
            $eventData['created_at'],
            new \DateTimeZone('UTC')
        );

        return $this->messageFactory->createMessageFromArray($eventData['event_name'], [
            'uuid' => $eventData['_id'],
            'created_at' => $createdAt,
            'payload' => $eventData['payload'],
            'metadata' => $eventData['metadata']
        ]);
    }

    /**
     * Selects the event collection of a stream and makes sure its indexes
     * exist: a unique index on the event number and, for configured
     * aggregate streams, a unique index on aggregate id + version.
     *
     * createIndex() is issued on every call.
     */
    private function getCollectionByStreamName(StreamName $streamName): Collection
    {
        $name = $streamName->toString();
        $collection = $this->mongoConnection->selectCollection($name);

        $collection->createIndex([
            'no' => 1
        ], ['unique' => true, 'name' => 'no_idx']);

        if (in_array($name, $this->aggregateStreamNames)) {
            $collection->createIndex([
                'metadata._aggregate_id' => 1,
                'metadata._aggregate_version' => 1
            ], ['unique' => true, 'name' => 'aggregate_version_idx']);
        }

        return $collection;
    }

    /**
     * Translates a metadata matcher into a MongoDB filter on "metadata.*".
     *
     * Conditions on the same field are combined into one operator document,
     * so e.g. a lower and an upper bound on one field are both applied.
     *
     * @throws \InvalidArgumentException for operators missing from OPERATOR_MAP
     */
    private function buildQuery(MetadataMatcher $matcher): array
    {
        $query = [];

        foreach ($matcher->data() as $match) {
            $operator = $match['operator']->getValue();

            if (!array_key_exists($operator, self::OPERATOR_MAP)) {
                // Passing an untranslated operator on would make MongoDB
                // treat it as a literal sub-document key and match nothing.
                throw new \InvalidArgumentException('Unsupported metadata operator: ' . $operator);
            }

            $query['metadata.' . $match['field']][self::OPERATOR_MAP[$operator]] = $match['value'];
        }

        return $query;
    }

    /**
     * Reserves the next sequence number of the stream and inserts the event
     * with that number.
     *
     * When the insert fails with a BulkWriteException the sequence counter
     * is decremented again before the exception is rethrown.
     *
     * @throws StreamNotFound     when the stream is not registered
     * @throws BulkWriteException when the event document cannot be inserted
     */
    private function insertInto(StreamName $streamName, array $eventData): void
    {
        $streamInfo = $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
            ->findOneAndUpdate(
                ['_id' => $streamName->toString()],
                ['$inc' => ['seq' => 1]],
                ['returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER]
            );

        if (!$streamInfo) {
            throw StreamNotFound::with($streamName);
        }

        $eventData['no'] = $streamInfo['seq'];

        try {
            $this->getCollectionByStreamName($streamName)->insertOne($eventData);
        } catch (BulkWriteException $e) {
            // NOTE(review): increment and compensation are separate
            // operations; verify the behaviour with concurrent writers.
            $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)->updateOne(
                ['_id' => $streamName->toString()],
                ['$inc' => ['seq' => -1]]
            );

            throw $e;
        }
    }

    /**
     * Wraps a driver cursor in an iterator that passes every element (and
     * its key) through $callback.
     */
    private function mapCursor(Cursor $cursor, callable $callback): \IteratorIterator
    {
        return new class($cursor, $callback) extends \IteratorIterator {
            /**
             * The function applied to every element of the inner iterator
             *
             * @var callable
             */
            private $callable;

            /**
             * @param Cursor $cursor
             * @param callable $callable
             */
            public function __construct(Cursor $cursor, callable $callable)
            {
                parent::__construct($cursor);
                $this->callable = $callable;
            }

            /**
             * Answered from the cursor's isDead() flag instead of the
             * position tracking of IteratorIterator.
             */
            public function valid(): bool
            {
                return !$this->getInnerIterator()->isDead();
            }

            /**
             * Get the mapped value of the current element
             */
            public function current()
            {
                $callback = $this->callable;

                return $callback(parent::current(), parent::key());
            }
        };
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types = 1); | |
namespace Acme\Infrastructure\MongoDb; | |
use Closure; | |
use Acme\Model\Message; | |
use MongoDB\Exception\BadMethodCallException; | |
use Prooph\EventStore\EventStore; | |
use Prooph\EventStore\Exception\RuntimeException; | |
use Prooph\EventStore\Exception\StreamNotFound; | |
use Prooph\EventStore\Projection\ReadModel; | |
use Prooph\EventStore\Projection\ReadModelProjection; | |
use Prooph\EventStore\StreamName; | |
/**
 * Read model projection whose bookkeeping (stream positions, state and a
 * time based lock) is stored as one document per projection in the
 * collection named by PROJECTIONS_COLLECTION.
 */
final class MongoReadModelProjection implements ReadModelProjection
{
    const PROJECTIONS_COLLECTION = 'projections';
    const LOCK_TIMEOUT_MS = 1000;

    /**
     * Format of the lock timestamps; they are stored and compared as strings.
     */
    private const LOCK_DATE_FORMAT = 'Y-m-d\TH:i:s.u';

    /**
     * @var string
     */
    private $name;

    /**
     * @var EventStore
     */
    private $eventStore;

    /**
     * @var ReadModel
     */
    private $readModel;

    /**
     * @var MongoConnection
     */
    private $mongoConnection;

    /**
     * Number of processed events per stream name; null until one of the
     * from*() methods was called or the positions were loaded.
     *
     * @var array|null
     */
    private $streamPositions;

    /**
     * @var array
     */
    private $state = [];

    /**
     * @var callable|null
     */
    private $initCallback;

    /**
     * Handler registered through whenAny().
     *
     * @var Closure|null
     */
    private $handler;

    /**
     * Handlers registered through when(), keyed by event name.
     *
     * @var array
     */
    private $handlers = [];

    /**
     * Stream currently being processed; shared by reference with the
     * handler contexts.
     *
     * @var string|null
     */
    private $currentStreamName = null;

    /**
     * @var bool
     */
    private $isStopped = false;

    /**
     * Events processed since the last persist().
     *
     * @var int
     */
    private $eventCounter = 0;

    /**
     * @var int
     */
    private $cacheSize;

    /**
     * @var int
     */
    private $persistBlockSize;

    public function __construct(
        EventStore $eventStore,
        string $name,
        ReadModel $readModel,
        MongoConnection $mongoConnection,
        int $cacheSize,
        int $persistBlockSize
    ) {
        $this->eventStore = $eventStore;
        $this->name = $name;
        $this->readModel = $readModel;
        $this->mongoConnection = $mongoConnection;
        $this->cacheSize = $cacheSize;
        $this->persistBlockSize = $persistBlockSize;
    }

    /**
     * Registers and immediately invokes the state initialiser.
     *
     * The callback has to return an array; any other result is ignored.
     *
     * @throws \RuntimeException when called twice
     */
    public function init(Closure $callback): ReadModelProjection
    {
        if (null !== $this->initCallback) {
            throw new \RuntimeException('Projection already initialized');
        }

        $callback = Closure::bind($callback, $this->createHandlerContext($this->currentStreamName));

        $result = $callback();

        if (is_array($result)) {
            $this->state = $result;
        }

        $this->initCallback = $callback;

        return $this;
    }

    /**
     * @throws \RuntimeException when a from*() method was already called
     */
    public function fromStream(string $streamName): ReadModelProjection
    {
        if (null !== $this->streamPositions) {
            throw new \RuntimeException('From was already called');
        }

        $this->streamPositions = [$streamName => 0];

        return $this;
    }

    /**
     * @throws \RuntimeException when a from*() method was already called
     */
    public function fromStreams(string ...$streamNames): ReadModelProjection
    {
        if (null !== $this->streamPositions) {
            throw new \RuntimeException('From was already called');
        }

        foreach ($streamNames as $streamName) {
            $this->streamPositions[$streamName] = 0;
        }

        return $this;
    }

    /**
     * Registers one handler per event name.
     *
     * @param array $handlers map of event name => Closure
     *
     * @throws \RuntimeException         when handlers were already registered
     * @throws \InvalidArgumentException for a non-string key or a non-Closure value
     */
    public function when(array $handlers): ReadModelProjection
    {
        if (null !== $this->handler || ! empty($this->handlers)) {
            throw new \RuntimeException('When was already called');
        }

        // Collect into a local map first so that a validation failure does
        // not leave the projection with a partially registered handler set.
        $bound = [];

        foreach ($handlers as $eventName => $handler) {
            if (! is_string($eventName)) {
                throw new \InvalidArgumentException('Invalid event name given, string expected');
            }

            if (! $handler instanceof Closure) {
                throw new \InvalidArgumentException('Invalid handler given, Closure expected');
            }

            $bound[$eventName] = Closure::bind($handler, $this->createHandlerContext($this->currentStreamName));
        }

        $this->handlers = $bound;

        return $this;
    }

    /**
     * Registers a single handler that receives every event.
     *
     * @throws \RuntimeException when handlers were already registered
     */
    public function whenAny(Closure $handler): ReadModelProjection
    {
        if (null !== $this->handler || ! empty($this->handlers)) {
            throw new \RuntimeException('When was already called');
        }

        $this->handler = Closure::bind($handler, $this->createHandlerContext($this->currentStreamName));

        return $this;
    }

    public function stop(): void
    {
        $this->isStopped = true;
    }

    public function getState(): array
    {
        return $this->state;
    }

    public function getName(): string
    {
        return $this->name;
    }

    /**
     * Deletes the read model when $deleteProjection is true.
     *
     * NOTE(review): the projection document itself is not removed here -
     * confirm whether that is intended.
     */
    public function delete(bool $deleteProjection): void
    {
        if ($deleteProjection) {
            $this->readModel->delete();
        }
    }

    public function readModel(): ReadModel
    {
        return $this->readModel;
    }

    /**
     * Processes new events of all tracked streams, optionally in a loop.
     *
     * The projection lock is acquired first and released when the method
     * returns or throws.
     *
     * NOTE(review): the stored lock expiry is only moved forward by
     * persist(); an idle loop longer than LOCK_TIMEOUT_MS lets it lapse.
     *
     * @param bool     $keepRunning keep polling until stop() is called
     * @param int|null $usleep      microseconds to sleep after a pass without events; null disables sleeping
     *
     * @throws RuntimeException when the lock cannot be acquired
     */
    public function run(bool $keepRunning = true, ?int $usleep = 100): void
    {
        $this->createProjectionIfNotExist();
        $this->acquireLock();

        try {
            do {
                $this->load();

                $singleHandler = null !== $this->handler;

                foreach ($this->streamPositions as $streamName => $position) {
                    // PHP turns numeric string keys into ints; the stream
                    // name is needed as a string under strict_types.
                    $streamName = (string) $streamName;

                    try {
                        $stream = $this->eventStore->load(new StreamName($streamName), $position + 1);
                    } catch (StreamNotFound $e) {
                        // no newer events found
                        continue;
                    }

                    if ($singleHandler) {
                        $this->handleStreamWithSingleHandler($streamName, $stream->streamEvents());
                    } else {
                        $this->handleStreamWithHandlers($streamName, $stream->streamEvents());
                    }

                    if ($this->isStopped) {
                        break;
                    }
                }

                if (0 === $this->eventCounter) {
                    if (null !== $usleep) {
                        usleep($usleep);
                    }
                } else {
                    $this->persist();
                }

                $this->eventCounter = 0;
            } while ($keepRunning && ! $this->isStopped);
        } finally {
            $this->releaseLock();
        }
    }

    /**
     * Inserts the initial projection document; every failure is ignored.
     */
    protected function createProjectionIfNotExist(): void
    {
        $col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION);

        try {
            $col->insertOne([
                '_id' => $this->name,
                'position' => [],
                'state' => [],
                'locked_until' => null
            ]);
        } catch (\Throwable $ex) {
            // Deliberately best effort: ignore errors, especially duplicate
            // key errors raised when the document already exists.
        }
    }

    /**
     * Loads the persisted positions and state of this projection.
     *
     * @throws \RuntimeException when the projection document is missing
     */
    protected function load(): void
    {
        $col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION);

        $doc = $col->findOne(['_id' => $this->name]);

        if (!$doc) {
            throw new \RuntimeException('Projection information for ' . $this->name . ' missing in collection ' . self::PROJECTIONS_COLLECTION);
        }

        // Merge instead of overwrite: a freshly created projection document
        // has an empty position map, and overwriting would discard the
        // streams configured through fromStream()/fromStreams(). Persisted
        // positions win; array_replace() keeps the stream name keys intact.
        $this->streamPositions = array_replace($this->streamPositions ?? [], (array) $doc['position']);

        $state = $doc['state'];

        if (! empty($state)) {
            $this->state = $state;
        }
    }

    /**
     * Persists the read model, then stores positions and state together
     * with a renewed lock expiry.
     */
    protected function persist(): void
    {
        $this->readModel()->persist();

        $now = new \DateTimeImmutable('now', new \DateTimeZone('UTC'));

        $col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION);

        $col->updateOne([
            '_id' => $this->name
        ], [
            '$set' => [
                // Positions may still be null when no from*() method was called.
                'position' => $this->streamPositions ?? [],
                'state' => $this->state,
                'locked_until' => $this->lockExpiry($now)
            ]
        ]);
    }

    /**
     * Resets positions, state and read model and persists the result.
     *
     * The lock is taken before anything is changed, so a projection that is
     * locked by another process is left untouched, and it is released even
     * when one of the reset steps fails.
     *
     * @throws RuntimeException when the lock cannot be acquired
     */
    public function reset(): void
    {
        $this->createProjectionIfNotExist();
        $this->acquireLock();

        try {
            if (null !== $this->streamPositions) {
                $this->streamPositions = array_fill_keys(array_keys($this->streamPositions), 0);
            }

            $this->isStopped = false;

            $callback = $this->initCallback;

            if (is_callable($callback)) {
                $result = $callback();

                if (is_array($result)) {
                    $this->state = $result;
                }
            } else {
                $this->state = [];
            }

            $this->eventStore->delete(new StreamName($this->name));
            $this->readModel->reset();

            $this->persist();
        } finally {
            $this->releaseLock();
        }
    }

    /**
     * Claims the projection lock when it is free or its expiry has passed.
     *
     * @throws RuntimeException when no projection document could be claimed
     */
    protected function acquireLock(): void
    {
        $now = new \DateTimeImmutable('now', new \DateTimeZone('UTC'));
        $nowString = $now->format(self::LOCK_DATE_FORMAT);

        $result = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)->updateOne([
            '_id' => $this->name,
            '$or' => [
                [
                    'locked_until' => null,
                ],
                [
                    'locked_until' => ['$lt' => $nowString]
                ]
            ]
        ], [
            '$set' => [
                'locked_until' => $this->lockExpiry($now)
            ]
        ]);

        if ($result->getMatchedCount() !== 1) {
            throw new RuntimeException('Another projection process is already running');
        }
    }

    /**
     * Clears the lock expiry of this projection.
     */
    protected function releaseLock(): void
    {
        $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)->updateOne([
            '_id' => $this->name,
        ], [
            '$set' => [
                'locked_until' => null
            ]
        ]);
    }

    /**
     * @throws BadMethodCallException always
     */
    public function fromCategory(string $name): ReadModelProjection
    {
        throw new BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * @throws BadMethodCallException always
     */
    public function fromCategories(string ...$names): ReadModelProjection
    {
        throw new BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * @throws BadMethodCallException always
     */
    public function fromAll(): ReadModelProjection
    {
        throw new BadMethodCallException(__METHOD__ . ' not supported');
    }

    /**
     * Formats the lock expiry: $now plus LOCK_TIMEOUT_MS milliseconds.
     */
    private function lockExpiry(\DateTimeImmutable $now): string
    {
        return $now->modify('+' . self::LOCK_TIMEOUT_MS . ' ms')->format(self::LOCK_DATE_FORMAT);
    }

    /**
     * Feeds every event of the stream to the handler set by whenAny().
     */
    private function handleStreamWithSingleHandler(string $streamName, \Iterator $events): void
    {
        $this->currentStreamName = $streamName;

        $handler = $this->handler;

        foreach ($events as $event) {
            /* @var Message $event */
            $this->applyEvent($streamName, $event, $handler);

            if ($this->isStopped) {
                break;
            }
        }
    }

    /**
     * Feeds every event of the stream to the handler registered for its
     * message name; events without a handler only advance the position.
     */
    private function handleStreamWithHandlers(string $streamName, \Iterator $events): void
    {
        $this->currentStreamName = $streamName;

        foreach ($events as $event) {
            /* @var Message $event */
            $handler = $this->handlers[$event->messageName()] ?? null;

            $this->applyEvent($streamName, $event, $handler);

            if ($this->isStopped) {
                break;
            }
        }
    }

    /**
     * Advances the stream position, runs the handler (if any) and persists
     * once a block of persistBlockSize events is complete.
     *
     * The block check runs for every event, handled or not, so the
     * equality comparison cannot be stepped over by unhandled events.
     *
     * @param mixed        $event   event taken from the stream iterator
     * @param Closure|null $handler handler to invoke, null to skip handling
     */
    private function applyEvent(string $streamName, $event, ?Closure $handler): void
    {
        $this->streamPositions[$streamName]++;
        $this->eventCounter++;

        if (null !== $handler) {
            $result = $handler($this->state, $event);

            if (is_array($result)) {
                $this->state = $result;
            }
        }

        if ($this->eventCounter === $this->persistBlockSize) {
            $this->persist();
            $this->eventCounter = 0;
        }
    }

    /**
     * Creates the object handlers are bound to. It exposes stop(),
     * readModel() and streamName(); the stream name is held by reference so
     * it follows the stream currently being processed.
     */
    private function createHandlerContext(?string &$streamName)
    {
        return new class($this, $streamName) {
            /**
             * @var ReadModelProjection
             */
            private $projection;

            /**
             * @var ?string
             */
            private $streamName;

            public function __construct(ReadModelProjection $projection, ?string &$streamName)
            {
                $this->projection = $projection;
                $this->streamName = &$streamName;
            }

            public function stop(): void
            {
                $this->projection->stop();
            }

            public function readModel(): ReadModel
            {
                return $this->projection->readModel();
            }

            public function streamName(): ?string
            {
                return $this->streamName;
            }
        };
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment