Skip to content

Instantly share code, notes, and snippets.

Forked from codeliner/MongoConnection.php
Created February 20, 2017 08:08
Show Gist options
  • Save oqq/2501ae87b196cdc3e2fa5302f8d7ceaa to your computer and use it in GitHub Desktop.
Save oqq/2501ae87b196cdc3e2fa5302f8d7ceaa to your computer and use it in GitHub Desktop.
prooph MongoEventStore v7
declare(strict_types = 1);
namespace Acme\Infrastructure\MongoDb;
use MongoDB\Client;
use MongoDB\Collection;
class MongoConnection
* @var Client
private $client;
* @var string
private $dbName;
public function __construct(Client $client, string $dbName)
$this->client = $client;
$this->dbName = $dbName;
public function client(): Client
return $this->client;
public function dbName(): string
return $this->dbName;
public function selectCollection(string $collectionName, array $options = []): Collection
return $this->client->selectCollection($this->dbName, $collectionName, $options);
public function replaceCollection(string $collectionName, string $withCollection)
$adminDb = $this->client->admin;
$cursor = $adminDb->command([
'renameCollection' => $this->dbName . '.' . $withCollection,
'to' => $this->dbName . '.' . $collectionName,
'dropTarget' => true
return current($cursor->toArray());
declare(strict_types = 1);
namespace Acme\Infrastructure\MongoDb;
use Iterator;
use MongoDB\Collection;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\BulkWriteException;
use MongoDB\Operation\FindOneAndUpdate;
use Prooph\Common\Messaging\Message;
use Prooph\Common\Messaging\MessageConverter;
use Prooph\Common\Messaging\MessageFactory;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Exception\StreamNotFound;
use Prooph\EventStore\Metadata\MetadataMatcher;
use Prooph\EventStore\Metadata\Operator;
use Prooph\EventStore\Projection\Projection;
use Prooph\EventStore\Projection\ProjectionFactory;
use Prooph\EventStore\Projection\ProjectionOptions;
use Prooph\EventStore\Projection\Query;
use Prooph\EventStore\Projection\QueryFactory;
use Prooph\EventStore\Projection\ReadModel;
use Prooph\EventStore\Projection\ReadModelProjection;
use Prooph\EventStore\Projection\ReadModelProjectionFactory;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamName;
final class MongoEventStore implements EventStore
const STREAM_COLLECTION = 'streams';
* @var MongoConnection
private $mongoConnection;
* @var MessageFactory
private $messageFactory;
* @var MessageConverter
private $messageConverter;
* @var array
private $aggregateStreamNames;
public function __construct(MongoConnection $mongoConnection, MessageFactory $messageFactory, MessageConverter $messageConverter, array $aggregateStreamNames)
$this->mongoConnection = $mongoConnection;
$this->messageFactory = $messageFactory;
$this->messageConverter = $messageConverter;
$this->aggregateStreamNames = $aggregateStreamNames;
public function fetchStreamMetadata(StreamName $streamName): array
$doc = $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
->findOne(['_id' => $streamName->toString()]);
if (!$doc) {
throw StreamNotFound::with($streamName);
return $doc['metadata'];
public function hasStream(StreamName $streamName): bool
return (bool)$this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
->count(['_id' => $streamName->toString()]);
public function create(Stream $stream): void
if (iterator_count($stream->streamEvents()) > 0) {
throw new \RuntimeException(__CLASS__ . ' does not support creating a stream and appending events in one operation.');
$streamDoc = ['_id' => $stream->streamName()->toString(), 'metadata' => $stream->metadata(), 'seq' => 0];
public function appendTo(StreamName $streamName, Iterator $streamEvents): void
if (iterator_count($streamEvents) > 1) {
throw new \RuntimeException('Due to limited ACID support you can only append one event per operation to the event stream: ' . $streamName->toString());
foreach($streamEvents as $event) {
$this->insertInto($streamName, $this->prepareEventData($event));
public function load(
StreamName $streamName,
int $fromNumber = 1,
int $count = null,
MetadataMatcher $metadataMatcher = null
): Stream
$collection = $this->getCollectionByStreamName($streamName);
if (null === $metadataMatcher) {
$metadataMatcher = new MetadataMatcher();
$query = $this->buildQuery($metadataMatcher);
$query['no'] = ['$gte' => $fromNumber];
$options = [
'sort' => ['no' => 1]
if ($count) {
$options['limit'] = $count;
$doc = $collection->findOne($query, $options);
if(!$doc) {
return new Stream($streamName, new \ArrayIterator([]));
$cursor = $collection->find($query, $options);
$iterator = $this->mapCursor($cursor, function (array $event) {
return $this->eventDataToMessage($event);
return new Stream($streamName, $iterator);
public function loadReverse(
StreamName $streamName,
int $fromNumber = PHP_INT_MAX,
int $count = null,
MetadataMatcher $metadataMatcher = null
): Stream
$collection = $this->getCollectionByStreamName($streamName);
if (null === $metadataMatcher) {
$metadataMatcher = new MetadataMatcher();
$query = $this->buildQuery($metadataMatcher);
$query['no'] = ['$lte' => $fromNumber];
$options = [
'sort' => ['no' => -1]
if ($count) {
$options['limit'] = $count;
$cursor = $collection->find($query, $options);
$iterator = $this->mapCursor($cursor, function (array $event) {
return $this->eventDataToMessage($event);
return new Stream($streamName, $iterator);
public function delete(StreamName $streamName): void
//Note: this is not transaction save.
//However, delete should only be called for projection streams and mongodb will recreate an empty
//stream collection if it not exists. So self::hasStream can return true even if there is no stream collection
//but only the ref in the streams collection (scenario if first cmd succeed but second fails)
->deleteOne(['_id' => $streamName->toString()]);
* @param Message $e
* @return array
private function prepareEventData(Message $e)
$eventArr = $this->messageConverter->convertToArray($e);
$eventData = [
'_id' => $eventArr['uuid'],
'event_name' => $eventArr['message_name'],
'payload' => $eventArr['payload'],
'created_at' => $eventArr['created_at']->format('Y-m-d\TH:i:s.u'),
'metadata' => $eventArr['metadata']
return $eventData;
private function eventDataToMessage(array $eventData): Message
$createdAt = \DateTimeImmutable::createFromFormat(
new \DateTimeZone('UTC')
return $this->messageFactory->createMessageFromArray($eventData['event_name'], [
'uuid' => $eventData['_id'],
'created_at' => $createdAt,
'payload' => $eventData['payload'],
'metadata' => $eventData['metadata']
private function getCollectionByStreamName(StreamName $streamName): Collection
$streamName = $streamName->toString();
$collection = $this->mongoConnection->selectCollection($streamName);
'no' => 1
], ['unique' => true, 'name' => 'no_idx']);
if (in_array($streamName, $this->aggregateStreamNames)) {
'metadata._aggregate_id' => 1,
'metadata._aggregate_version' => 1
], ['unique' => true, 'name' => 'aggregate_version_idx']);
return $collection;
private function buildQuery(MetadataMatcher $matcher): array
$query = [];
foreach ($matcher->data() as $match) {
$field = $match['field'];
$operator = $match['operator']->getValue();
$value = $match['value'];
switch ($operator) {
case Operator::EQUALS:
$operator = '$eq';
case Operator::GREATER_THAN:
$operator = '$gt';
$operator = '$gte';
case Operator::LOWER_THAN:
$operator = '$lt';
case Operator::LOWER_THAN_EQUALS:
$operator = '$lte';
case Operator::NOT_EQUALS:
$operator = '$ne';
$query['metadata.' . $field] = [$operator => $value];
return $query;
private function insertInto(StreamName $streamName, array $eventData): void
$streamInfo = $this->mongoConnection->selectCollection(self::STREAM_COLLECTION)
['_id' => $streamName->toString()],
['$inc' => ['seq' => 1]],
['returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER]
if(!$streamInfo) {
throw StreamNotFound::with($streamName);
$eventData['no'] = $streamInfo['seq'];
try {
} catch (BulkWriteException $e) {
['_id' => $streamName->toString()],
['$inc' => ['seq' => -1]]
throw $e;
private function mapCursor(Cursor $cursor, callable $callback): \IteratorIterator
return new class($cursor, $callback) extends \IteratorIterator {
* The function to be apply on all InnerIterator element
* @var callable
private $callable;
private $currentKey;
private $currentVal;
* The Constructor
* @param Cursor $cursor
* @param callable $callable
public function __construct(Cursor $cursor, callable $callable)
$this->callable = $callable;
public function valid(): bool
return !$this->getInnerIterator()->isDead();
* Get the value of the current element
public function current()
$callback = $this->callable;
return $callback(parent::current(), parent::key());
public function updateStreamMetadata(StreamName $streamName, array $newMetadata): void
// TODO: Implement updateStreamMetadata() method.
public function createQuery(QueryFactory $factory = null): Query
// TODO: Implement createQuery() method.
public function createProjection(
string $name,
ProjectionOptions $options = null,
ProjectionFactory $factory = null): Projection
// TODO: Implement createProjection() method.
public function createReadModelProjection(
string $name,
ReadModel $readModel,
ProjectionOptions $options = null,
ReadModelProjectionFactory $factory = null
): ReadModelProjection
if (null === $options) {
$options = new ProjectionOptions();
return new MongoReadModelProjection(
public function getDefaultQueryFactory(): QueryFactory
throw new \BadMethodCallException(__METHOD__ . ' not supported');
public function getDefaultProjectionFactory(): ProjectionFactory
throw new \BadMethodCallException(__METHOD__ . ' not supported');
public function getDefaultReadModelProjectionFactory(): ReadModelProjectionFactory
throw new \BadMethodCallException(__METHOD__ . ' not supported');
declare(strict_types = 1);
namespace Acme\Infrastructure\MongoDb;
use Closure;
use Acme\Model\Message;
use MongoDB\Exception\BadMethodCallException;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Exception\RuntimeException;
use Prooph\EventStore\Exception\StreamNotFound;
use Prooph\EventStore\Projection\ReadModel;
use Prooph\EventStore\Projection\ReadModelProjection;
use Prooph\EventStore\StreamName;
final class MongoReadModelProjection implements ReadModelProjection
const PROJECTIONS_COLLECTION = 'projections';
const LOCK_TIMEOUT_MS = 1000;
* @var string
private $name;
* @var EventStore
private $eventStore;
* @var ReadModel
private $readModel;
* @var MongoConnection
private $mongoConnection;
* @var array
private $streamPositions;
* @var array
private $state = [];
* @var callable|null
private $initCallback;
* @var Closure|null
private $handler;
* @var array
private $handlers = [];
private $currentStreamName = null;
private $isStopped = false;
* @var int
private $eventCounter = 0;
private $cacheSize;
private $persistBlockSize;
public function __construct(
EventStore $eventStore,
string $name,
ReadModel $readModel,
MongoConnection $mongoConnection,
int $cacheSize,
int $persistBlockSize
) {
$this->eventStore = $eventStore;
$this->name = $name;
$this->readModel = $readModel;
$this->mongoConnection = $mongoConnection;
$this->cacheSize = $cacheSize;
$this->persistBlockSize = $persistBlockSize;
* The callback has to return an array
public function init(Closure $callback): ReadModelProjection
if (null !== $this->initCallback) {
throw new \RuntimeException('Projection already initialized');
$callback = Closure::bind($callback, $this->createHandlerContext($this->currentStreamName));
$result = $callback();
if (is_array($result)) {
$this->state = $result;
$this->initCallback = $callback;
return $this;
public function fromStream(string $streamName): ReadModelProjection
if (null !== $this->streamPositions) {
throw new \RuntimeException('From was already called');
$this->streamPositions = [$streamName => 0];
return $this;
public function fromStreams(string ...$streamNames): ReadModelProjection
if (null !== $this->streamPositions) {
throw new \RuntimeException('From was already called');
foreach ($streamNames as $streamName) {
$this->streamPositions[$streamName] = 0;
return $this;
public function when(array $handlers): ReadModelProjection
if (null !== $this->handler || ! empty($this->handlers)) {
throw new \RuntimeException('When was already called');
foreach ($handlers as $eventName => $handler) {
if (! is_string($eventName)) {
throw new \InvalidArgumentException('Invalid event name given, string expected');
if (! $handler instanceof Closure) {
throw new \InvalidArgumentException('Invalid handler given, Closure expected');
$this->handlers[$eventName] = Closure::bind($handler, $this->createHandlerContext($this->currentStreamName));
return $this;
public function whenAny(Closure $handler): ReadModelProjection
if (null !== $this->handler || ! empty($this->handlers)) {
throw new \RuntimeException('When was already called');
$this->handler = Closure::bind($handler, $this->createHandlerContext($this->currentStreamName));
return $this;
public function stop(): void
$this->isStopped = true;
public function getState(): array
return $this->state;
public function getName(): string
return $this->name;
public function delete(bool $deleteProjection): void
if($deleteProjection) {
public function readModel(): ReadModel
return $this->readModel;
public function run(bool $keepRunning = true, ?int $usleep = 100): void
try {
do {
$singleHandler = null !== $this->handler;
foreach ($this->streamPositions as $streamName => $position) {
try {
$stream = $this->eventStore->load(new StreamName($streamName), $position + 1);
} catch (StreamNotFound $e) {
// no newer events found
if ($singleHandler) {
$this->handleStreamWithSingleHandler($streamName, $stream->streamEvents());
} else {
$this->handleStreamWithHandlers($streamName, $stream->streamEvents());
if ($this->isStopped) {
if (0 === $this->eventCounter) {
if (null !== $usleep) {
} else {
$this->eventCounter = 0;
} while ($keepRunning && ! $this->isStopped);
} finally {
protected function createProjectionIfNotExist(): void
$col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION);
try {
'_id' => $this->name,
'position' => [],
'state' => [],
'locked_until' => null
} catch (\Throwable $ex) {
//ignore errors especially duplicate key errors
protected function load(): void
$col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION);
$doc = $col->findOne(['_id' => $this->name]);
if (!$doc) {
throw new \RuntimeException('Projection information for ' . $this->name . ' missing in collection ' . self::PROJECTIONS_COLLECTION);
$this->streamPositions = $doc['position'];
$state = $doc['state'];
if (! empty($state)) {
$this->state = $state;
protected function persist(): void
$now = new \DateTimeImmutable('now', new \DateTimeZone('UTC'));
$lockUntilString = $now->modify('+' . self::LOCK_TIMEOUT_MS . ' ms')->format('Y-m-d\TH:i:s.u');
$col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION);
'_id' => $this->name
], [
'$set' => [
'position' => $this->streamPositions,
'state' => $this->state,
'locked_until' => $lockUntilString
public function reset(): void
if (null !== $this->streamPositions) {
$this->streamPositions = array_map(
function (): int {
return 0;
$this->isStopped = false;
$callback = $this->initCallback;
if (is_callable($callback)) {
$result = $callback();
if (is_array($result)) {
$this->state = $result;
} else {
$this->state = [];
$this->eventStore->delete(new StreamName($this->name));
* @throws RuntimeException
protected function acquireLock(): void
$now = new \DateTimeImmutable('now', new \DateTimeZone('UTC'));
$nowString = $now->format('Y-m-d\TH:i:s.u');
$lockUntilString = $now->modify('+' . self::LOCK_TIMEOUT_MS . ' ms')->format('Y-m-d\TH:i:s.u');
$result = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)->updateOne([
'_id' => $this->name,
'$or' => [
'locked_until' => null,
'locked_until' => ['$lt' => $nowString]
], [
'$set' => [
'locked_until' => $lockUntilString
if ($result->getMatchedCount() !== 1) {
throw new RuntimeException('Another projection process is already running');
protected function releaseLock(): void
'_id' => $this->name,
], [
'$set' => [
'locked_until' => null
public function fromCategory(string $name): ReadModelProjection
throw new BadMethodCallException(__METHOD__ . ' not supported');
public function fromCategories(string ...$names): ReadModelProjection
throw new BadMethodCallException(__METHOD__ . ' not supported');
public function fromAll(): ReadModelProjection
throw new BadMethodCallException(__METHOD__ . ' not supported');
private function handleStreamWithSingleHandler(string $streamName, \Iterator $events): void
$this->currentStreamName = $streamName;
$handler = $this->handler;
foreach ($events as $event) {
/* @var Message $event */
$result = $handler($this->state, $event);
if (is_array($result)) {
$this->state = $result;
if ($this->eventCounter === $this->persistBlockSize) {
$this->eventCounter = 0;
if ($this->isStopped) {
private function handleStreamWithHandlers(string $streamName, \Iterator $events): void
$this->currentStreamName = $streamName;
foreach ($events as $event) {
/* @var Message $event */
if (! isset($this->handlers[$event->messageName()])) {
$handler = $this->handlers[$event->messageName()];
$result = $handler($this->state, $event);
if (is_array($result)) {
$this->state = $result;
if ($this->eventCounter === $this->persistBlockSize) {
$this->eventCounter = 0;
if ($this->isStopped) {
private function createHandlerContext(?string &$streamName)
return new class($this, $streamName) {
* @var ReadModelProjection
private $projection;
* @var ?string
private $streamName;
public function __construct(ReadModelProjection $projection, ?string &$streamName)
$this->projection = $projection;
$this->streamName = &$streamName;
public function stop(): void
public function readModel(): ReadModel
return $this->projection->readModel();
public function streamName(): ?string
return $this->streamName;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment