Last active
July 3, 2024 10:28
-
-
Save kriswallsmith/6514ca5a30db352ad0aea328607546b4 to your computer and use it in GitHub Desktop.
Symfony Messenger message chaining
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Messenger\Chain; | |
use App\Messenger\Chain\Stamp\ChainStamp; | |
use Symfony\Component\Messenger\Envelope; | |
use Symfony\Component\Messenger\Stamp\StampInterface; | |
/**
 * Static factory that turns an ordered list of messages into one envelope
 * carrying the remainder of the chain as ChainStamp stamps.
 */
final readonly class Chain
{
    /**
     * Builds the envelope for the first message of the chain.
     *
     * The first item becomes the returned envelope; every remaining item is
     * handed to ChainStamp::fromChain(). When the last item is an array it is
     * treated as the list of leaves instead of as a link of the chain.
     *
     * @param list<object|Envelope|list<object|Envelope>> $chain  A list of messages or envelopes
     * @param StampInterface[]                            $stamps Stamps to add to each envelope
     *
     * @throws \InvalidArgumentException If the chain is empty or holds nothing but a list of leaves
     */
    public static function make(array $chain, array $stamps = []): Envelope
    {
        if ([] === $chain) {
            throw new \InvalidArgumentException('Message chain must have at least one item.');
        }

        // A trailing array is the list of leaves; anything else is a regular link.
        $leaves = array_pop($chain);
        if (!is_array($leaves)) {
            $chain[] = $leaves;
            $leaves = [];
        }

        // Without this guard a leaves-only chain would hand null to Envelope::wrap().
        $first = array_shift($chain);
        if (null === $first) {
            throw new \InvalidArgumentException('Message chain must start with a message or envelope before any leaves.');
        }

        return Envelope::wrap($first, $stamps)->with(...ChainStamp::fromChain(
            chain: $chain,
            leaves: $leaves,
            stamps: $stamps,
        ));
    }

    /** Not instantiable: the class only exposes the static factory. */
    private function __construct() {}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Messenger\Chain\Event; | |
use Symfony\Component\Messenger\Envelope; | |
/**
 * Event describing a failure that happened further down a message chain.
 *
 * ChainMiddleware dispatches one instance per ancestor of the envelope whose
 * handling threw.
 */
final readonly class ChainedMessageFailedEvent
{
    /**
     * @param Envelope   $envelope       The ancestor envelope this event is reported for
     * @param Envelope   $failedEnvelope The envelope whose handling threw
     * @param \Throwable $throwable      The error raised while handling the failed envelope
     */
    public function __construct(
        public Envelope $envelope,
        public Envelope $failedEnvelope,
        public \Throwable $throwable,
    ) {}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Messenger\Chain; | |
use App\Messenger\Chain\Event\ChainedMessageFailedEvent; | |
use App\Messenger\Chain\Stamp\ChainParentStamp; | |
use App\Messenger\Chain\Stamp\ChainStamp; | |
use Psr\EventDispatcher\EventDispatcherInterface; | |
use Symfony\Component\Messenger\Envelope; | |
use Symfony\Component\Messenger\MessageBusInterface; | |
use Symfony\Component\Messenger\Middleware\MiddlewareInterface; | |
use Symfony\Component\Messenger\Middleware\StackInterface; | |
use Symfony\Component\Messenger\Stamp\HandledStamp; | |
use Symfony\Component\Messenger\Stamp\SerializedMessageStamp; | |
use Symfony\Component\Messenger\Stamp\StampInterface; | |
/**
 * Messenger middleware that dispatches the messages chained onto an envelope
 * once that envelope has been handled, and reports failures of chained
 * messages to their ancestors.
 */
final readonly class ChainMiddleware implements MiddlewareInterface
{
    public function __construct(
        private MessageBusInterface $bus,
        private ?EventDispatcherInterface $dispatcher = null,
    ) {}

    /**
     * Runs the rest of the stack, then dispatches every chained message.
     *
     * Any error thrown by the rest of the stack is reported to the ancestors
     * of the envelope (when it has a ChainParentStamp) and then rethrown.
     */
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        try {
            $envelope = $stack->next()->handle($envelope, $stack);
        } catch (\Throwable $error) {
            $parentStamp = $envelope->last(ChainParentStamp::class);
            if ($parentStamp) {
                $this->bubbleError($parentStamp, $envelope, $error);
            }

            throw $error;
        }

        $handledStamp = $envelope->last(HandledStamp::class);
        if (!$handledStamp) {
            // for example if the send message middleware intercepted this message
            return $envelope;
        }

        /** @var ChainStamp $chainStamp */
        foreach ($envelope->all(ChainStamp::class) as $chainStamp) {
            $nextEnvelope = $this->nextEnvelope($envelope->getMessage(), $handledStamp, $chainStamp);

            if ($nextEnvelope) {
                // dispatch the chained message, remembering which envelope it came from
                $this->bus->dispatch($nextEnvelope->with(new ChainParentStamp($envelope)));
            }
        }

        // the chain has been handed over, so the stamps are no longer needed
        return $envelope->withoutAll(ChainStamp::class);
    }

    /**
     * Dispatches a failure event for each ancestor in the chain.
     */
    private function bubbleError(ChainParentStamp $parentStamp, Envelope $envelope, \Throwable $e): void
    {
        // Walk upwards: each parent envelope may itself carry a ChainParentStamp.
        for (
            $current = $parentStamp;
            $current;
            $current = $current->parentEnvelope->last(ChainParentStamp::class)
        ) {
            $this->dispatcher?->dispatch(new ChainedMessageFailedEvent(
                envelope: $current->parentEnvelope,
                failedEnvelope: $envelope,
                throwable: $e,
            ));
        }
    }

    /**
     * Returns the envelope to dispatch after the handled message.
     *
     * Returns null when a MessageChainInterface message declines to continue.
     */
    private function nextEnvelope(object $handledMessage, HandledStamp $handledStamp, ChainStamp $chainStamp): ?Envelope
    {
        $chained = $chainStamp->envelope;
        $candidate = $chained->getMessage();

        if (!$candidate instanceof MessageChainInterface) {
            // plain messages are dispatched exactly as they were chained
            return $chained;
        }

        $stamps = self::stamps($chained);
        $next = $candidate->next($handledMessage, $handledStamp->getResult());

        if (null === $next) {
            // skip this stamp
            return null;
        }

        if ($next instanceof Envelope) {
            // *append* stamps from chained envelope and open the message
            $stamps = array_merge($stamps, self::stamps($next));
            $next = $next->getMessage();
        }

        // force the serializer to re-serialize this message
        return (new Envelope($next, $stamps))->withoutAll(SerializedMessageStamp::class);
    }

    /**
     * Flattens the stamps of an envelope, grouped by class, into a single list.
     *
     * @return StampInterface[]
     */
    private static function stamps(Envelope $envelope): array
    {
        $flattened = [];

        foreach ($envelope->all() as $stampsOfType) {
            foreach ($stampsOfType as $stamp) {
                $flattened[] = $stamp;
            }
        }

        return $flattened;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Tests\Messenger\Chain; | |
use App\Messenger\Chain\ChainMiddleware; | |
use App\Messenger\Chain\Event\ChainedMessageFailedEvent; | |
use App\Messenger\Chain\MessageChainInterface; | |
use App\Messenger\Chain\Stamp\ChainParentStamp; | |
use App\Messenger\Chain\Stamp\ChainStamp; | |
use PHPUnit\Framework\TestCase; | |
use Psr\EventDispatcher\EventDispatcherInterface; | |
use Symfony\Component\Messenger\Envelope; | |
use Symfony\Component\Messenger\MessageBusInterface; | |
use Symfony\Component\Messenger\Middleware\MiddlewareInterface; | |
use Symfony\Component\Messenger\Middleware\StackInterface; | |
use Symfony\Component\Messenger\Stamp\BusNameStamp; | |
use Symfony\Component\Messenger\Stamp\HandledStamp; | |
/**
 * Unit tests for ChainMiddleware using stubbed stacks and a mocked bus.
 */
final class ChainMiddlewareTest extends TestCase
{
    /** A plain chained message is dispatched once the parent has been handled. */
    public function testChainedMessage(): void
    {
        $parent = (object) [];
        $child = (object) [];
        $handled = new Envelope($parent, [
            new HandledStamp(null, 'handler'),
            ChainStamp::wrap($child),
        ]);

        $bus = $this->createMock(MessageBusInterface::class);
        $bus->expects(self::once())
            ->method('dispatch')
            ->with(self::callback(static function (Envelope $dispatched) use ($child): bool {
                self::assertSame($child, $dispatched->getMessage());

                return true;
            }))
            ->willReturn(new Envelope($child));

        (new ChainMiddleware($bus))->handle($handled, $this->stackReturning($handled));
    }

    /** A MessageChainInterface message receives the handled message and its result. */
    public function testChainedMessageChain(): void
    {
        $parent = (object) [];
        $child = new class() implements MessageChainInterface {
            public ?object $message = null;
            public mixed $result = null;

            public function next(object $handledMessage, mixed $handledResult): object
            {
                $this->message = $handledMessage;
                $this->result = $handledResult;

                return $this;
            }
        };
        $handled = new Envelope($parent, [
            new HandledStamp('result', 'handler'),
            ChainStamp::wrap($child),
        ]);

        $bus = $this->createMock(MessageBusInterface::class);
        $bus->expects(self::once())
            ->method('dispatch')
            ->willReturn(new Envelope($child));

        (new ChainMiddleware($bus))->handle($handled, $this->stackReturning($handled));

        self::assertSame($parent, $child->message);
        self::assertSame('result', $child->result);
    }

    /** Stamps of an envelope returned by next() are appended to the chained stamps. */
    public function testChainedMessageChainEnvelopeReturn(): void
    {
        $parent = (object) [];
        $child = new class() implements MessageChainInterface {
            public function next(object $handledMessage, mixed $handledResult): object
            {
                return new Envelope((object) ['result' => $handledResult], [new BusNameStamp('bus2')]);
            }
        };
        $handled = new Envelope($parent, [
            new HandledStamp('result', 'handler'),
            ChainStamp::wrap($child, [new BusNameStamp('bus1')]),
        ]);

        $bus = $this->createMock(MessageBusInterface::class);
        $bus->expects(self::once())
            ->method('dispatch')
            ->with(self::callback(static function (Envelope $dispatched): bool {
                self::assertEquals((object) ['result' => 'result'], $dispatched->getMessage());
                self::assertEquals([
                    new BusNameStamp('bus1'),
                    new BusNameStamp('bus2'),
                ], $dispatched->all(BusNameStamp::class));

                return true;
            }))
            ->willReturn(new Envelope($child));

        (new ChainMiddleware($bus))->handle($handled, $this->stackReturning($handled));
    }

    /** Stamps attached to a chained message survive until it is dispatched. */
    public function testChainableStamps(): void
    {
        $parent = (object) [];
        $child = (object) [];
        $childStamp = new BusNameStamp('bus');
        $handled = new Envelope($parent, [
            new HandledStamp('result', 'handler'),
            ChainStamp::wrap($child, [$childStamp]),
        ]);

        $bus = $this->createMock(MessageBusInterface::class);
        $bus->expects(self::once())
            ->method('dispatch')
            ->with(
                self::callback(static function (Envelope $dispatched) use ($child, $childStamp): bool {
                    self::assertSame($child, $dispatched->getMessage());
                    self::assertSame($childStamp, $dispatched->last(BusNameStamp::class));

                    return true;
                }),
            )
            ->willReturn(new Envelope($child));

        (new ChainMiddleware($bus))->handle($handled, $this->stackReturning($handled));
    }

    /** A failing chained message triggers a failure event and rethrows the error. */
    public function testErrorBubbling(): void
    {
        $exception = new \LogicException('Test exception');
        $this->expectExceptionObject($exception);

        $parentEnvelope = new Envelope((object) []);
        $failing = new Envelope((object) [], [new ChainParentStamp($parentEnvelope)]);

        $throwingNext = $this->createStub(MiddlewareInterface::class);
        $throwingNext->method('handle')->willThrowException($exception);
        $stack = $this->createStub(StackInterface::class);
        $stack->method('next')->willReturn($throwingNext);

        $dispatcher = $this->createMock(EventDispatcherInterface::class);
        $dispatcher->expects(self::once())
            ->method('dispatch')
            ->with(self::isInstanceOf(ChainedMessageFailedEvent::class));

        $middleware = new ChainMiddleware($this->createStub(MessageBusInterface::class), $dispatcher);
        $middleware->handle($failing, $stack);
    }

    /**
     * Builds a stack whose next middleware answers with the given envelope.
     */
    private function stackReturning(Envelope $handled): StackInterface
    {
        $next = $this->createStub(MiddlewareInterface::class);
        $next->method('handle')->willReturn($handled);

        $stack = $this->createStub(StackInterface::class);
        $stack->method('next')->willReturn($next);

        return $stack;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Messenger\Chain\Stamp; | |
use Symfony\Component\Messenger\Envelope; | |
use Symfony\Component\Messenger\Stamp\StampInterface; | |
/**
 * Links a chained message back to the envelope it was chained from.
 *
 * ChainMiddleware adds this stamp when it dispatches a chained message and
 * follows it upwards when a failure has to be reported to the ancestors.
 */
final readonly class ChainParentStamp implements StampInterface
{
    /**
     * @param Envelope $parentEnvelope The prior message in the chain
     */
    public function __construct(
        public Envelope $parentEnvelope,
    ) {}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Messenger\Chain\Serializer; | |
use App\Messenger\Chain\Stamp\ChainParentStamp; | |
use Psr\Container\ContainerInterface; | |
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; | |
use Symfony\Component\Serializer\Normalizer\DenormalizerInterface; | |
use Symfony\Component\Serializer\Normalizer\NormalizerInterface; | |
use Symfony\Contracts\Service\ServiceSubscriberInterface; | |
/**
 * Serializer normalizer for ChainParentStamp.
 *
 * The parent envelope is encoded with the Messenger transport serializer and
 * stored as its headers plus its JSON-decoded body. The transport serializer
 * is resolved from the injected container on every call.
 *
 * NOTE(review): this only works when the transport serializer produces a JSON
 * body — confirm it is configured with the "json" format.
 */
final readonly class ChainParentStampNormalizer implements NormalizerInterface, DenormalizerInterface, ServiceSubscriberInterface
{
    public static function getSubscribedServices(): array
    {
        return [
            SerializerInterface::class,
        ];
    }

    public function __construct(
        private ContainerInterface $container,
    ) {}

    /**
     * Rebuilds the stamp by decoding the embedded parent envelope.
     *
     * @param array{headers: array<string, mixed>, body: array<string, mixed>} $data
     * @param array<string, mixed> $context
     *
     * @throws \JsonException If the body cannot be re-encoded as JSON
     */
    public function denormalize(mixed $data, string $type, ?string $format = null, array $context = []): ChainParentStamp
    {
        $serializer = $this->container->get(SerializerInterface::class);

        return new ChainParentStamp($serializer->decode([
            'headers' => $data['headers'],
            // Fail loudly: without the flag json_encode() returns false on error.
            'body' => json_encode($data['body'], JSON_THROW_ON_ERROR),
        ]));
    }

    /** @param array<string, mixed> $context */
    public function supportsDenormalization(mixed $data, string $type, ?string $format = null, array $context = []): bool
    {
        return ChainParentStamp::class === $type;
    }

    /**
     * Encodes the parent envelope and embeds its body as decoded JSON.
     *
     * Decoding to associative arrays means an empty JSON object in the body
     * comes back as an empty array once denormalize() re-encodes it.
     *
     * @param ChainParentStamp $object
     * @param array<string, mixed> $context
     *
     * @return array{headers: array<string, mixed>, body: array<string, mixed>}
     *
     * @throws \JsonException If the encoded body is not valid JSON
     */
    public function normalize(mixed $object, ?string $format = null, array $context = []): array
    {
        $serializer = $this->container->get(SerializerInterface::class);

        [
            'headers' => $headers,
            'body' => $encodedBody,
        ] = $serializer->encode($object->parentEnvelope);

        return [
            'headers' => $headers,
            // Fail loudly: without the flag json_decode() returns null on error.
            'body' => json_decode($encodedBody, associative: true, flags: JSON_THROW_ON_ERROR),
        ];
    }

    /** @param array<string, mixed> $context */
    public function supportsNormalization(mixed $data, ?string $format = null, array $context = []): bool
    {
        return $data instanceof ChainParentStamp;
    }

    /** @return array<string, bool> */
    public function getSupportedTypes(?string $format): array
    {
        return [
            ChainParentStamp::class => true,
        ];
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Messenger\Chain\Stamp; | |
use Symfony\Component\Messenger\Envelope; | |
use Symfony\Component\Messenger\Stamp\StampInterface; | |
/**
 * Chains a message to be dispatched after the parent message is handled.
 *
 * If the message implements {@see \App\Messenger\Chain\MessageChainInterface}
 * the handled parent message and its result will be passed to the message.
 */
final readonly class ChainStamp implements StampInterface
{
    /**
     * Turns the supplied message chain into a list of stamps.
     *
     * The chain is folded from its tail: each link wraps the stamps built so
     * far, so the result is either the leaf stamps (empty chain) or a single
     * stamp for the first link that nests everything after it.
     *
     * @param object[]         $chain  A list of messages or envelopes to dispatch sequentially
     * @param object[]         $leaves A list of messages or envelopes to dispatch in parallel, after the chain
     * @param StampInterface[] $stamps Stamps to add to each envelope
     *
     * @return list<ChainStamp>
     */
    public static function fromChain(array $chain, array $leaves, array $stamps = []): array
    {
        // array_map() keeps the keys of $leaves; re-index so that string keys
        // can never be unpacked as named arguments further down the line.
        $chainStamps = array_values(array_map(
            static fn (object $leaf): self => self::wrap($leaf, $stamps),
            $leaves,
        ));

        while ($chain) {
            $chainStamps = [
                self::wrap(array_pop($chain), [
                    ...$stamps,
                    ...$chainStamps,
                ]),
            ];
        }

        return $chainStamps;
    }

    /**
     * Wraps a message (or envelope) with the given stamps into a chain stamp.
     *
     * @param StampInterface[] $stamps
     */
    public static function wrap(object $message, array $stamps = []): self
    {
        return new self(Envelope::wrap($message, $stamps));
    }

    /**
     * @param Envelope $envelope The chained envelope to dispatch later
     */
    public function __construct(
        public Envelope $envelope,
    ) {}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Messenger\Chain\Serializer; | |
use App\Messenger\Chain\Stamp\ChainStamp; | |
use Psr\Container\ContainerInterface; | |
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; | |
use Symfony\Component\Serializer\Normalizer\DenormalizerInterface; | |
use Symfony\Component\Serializer\Normalizer\NormalizerInterface; | |
use Symfony\Contracts\Service\ServiceSubscriberInterface; | |
/**
 * Serializer normalizer for ChainStamp.
 *
 * The chained envelope is encoded with the Messenger transport serializer and
 * stored as its headers plus its JSON-decoded body. The transport serializer
 * is resolved from the injected container on every call.
 *
 * NOTE(review): this only works when the transport serializer produces a JSON
 * body — confirm it is configured with the "json" format.
 */
final readonly class ChainStampNormalizer implements NormalizerInterface, DenormalizerInterface, ServiceSubscriberInterface
{
    public static function getSubscribedServices(): array
    {
        return [
            SerializerInterface::class,
        ];
    }

    public function __construct(
        private ContainerInterface $container,
    ) {}

    /**
     * Rebuilds the stamp by decoding the embedded chained envelope.
     *
     * @param array{headers: array<string, mixed>, body: array<string, mixed>} $data
     * @param array<string, mixed> $context
     *
     * @throws \JsonException If the body cannot be re-encoded as JSON
     */
    public function denormalize(mixed $data, string $type, ?string $format = null, array $context = []): ChainStamp
    {
        $serializer = $this->container->get(SerializerInterface::class);

        return new ChainStamp($serializer->decode([
            'headers' => $data['headers'],
            // Fail loudly: without the flag json_encode() returns false on error.
            'body' => json_encode($data['body'], JSON_THROW_ON_ERROR),
        ]));
    }

    /** @param array<string, mixed> $context */
    public function supportsDenormalization(mixed $data, string $type, ?string $format = null, array $context = []): bool
    {
        return ChainStamp::class === $type;
    }

    /**
     * Encodes the chained envelope and embeds its body as decoded JSON.
     *
     * Decoding to associative arrays means an empty JSON object in the body
     * comes back as an empty array once denormalize() re-encodes it.
     *
     * @param ChainStamp $object
     * @param array<string, mixed> $context
     *
     * @return array{headers: array<string, mixed>, body: array<string, mixed>}
     *
     * @throws \JsonException If the encoded body is not valid JSON
     */
    public function normalize(mixed $object, ?string $format = null, array $context = []): array
    {
        $serializer = $this->container->get(SerializerInterface::class);

        [
            'headers' => $headers,
            'body' => $encodedBody,
        ] = $serializer->encode($object->envelope);

        return [
            'headers' => $headers,
            // Fail loudly: without the flag json_decode() returns null on error.
            'body' => json_decode($encodedBody, associative: true, flags: JSON_THROW_ON_ERROR),
        ];
    }

    /** @param array<string, mixed> $context */
    public function supportsNormalization(mixed $data, ?string $format = null, array $context = []): bool
    {
        return $data instanceof ChainStamp;
    }

    /** @return array<string, bool> */
    public function getSupportedTypes(?string $format): array
    {
        return [
            ChainStamp::class => true,
        ];
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Tests\Messenger\Chain; | |
use App\Messenger\Chain\Chain; | |
use App\Messenger\Chain\Stamp\ChainStamp; | |
use PHPUnit\Framework\TestCase; | |
use Symfony\Component\Messenger\Envelope; | |
use Symfony\Component\Messenger\Stamp\BusNameStamp; | |
/**
 * Unit tests for the Chain factory.
 */
final class ChainTest extends TestCase
{
    /** A chain with a trailing list of leaves nests the leaves under the last link. */
    public function testMake(): void
    {
        $message1 = (object) ['id' => '1'];
        $message2 = (object) ['id' => '2'];
        $message3a = (object) ['id' => '3a'];
        $message3b = (object) ['id' => '3b'];

        $envelope = Chain::make([
            $message1,
            new Envelope($message2, [new BusNameStamp('bus2')]),
            [$message3a, $message3b],
        ], [
            new BusNameStamp('bus1'),
        ]);

        // the first item of the chain is the message of the returned envelope
        self::assertSame($message1, $envelope->getMessage());

        /** @var ChainStamp[] $chainStamps */
        $chainStamps = $envelope->all(ChainStamp::class);
        self::assertCount(1, $chainStamps);

        $second = $chainStamps[0]->envelope;
        self::assertSame($message2, $second->getMessage());
        // bus1, bus2
        self::assertCount(2, $second->all(BusNameStamp::class));
        // message3a, message3b
        self::assertCount(2, $second->all(ChainStamp::class));
    }

    /** A single message is wrapped without any chain stamps. */
    public function testMakeOneItem(): void
    {
        $message = (object) [];

        $envelope = Chain::make([$message]);

        self::assertSame($message, $envelope->getMessage());
        self::assertEmpty($envelope->all(ChainStamp::class));
    }

    /** An empty chain is rejected up front. */
    public function testMakeEmptyChain(): void
    {
        $this->expectException(\InvalidArgumentException::class);
        $this->expectExceptionMessage('Message chain must have at least one item.');

        Chain::make([]);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?php | |
declare(strict_types=1); | |
namespace App\Messenger\Chain; | |
use Symfony\Component\Messenger\Envelope; | |
/**
 * Implemented by chained messages that want to see the outcome of their
 * parent before deciding what should be dispatched next.
 */
interface MessageChainInterface
{
    /**
     * Called with the handled parent message and the result of its handler.
     *
     * ChainMiddleware dispatches the returned message. When an envelope is
     * returned, its stamps are appended to those of the chained envelope.
     * Returning null skips this link of the chain.
     *
     * @param object $handledMessage The parent message
     * @param mixed  $handledResult  The result of the parent message
     *
     * @return object|Envelope|null The chained message or envelope to run next
     */
    public function next(object $handledMessage, mixed $handledResult): ?object;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment