Created
February 20, 2017 08:51
-
-
Save oqq/6ef0726bd3cd32c7a3f9854e9643cabc to your computer and use it in GitHub Desktop.
Example of an abstract ElasticSearchReadModel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php declare(strict_types = 1); | |
namespace Acme\Projection\Document; | |
use Acme\Domain\Document\Event\DocumentWasCreated; | |
use Acme\Domain\Document\Event\DocumentContentWasUpdated; | |
use Prooph\EventStore\EventStore; | |
use Prooph\EventStore\Projection\ReadModel; | |
/**
 * Configures and runs the "acme_documents" read-model projection.
 *
 * The projection consumes "document_stream" and translates document events
 * into operations that are stacked on the read model.
 */
final class DocumentProjectionRunner
{
    /**
     * Projection handle returned by EventStore::createReadModelProjection().
     */
    private $projection;

    /**
     * Creates the projection and registers one handler per document event.
     *
     * @param EventStore $eventStore Event store the projection is created on.
     * @param ReadModel  $readModel  Read model that receives the stacked operations.
     */
    public function __construct(EventStore $eventStore, ReadModel $readModel)
    {
        // The handlers must stay non-static closures: they call $this->readModel(),
        // which this class does not define. NOTE(review): per prooph's projection
        // API the handlers are re-bound to the projector when invoked - confirm
        // against the prooph version in use.
        $indexNewDocument = function ($state, DocumentWasCreated $event) {
            $documentId = $event->documentId()->toString();
            $document = ['content' => $event->content()->toString()];

            $this->readModel()->stack('index', $documentId, $document);
        };

        $updateDocumentContent = function ($state, DocumentContentWasUpdated $event) {
            $documentId = $event->documentId()->toString();
            $changes = ['content' => $event->newContent()->toString()];

            $this->readModel()->stack('update', $documentId, $changes);
        };

        $projection = $eventStore->createReadModelProjection('acme_documents', $readModel);
        $projection
            ->fromStream('document_stream')
            ->when([
                DocumentWasCreated::class => $indexNewDocument,
                DocumentContentWasUpdated::class => $updateDocumentContent,
            ]);

        $this->projection = $projection;
    }

    /**
     * Runs the projection, optionally deleting it first.
     *
     * @param bool $delete When true, the projection is deleted before it is run.
     */
    public function run(bool $delete = false): void
    {
        $projection = $this->projection;

        if ($delete) {
            // NOTE(review): the "true" flag presumably also removes the read model - confirm.
            $projection->delete(true);
        }

        // NOTE(review): "false" presumably means "do not keep running" - confirm.
        $projection->run(false);
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php declare(strict_types = 1); | |
namespace Acme\Projection; | |
use Elasticsearch\Client; | |
use Prooph\EventStore\Projection\ReadModel; | |
use RuntimeException; | |
/**
 * Read model that buffers document operations in memory and writes them to a
 * single Elasticsearch index with one bulk request per persist() call.
 *
 * Supported operations for stack() are "index" and "update"; both expect a
 * document id followed by the document (or partial document) as an array.
 */
class ElasticSearchReadModel implements ReadModel
{
    /** @var Client */
    private $client;

    /** @var string Name of the index this read model reads and writes. */
    private $index;

    /** @var IndexBodyProvider */
    private $indexBodyProvider;

    /** @var array[] Pending bulk entries: an action line followed by its payload line. */
    private $stack = [];

    /**
     * @param Client            $client            Elasticsearch client used for all requests.
     * @param string            $index             Name of the index to manage.
     * @param IndexBodyProvider $indexBodyProvider Supplies the body used when the index is created.
     */
    public function __construct(Client $client, string $index, IndexBodyProvider $indexBodyProvider)
    {
        $this->client = $client;
        $this->index = $index;
        $this->indexBodyProvider = $indexBodyProvider;
    }

    /**
     * Creates the index using the body supplied by the IndexBodyProvider.
     *
     * @throws RuntimeException When the create request is not acknowledged.
     */
    public function init(): void
    {
        $response = $this->client->indices()->create([
            'index' => $this->index,
            'body' => $this->indexBodyProvider->getBodyForIndex($this->index),
        ]);

        $this->assertAcknowledged($response, 'create');
    }

    /**
     * Reports whether the index exists.
     */
    public function isInitialized(): bool
    {
        return $this->client->indices()->exists([
            'index' => $this->index,
        ]);
    }

    /**
     * Empties the read model by dropping and recreating the index.
     *
     * @throws RuntimeException When deleting or creating the index is not acknowledged.
     */
    public function reset(): void
    {
        // There is no flush method for elastic search, so we have to delete and recreate the index.
        // Only delete when the index is there, so a reset before the first init() does not fail.
        if ($this->isInitialized()) {
            $this->delete();
        }

        $this->init();
    }

    /**
     * Deletes the index.
     *
     * @throws RuntimeException When the delete request is not acknowledged.
     */
    public function delete(): void
    {
        $response = $this->client->indices()->delete([
            'index' => $this->index,
        ]);

        $this->assertAcknowledged($response, 'delete');
    }

    /**
     * Buffers one bulk operation until the next persist() call.
     *
     * @param string $operation Either "index" or "update".
     * @param mixed  ...$args   Document id, then the document (for "index") or the
     *                          partial document (for "update"). Further arguments are ignored.
     *
     * @throws RuntimeException When the operation is unknown or an argument is missing.
     */
    public function stack(string $operation, ...$args): void
    {
        if ('index' !== $operation && 'update' !== $operation) {
            throw new RuntimeException(sprintf('Operation %s is not valid', $operation));
        }

        // Fail here instead of queueing a null id or body that only breaks later in the bulk request.
        if (count($args) < 2) {
            throw new RuntimeException(sprintf(
                'Operation %s expects a document id and a document body',
                $operation
            ));
        }

        [$id, $document] = $args;

        $this->stack[] = [$operation => ['_id' => $id]];
        // Updates wrap the partial document in "doc"; index operations send it as-is.
        $this->stack[] = 'update' === $operation ? ['doc' => $document] : $document;
    }

    /**
     * Sends all buffered operations in one bulk request and clears the buffer.
     *
     * Does nothing when no operations are buffered.
     *
     * @throws RuntimeException When the bulk response reports failed items.
     */
    public function persist(): void
    {
        // A bulk request without any operations is rejected by Elasticsearch.
        if ([] === $this->stack) {
            return;
        }

        // NOTE(review): the "doc" mapping type is kept as in the original; check it
        // against the Elasticsearch version in use.
        $response = $this->client->bulk([
            'index' => $this->index,
            'type' => 'doc',
            'body' => $this->stack,
        ]);

        // Cleared only after the request returned, so an exception thrown by the
        // client leaves the buffer untouched.
        $this->stack = [];

        if (isset($response['errors']) && true === $response['errors']) {
            throw new RuntimeException(sprintf(
                'Bulk request on index "%s" reported errors: %s',
                $this->index,
                print_r($this->failedBulkItems($response), true)
            ));
        }
    }

    /**
     * Throws unless the response carries "acknowledged" => true.
     *
     * @param mixed  $response Response returned by the indices API call.
     * @param string $action   Verb used in the error message ("create" or "delete").
     *
     * @throws RuntimeException When the request was not acknowledged.
     */
    private function assertAcknowledged($response, string $action): void
    {
        if (isset($response['acknowledged']) && true === $response['acknowledged']) {
            return;
        }

        throw new RuntimeException(sprintf(
            'Could not %s index "%s". Response was: %s',
            $action,
            $this->index,
            print_r($response, true)
        ));
    }

    /**
     * Collects the entries of a bulk response that carry an "error" key.
     *
     * @param mixed $response Response returned by the bulk call.
     *
     * @return array[] Failed entries, each shaped as [action => result].
     */
    private function failedBulkItems($response): array
    {
        $items = $response['items'] ?? [];
        if (!is_array($items)) {
            return [];
        }

        $failed = [];
        foreach ($items as $item) {
            if (!is_array($item)) {
                continue;
            }

            foreach ($item as $action => $result) {
                if (isset($result['error'])) {
                    $failed[] = [$action => $result];
                }
            }
        }

        return $failed;
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php declare(strict_types = 1); | |
namespace Acme\Projection; | |
/**
 * Supplies the request body used when an index is created.
 */
interface IndexBodyProvider
{
    /**
     * Returns the body for the given index.
     *
     * @param string $index Name of the index the body is requested for.
     *
     * @return array Body passed as "body" to the index-create request
     *               (presumably settings and mappings - depends on the implementation).
     */
    public function getBodyForIndex(string $index): array;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment