I'm trying to log every message that goes through Symfony Messenger, along with every information I can gather:
<timestamp>
<timestamp>
I created a subscriber for all Messenger events I'm interested in:
final readonly class MessengerAuditSubscriber implements EventSubscriberInterface
{
public static function getSubscribedEvents(): array
{
return [
SendMessageToTransportsEvent::class => 'onSendMessageToTransportsEvent',
WorkerMessageFailedEvent::class => 'onWorkerMessageFailedEvent',
WorkerMessageHandledEvent::class => 'onWorkerMessageHandledEvent',
WorkerMessageReceivedEvent::class => 'onWorkerMessageReceivedEvent',
WorkerMessageRetriedEvent::class => 'onWorkerMessageRetriedEvent',
];
}
// ...
}
I attach a custom stamp to the Envelope
when the message is dispatched:
public function onSendMessageToTransportsEvent(SendMessageToTransportsEvent $event): void
{
$envelope = $event->getEnvelope();
$envelope = $envelope->with(new MessageIdStamp(uniqid()));
$event->setEnvelope($envelope);
// ...log the event
}
And retrieve it in subsequent events:
public function onWorkerMessageHandledEvent(WorkerMessageHandledEvent $event): void
{
$envelope = $event->getEnvelope();
$messageIdStamp = $envelope->last(MessageIdStamp::class);
// ...log the event
}
So far, so good.
Now, when a message handler itself dispatches a new message, I'd like to log the parent message ID as well, so that I can later retrieve the whole hierarchy of messages that occurred starting from a root message ID:
1
(parent null
)
2
(parent 1
)
3
(parent 1
)
4
(parent 1
)
5
(parent 4
)
How can I tell, from within my listener's onSendMessageToTransportsEvent()
method, whether I'm already called from within another message handler?
Is there some mechanism in Messenger like the RequestStack
, but for messages?
The solution involves keeping a stack of message ids in the subscriber.
The stack is:
final readonly class MessageIdStamp implements StampInterface
{
public function __construct(
public string $messageId,
public ?string $parentMessageId,
) {
}
}
final class MessengerAuditSubscriber implements EventSubscriberInterface
{
/**
* @var list<string>
*/
public array $messageIdStack = [];
public static function getSubscribedEvents(): array
{
return [
SendMessageToTransportsEvent::class => 'onSendMessageToTransportsEvent',
WorkerMessageReceivedEvent::class => 'onWorkerMessageReceivedEvent',
WorkerMessageHandledEvent::class => 'onWorkerMessageHandledOrFailedEvent',
WorkerMessageFailedEvent::class => 'onWorkerMessageHandledOrFailedEvent',
];
}
public function onSendMessageToTransportsEvent(SendMessageToTransportsEvent $event): void
{
$stackSize = count($this->messageIdStack);
$parentMessageId = ($stackSize !== 0)
? $this->messageIdStack[$stackSize - 1]
: null;
$messageId = $this->generateMessageId();
$messageIdStamp = new MessageIdStamp($messageId, $parentMessageId);
$envelope = $event->getEnvelope()->with($messageIdStamp);
$event->setEnvelope($envelope);
}
public function onWorkerMessageReceivedEvent(WorkerMessageReceivedEvent $event): void
{
$messageIdStamp = $event->getEnvelope()->last(MessageIdStamp::class);
if ($messageIdStamp === null) {
// redelivery of a legacy message that was not stamped, ignore
return;
}
$this->messageIdStack[] = $messageIdStamp->messageId;
}
public function onWorkerMessageHandledOrFailedEvent(WorkerMessageHandledEvent|WorkerMessageFailedEvent $event): void
{
$messageIdStamp = $event->getEnvelope()->last(MessageIdStamp::class);
if ($messageIdStamp === null) {
// redelivery of a legacy message that was not stamped, ignore
return;
}
array_pop($this->messageIdStack);
}
private function generateMessageId(): string
{
// ...
}
}