symfony symfony-messenger message-bus cloudevents

Symfony Messenger with CloudEvents and multiple queues


I am implementing CloudEvents in my application using the Symfony Messenger component.

I have two queues, both of which should handle CloudEvents.

Here is my current configuration:

routing:
    'CloudEvents\V1\CloudEvent':
      - incoming
      - outgoing

When I dispatch a CloudEvent, I want to specify exactly which queue the event should be added to.

$this->messageBus->dispatch(
    new Envelope(
        $cloudEvent,
        [
            new TransportMessageIdStamp('outgoing')
        ]
    )
);

or

$this->messageBus->dispatch(
        $cloudEvent,
        [
            new TransportMessageIdStamp('outgoing')
        ]
);

I tried both, but the event still goes into the "outgoing" channel. How can I prevent this?


Solution

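  • I solved the problem with a middleware, but there is one part of the solution I'm not happy with. To avoid running during event consumption, the middleware checks how many stamps the envelope carries, so it stops routing as soon as additional stamps are attached at dispatch time. Perhaps someone has a more elegant way of handling this.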

    Here is my middleware class:

    <?php
    
    declare(strict_types=1);
    
    namespace App\Messenger\Middleware;
    
    use CloudEvents\V1\CloudEvent;
    use Symfony\Component\Messenger\Envelope;
    use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
    use Symfony\Component\Messenger\Middleware\StackInterface;
    use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
    use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
    
    readonly class CloudEventRoutingMiddleware implements MiddlewareInterface
    {
        public function __construct(
            private SendersLocatorInterface $sendersLocator
        ) {
        }
    
        public function handle(Envelope $envelope, StackInterface $stack): Envelope
        {
            $message = $envelope->getMessage();
            $stamps = $envelope->all();
    
            // We want to prevent the middleware from being used during event consumption.
            // This approach isn't ideal; we should find a more elegant solution.
            if (count($stamps) > 1) {
                return $stack->next()->handle($envelope, $stack);
            }
    
            if ($message instanceof CloudEvent) {
                $transportName = $this->determineTransportName($message);
                $senders = $this->sendersLocator->getSenders($envelope);
    
                foreach ($senders as $name => $sender) {
                    if ($name === $transportName) {
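                        /** @var SenderInterface $sender */
                        // Send directly and stop here, so the rest of the stack
                        // does not also deliver the event to the other transport.
                        return $sender->send($envelope);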
                    }
                }
            }
    
            return $stack->next()->handle($envelope, $stack);
        }
    
        private function determineTransportName(CloudEvent $event): string
        {
            return match ($event->getType()) {
                'task.processing' => 'incoming',
                'task.completion' => 'outgoing',
                default => throw new \RuntimeException(
                    sprintf('No transport found for event type: %s', $event->getType())
                ),
            };
        }
    }
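
One possibly more elegant variant, offered as a sketch rather than a tested drop-in: instead of sending from the middleware and counting stamps, the middleware can attach a `TransportNamesStamp` and let Messenger's own send middleware deliver the event. This assumes symfony/messenger 6.2 or later (where `TransportNamesStamp` is available) and that the middleware is registered on the bus so that it runs before the built-in send step. The class name `CloudEventTransportStampMiddleware` is made up for illustration. Consumed envelopes are detected via `ReceivedStamp` rather than by the number of stamps.

```php
<?php

declare(strict_types=1);

namespace App\Messenger\Middleware;

use CloudEvents\V1\CloudEvent;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;

// Hypothetical class name; assumes symfony/messenger >= 6.2.
final class CloudEventTransportStampMiddleware implements MiddlewareInterface
{
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $message = $envelope->getMessage();

        // Leave the envelope untouched when it is not a CloudEvent, when it is
        // being consumed (ReceivedStamp present), or when the caller has
        // already chosen a transport explicitly.
        if (!$message instanceof CloudEvent
            || null !== $envelope->last(ReceivedStamp::class)
            || null !== $envelope->last(TransportNamesStamp::class)
        ) {
            return $stack->next()->handle($envelope, $stack);
        }

        // Override the configured routing for this one dispatch; the actual
        // sending is left to the middleware further down the stack.
        $envelope = $envelope->with(
            new TransportNamesStamp([$this->determineTransportName($message)])
        );

        return $stack->next()->handle($envelope, $stack);
    }

    // Same mapping as in the middleware above.
    private function determineTransportName(CloudEvent $event): string
    {
        return match ($event->getType()) {
            'task.processing' => 'incoming',
            'task.completion' => 'outgoing',
            default => throw new \RuntimeException(
                sprintf('No transport found for event type: %s', $event->getType())
            ),
        };
    }
}
```

With this variant the number of stamps on the envelope no longer matters, and the middleware never talks to a sender itself. If the routing decision is better made by the caller, the same stamp can be attached at the dispatch site instead, for example `$this->messageBus->dispatch($cloudEvent, [new TransportNamesStamp(['outgoing'])])`, in which case no custom middleware is needed.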