I am implementing CloudEvents in my application using the Symfony Messenger component.
I have two queues, both of which should handle CloudEvents.
Here is my current configuration:
routing:
'CloudEvents\V1\CloudEvent':
- incoming
- outgoing
And when I want to add a CloudEvent to the queue, I want to specify exactly where in the queue the event should be added.
$this->messageBus->dispatch(
new Envelope(
$cloudEvent,
[
new TransportMessageIdStamp('outgoing')
]
)
);
or
$this->messageBus->dispatch(
$cloudEvent,
[
new TransportMessageIdStamp('outgoing')
]
);
I tried both, but the event still goes into the "outgoing" channel. How can I prevent this?
I solved the problem with a middleware, but there's one part of the solution I'm not happy with. If more stamps are added, the middleware will break the process. Perhaps someone has a more elegant way of handling this.
Here is my middleware class:
<?php
declare(strict_types=1);
namespace App\Messenger\Middleware;
use CloudEvents\V1\CloudEvent;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
readonly class CloudEventRoutingMiddleware implements MiddlewareInterface
{
public function __construct(
private SendersLocatorInterface $sendersLocator
) {
}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$message = $envelope->getMessage();
$stamps = $envelope->all();
// We want to prevent the middleware from being used during event consumption.
// This approach isn't ideal; we should find a more elegant solution.
if (count($stamps) > 1) {
return $stack->next()->handle($envelope, $stack);
}
if ($message instanceof CloudEvent) {
$transportName = $this->determineTransportName($message);
$senders = $this->sendersLocator->getSenders($envelope);
foreach ($senders as $name => $sender) {
if ($name === $transportName) {
/** @var SenderInterface $sender */
$sender->send($envelope);
return $envelope;
}
}
}
return $stack->next()->handle($envelope, $stack);
}
private function determineTransportName(CloudEvent $event): string
{
return match ($event->getType()) {
'task.processing' => 'incoming',
'task.completion' => 'outgoing',
default => throw new \RuntimeException(
sprintf('No transport found for event type: %s', $event->getType())
),
};
}
}