Last active
June 16, 2018 18:17
-
-
Save gquemener/161489258c0ff89ce2cfee0c516cc7b9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
namespace { | |
require_once __DIR__ . '/../vendor/autoload.php'; | |
} | |
namespace Prooph\ServiceBus\Example\Query { | |
use Prooph\Common\Messaging\Query; | |
class GetById extends Query | |
{ | |
private $id; | |
protected $messageName = 'Prooph\ServiceBus\Example\Query\GetById'; | |
public function __construct(int $id) | |
{ | |
$this->id = $id; | |
$this->init(); | |
} | |
public function id(): int | |
{ | |
return $this->id; | |
} | |
public function payload(): array | |
{ | |
return ['id' => $this->id]; | |
} | |
protected function setPayload(array $payload): void | |
{ | |
$this->id = $payload['id']; | |
} | |
} | |
} | |
namespace { | |
use Prooph\ServiceBus\Example\Query\GetById; | |
use Prooph\ServiceBus\Plugin\CachePlugin; | |
use Prooph\ServiceBus\Plugin\Router\QueryRouter; | |
use Prooph\ServiceBus\QueryBus; | |
use React\Promise\Deferred; | |
use Symfony\Component\Cache\Simple\FilesystemCache; | |
$queryBus = new QueryBus(); | |
$router = new QueryRouter(); | |
$router->route('Prooph\ServiceBus\Example\Query\GetById') | |
->to(function (GetById $query, Deferred $deferred): void { | |
$deferred->resolve('[' . date('Y-m-d H:i:s') . '] Answer to the universe'); | |
}); | |
$router->attachToMessageBus($queryBus); | |
$cache = new FilesystemCache('', 1000, '/app/cache'); | |
$cacher = new CachePlugin($cache); | |
$cacher->attachToMessageBus($queryBus); | |
$getById = new GetById(42); | |
$echo = function (string $result) { | |
echo $result . PHP_EOL; | |
}; | |
$queryBus->dispatch($getById)->then($echo); | |
$queryBus->dispatch($getById)->then($echo); | |
$cache->clear(); | |
$queryBus->dispatch($getById)->then($echo); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\ServiceBus\Plugin; | |
use Prooph\Common\Event\ActionEvent; | |
use Prooph\Common\Messaging\Query; | |
use Prooph\ServiceBus\MessageBus; | |
use Prooph\ServiceBus\QueryBus; | |
use Prooph\ServiceBus\AbstractPlugin; | |
use Psr\SimpleCache\CacheInterface; | |
final class QueryCachePlugin extends AbstractPlugin | |
{ | |
private $cache; | |
public function __construct(CacheInterface $cache) | |
{ | |
$this->cache = $cache; | |
} | |
public function attachToMessageBus(MessageBus $messageBus): void | |
{ | |
// We make sure that this plugin cannot be attached to anything else than a query bus | |
if (!$messageBus instanceof QueryBus) { | |
throw new \InvalidArgumentException(sprintf( | |
'The cache plugin can only be attached to an instance of "Prooph\\ServiceBus\\QueryBus", got "%s".', | |
get_class($messageBus) | |
)); | |
} | |
$this->listenerHandlers[] = $messageBus->attach( | |
QueryBus::EVENT_DISPATCH, // Occurs whenever a query is dispatched | |
function (ActionEvent $actionEvent): void { | |
if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) { | |
// If the query has already been handled by another plugin, we are too late | |
return; | |
} | |
$query = $actionEvent->getParam(QueryBus::EVENT_PARAM_MESSAGE); | |
// Prooph Query Bus uses reactphp/promise to handle query messages | |
$deferred = $actionEvent->getParam(QueryBus::EVENT_PARAM_DEFERRED); | |
$key = $this->getCacheKey($query); | |
if (null !== $result = $this->cache->get($key)) { | |
// We have a cache hit, let's early resolve this promise with the cached result. | |
$deferred->resolve($result); | |
// Let's unresolve the handler parameter to prevent the query finder to be called | |
$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLER, null); | |
return; | |
} | |
// When we have a cache miss, we wait for the promise to be resolved, | |
// typically by the appropriate query finder, and we cache the result | |
// for further fetching. | |
$deferred->promise()->then(function ($data) use ($key) { | |
$this->cache->set($key, $data); | |
}); | |
}, | |
// We position this event listener directly before the one responsible of invoking the query finder | |
QueryBus::PRIORITY_INVOKE_HANDLER + 1 | |
); | |
} | |
private function getCacheKey(Query $query): string | |
{ | |
$keyParts = [$query->messageName()]; | |
foreach ($query->payload() as $key => $value) { | |
$keyParts[] = $key; | |
$keyParts[] = $value; | |
} | |
return hash('sha512', json_encode($keyParts)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment