
How to check if I'm already in a message handler when dispatching a new message in Symfony Messenger?


I'm trying to log every message that goes through Symfony Messenger, along with all the information I can gather about it.

I created a subscriber for all Messenger events I'm interested in:

final readonly class MessengerAuditSubscriber implements EventSubscriberInterface
{
    public static function getSubscribedEvents(): array
    {
        return [
            SendMessageToTransportsEvent::class => 'onSendMessageToTransportsEvent',
            WorkerMessageFailedEvent::class => 'onWorkerMessageFailedEvent',
            WorkerMessageHandledEvent::class => 'onWorkerMessageHandledEvent',
            WorkerMessageReceivedEvent::class => 'onWorkerMessageReceivedEvent',
            WorkerMessageRetriedEvent::class => 'onWorkerMessageRetriedEvent',
        ];
    }

    // ...
}

I attach a custom stamp to the Envelope when the message is dispatched:

    public function onSendMessageToTransportsEvent(SendMessageToTransportsEvent $event): void
    {
        $envelope = $event->getEnvelope();
        $envelope = $envelope->with(new MessageIdStamp(uniqid()));
        $event->setEnvelope($envelope);

        // ...log the event
    }

And retrieve it in subsequent events:

    public function onWorkerMessageHandledEvent(WorkerMessageHandledEvent $event): void
    {
        $envelope = $event->getEnvelope();
        $messageIdStamp = $envelope->last(MessageIdStamp::class);

        // ...log the event
    }

So far, so good.

Now, when a message handler itself dispatches a new message, I'd like to log the parent message ID as well, so that I can later retrieve the whole hierarchy of messages that occurred starting from a root message ID.

How can I tell, from within my listener's onSendMessageToTransportsEvent() method, whether I'm already called from within another message handler?

Is there some mechanism in Messenger like the RequestStack, but for messages?


Solution

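  • Keep a stack of message IDs in the subscriber: push a message's ID when the worker receives it, pop it once the message has been handled or has failed, and use the ID on top of the stack as the parent whenever a new message is dispatched.

    The stamp, which now also carries the parent ID, and the subscriber: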

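    use Symfony\Component\EventDispatcher\EventSubscriberInterface;
    use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
    use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
    use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
    use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
    use Symfony\Component\Messenger\Stamp\StampInterface;
    
    final readonly class MessageIdStamp implements StampInterface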
    {
        public function __construct(
            public string $messageId,
            public ?string $parentMessageId,
        ) {
        }
    }
    
    final class MessengerAuditSubscriber implements EventSubscriberInterface
    {
        /**
         * @var list<string>
         */
        private array $messageIdStack = [];
    
        public static function getSubscribedEvents(): array
        {
            return [
                SendMessageToTransportsEvent::class => 'onSendMessageToTransportsEvent',
                WorkerMessageReceivedEvent::class => 'onWorkerMessageReceivedEvent',
                WorkerMessageHandledEvent::class => 'onWorkerMessageHandledOrFailedEvent',
                WorkerMessageFailedEvent::class => 'onWorkerMessageHandledOrFailedEvent',
            ];
        }
    
        public function onSendMessageToTransportsEvent(SendMessageToTransportsEvent $event): void
        {
            $stackSize = count($this->messageIdStack);
            $parentMessageId = ($stackSize !== 0)
                ? $this->messageIdStack[$stackSize - 1]
                : null;
    
            $messageId = $this->generateMessageId();
            $messageIdStamp = new MessageIdStamp($messageId, $parentMessageId);
    
            $envelope = $event->getEnvelope()->with($messageIdStamp);
            $event->setEnvelope($envelope);
        }
    
        public function onWorkerMessageReceivedEvent(WorkerMessageReceivedEvent $event): void
        {
            $messageIdStamp = $event->getEnvelope()->last(MessageIdStamp::class);
    
            if ($messageIdStamp === null) {
                // redelivery of a legacy message that was not stamped, ignore
                return;
            }
    
            $this->messageIdStack[] = $messageIdStamp->messageId;
        }
    
        public function onWorkerMessageHandledOrFailedEvent(WorkerMessageHandledEvent|WorkerMessageFailedEvent $event): void
        {
            $messageIdStamp = $event->getEnvelope()->last(MessageIdStamp::class);
    
            if ($messageIdStamp === null) {
                // redelivery of a legacy message that was not stamped, ignore
                return;
            }
    
            array_pop($this->messageIdStack);
        }
    
        private function generateMessageId(): string
        {
            // ...
        }
    }
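
To see how the push/pop bookkeeping yields the parent ID, here is a minimal, Symfony-free sketch of the same stack logic. The class `MessageIdStackSketch`, its method names, and the `msg-N` counter IDs are hypothetical and exist only for illustration; they are not part of Messenger or of the subscriber above.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the subscriber's stack logic; no Symfony classes involved.
final class MessageIdStackSketch
{
    /** @var list<string> IDs of the messages currently being handled */
    private array $stack = [];

    /** @var array<string, ?string> message ID => parent message ID */
    public array $parents = [];

    private int $counter = 0;

    // Mirrors onSendMessageToTransportsEvent(): the parent is the top of the stack, if any.
    public function dispatch(): string
    {
        $parentId = $this->stack === [] ? null : $this->stack[array_key_last($this->stack)];
        $id = 'msg-' . ++$this->counter; // deterministic IDs, for this sketch only
        $this->parents[$id] = $parentId;

        return $id;
    }

    // Mirrors onWorkerMessageReceivedEvent(): handling starts, push the ID.
    public function received(string $id): void
    {
        $this->stack[] = $id;
    }

    // Mirrors onWorkerMessageHandledOrFailedEvent(): handling ended, pop the ID.
    public function finished(): void
    {
        array_pop($this->stack);
    }
}

$sketch = new MessageIdStackSketch();

$root = $sketch->dispatch();   // dispatched outside any handler
$sketch->received($root);      // the worker starts handling the root message
$child = $sketch->dispatch();  // the handler dispatches a new message
$sketch->finished();           // handling of the root message is done
$other = $sketch->dispatch();  // dispatched outside any handler again

var_dump($sketch->parents);    // msg-1 => null, msg-2 => "msg-1", msg-3 => null
```

Only the message dispatched while the root was being handled gets a parent; the other two are roots themselves. Logging each `messageId` together with its `parentMessageId` should then be enough to rebuild the hierarchy afterwards.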