Skip to content

Instantly share code, notes, and snippets.

@gquemener
Last active June 16, 2018 18:17
Show Gist options
  • Save gquemener/161489258c0ff89ce2cfee0c516cc7b9 to your computer and use it in GitHub Desktop.
Save gquemener/161489258c0ff89ce2cfee0c516cc7b9 to your computer and use it in GitHub Desktop.
<?php
namespace {
// Bootstrap: load vendor/autoload.php from the directory above this file
// (presumably Composer's autoloader) before any class below is used.
require_once __DIR__ . '/../vendor/autoload.php';
}
namespace Prooph\ServiceBus\Example\Query {
use Prooph\Common\Messaging\Query;
/**
 * Example query message that asks for a single item by its integer identifier.
 *
 * The identifier is the only piece of state; it is exposed both through id()
 * and as the one-entry payload array ['id' => ...].
 */
class GetById extends Query
{
    /**
     * @var int
     */
    private $id;

    /**
     * Message name, pinned to this class's fully qualified name.
     *
     * @var string
     */
    protected $messageName = 'Prooph\ServiceBus\Example\Query\GetById';

    /**
     * @param int $id Identifier of the item being queried.
     */
    public function __construct(int $id)
    {
        $this->id = $id;

        // init() is inherited and not visible here; presumably it fills in the
        // remaining message defaults -- confirm against the parent class.
        $this->init();
    }

    /**
     * @return int The identifier this query was created with.
     */
    public function id(): int
    {
        return $this->id;
    }

    /**
     * @return array The message payload: a single 'id' entry.
     */
    public function payload(): array
    {
        return ['id' => $this->id];
    }

    /**
     * Restores the identifier from a payload array.
     *
     * @param array $payload Expected to carry an 'id' entry.
     */
    protected function setPayload(array $payload): void
    {
        ['id' => $this->id] = $payload;
    }
}
}
namespace {
use Prooph\ServiceBus\Example\Query\GetById;
use Prooph\ServiceBus\Plugin\CachePlugin;
use Prooph\ServiceBus\Plugin\Router\QueryRouter;
use Prooph\ServiceBus\QueryBus;
use React\Promise\Deferred;
use Symfony\Component\Cache\Simple\FilesystemCache;
// Build a query bus and route GetById queries to an inline finder.
$queryBus = new QueryBus();
$router = new QueryRouter();
$router->route('Prooph\ServiceBus\Example\Query\GetById')
->to(function (GetById $query, Deferred $deferred): void {
// The finder resolves with a string stamped with the current time. The stamp only has
// one-second resolution, so quick consecutive calls can print the same text even
// when the finder really ran again.
$deferred->resolve('[' . date('Y-m-d H:i:s') . '] Answer to the universe');
});
$router->attachToMessageBus($queryBus);
// Attach the cache plugin, backed by a filesystem cache.
// NOTE(review): the constructor arguments look like (namespace, default lifetime, directory)
// -- confirm against the Symfony FilesystemCache documentation.
// NOTE(review): CachePlugin is not defined in this listing; the comments below assume it
// caches query results per query.
$cache = new FilesystemCache('', 1000, '/app/cache');
$cacher = new CachePlugin($cache);
$cacher->attachToMessageBus($queryBus);
$getById = new GetById(42);
// Prints each resolved result on its own line.
$echo = function (string $result) {
echo $result . PHP_EOL;
};
// Same query dispatched twice in a row: the second call is presumably answered from the cache.
$queryBus->dispatch($getById)->then($echo);
$queryBus->dispatch($getById)->then($echo);
// Empty the cache, then dispatch again: this call should reach the finder once more.
$cache->clear();
$queryBus->dispatch($getById)->then($echo);
}
<?php
declare(strict_types=1);
namespace App\ServiceBus\Plugin;
use Prooph\Common\Event\ActionEvent;
use Prooph\Common\Messaging\Query;
use Prooph\ServiceBus\MessageBus;
use Prooph\ServiceBus\QueryBus;
use Prooph\ServiceBus\AbstractPlugin;
use Psr\SimpleCache\CacheInterface;
/**
 * Query bus plugin that answers queries from a PSR-16 cache when possible.
 *
 * On every dispatch it derives a cache key from the query's message name and
 * payload. A cached value resolves the query's deferred immediately and the
 * handler parameter is cleared; otherwise the value the deferred is eventually
 * resolved with is written to the cache under that key.
 *
 * NOTE(review): prooph/service-bus appears to ship AbstractPlugin under the
 * Prooph\ServiceBus\Plugin namespace -- verify the `use` statement of this file.
 */
final class QueryCachePlugin extends AbstractPlugin
{
    /**
     * @var CacheInterface
     */
    private $cache;

    /**
     * @param CacheInterface $cache Cache used to store and look up query results.
     */
    public function __construct(CacheInterface $cache)
    {
        $this->cache = $cache;
    }

    /**
     * Registers the caching listener on the given query bus.
     *
     * @param MessageBus $messageBus Must be a QueryBus instance.
     *
     * @throws \InvalidArgumentException When $messageBus is not a QueryBus.
     */
    public function attachToMessageBus(MessageBus $messageBus): void
    {
        // This plugin relies on the deferred that only a query bus provides.
        if (!$messageBus instanceof QueryBus) {
            throw new \InvalidArgumentException(sprintf(
                'The cache plugin can only be attached to an instance of "Prooph\\ServiceBus\\QueryBus", got "%s".',
                get_class($messageBus)
            ));
        }

        $this->listenerHandlers[] = $messageBus->attach(
            QueryBus::EVENT_DISPATCH, // Occurs whenever a query is dispatched
            function (ActionEvent $actionEvent): void {
                if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) {
                    // Another plugin already handled the query; nothing left to do here.
                    return;
                }

                $query = $actionEvent->getParam(QueryBus::EVENT_PARAM_MESSAGE);
                // The query bus hands results back through a reactphp/promise deferred.
                $deferred = $actionEvent->getParam(QueryBus::EVENT_PARAM_DEFERRED);
                $key = $this->getCacheKey($query);

                // A cached null cannot be told apart from a miss, so null results
                // are never served from the cache.
                $result = $this->cache->get($key);
                if (null !== $result) {
                    // Cache hit: resolve the promise early with the cached result.
                    $deferred->resolve($result);
                    // Clear the handler parameter so the query finder is not invoked.
                    // NOTE(review): the handled flag is left untouched here -- confirm the
                    // query bus does not treat this as an unhandled query.
                    $actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLER, null);

                    return;
                }

                // Cache miss: once the promise is resolved (typically by the query
                // finder), store the result for subsequent dispatches.
                $deferred->promise()->then(function ($data) use ($key) {
                    $this->cache->set($key, $data);
                });
            },
            // Run directly before the listener responsible for invoking the query finder.
            QueryBus::PRIORITY_INVOKE_HANDLER + 1
        );
    }

    /**
     * Builds the cache key for a query from its message name and payload.
     *
     * The key is a SHA-256 hex digest (64 characters from [0-9a-f]) so that it
     * stays within the key length every PSR-16 implementation must support.
     *
     * @param Query $query Query to derive the key from.
     *
     * @return string 64-character hexadecimal cache key.
     *
     * @throws \InvalidArgumentException When the payload cannot be encoded as JSON.
     */
    private function getCacheKey(Query $query): string
    {
        $keyParts = [$query->messageName()];
        foreach ($query->payload() as $key => $value) {
            $keyParts[] = $key;
            $keyParts[] = $value;
        }

        $encoded = json_encode($keyParts);
        if (false === $encoded) {
            // Fail with an explicit reason instead of passing false on to hash().
            throw new \InvalidArgumentException(sprintf(
                'Unable to build a cache key for query "%s": %s',
                $query->messageName(),
                json_last_error_msg()
            ));
        }

        return hash('sha256', $encoded);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment