php symfony symfony-messenger

Symfony Messenger: is it possible to not throw the exception on last retry?


We're using Symfony Messenger, and have these transports:

framework:
    messenger:
        # Transport that receives messages once their retries are exhausted.
        failure_transport: failed

        transports:
            # Doctrine-backed transport used as the failure transport above.
            failed:
                dsn: 'doctrine://default?queue_name=failed'
                options:
                    table_name: 'MessengerMessages'

            # Regular asynchronous queue: up to 3 retries, starting at a delay
            # of 5000 and doubling on every retry (delay values are
            # milliseconds per the Symfony Messenger retry_strategy docs).
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    delay: 5000
                    multiplier: 2
                    max_delay: 0
                    max_retries: 3

            # Low-priority queue: up to 5 retries, starting at a delay of
            # 3600000 and doubling on every retry.
            asyncLowPriority:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%_low_priority'
                retry_strategy:
                    delay: 3600000
                    multiplier: 2
                    max_delay: 0
                    max_retries: 5

            # Synchronous transport, written in the explicit `dsn` form.
            sync:
                dsn: 'sync://'

When we send a message to the async queue, and the last retry fails with an exception, the exception is logged to the MessengerMessages table, and the exception bubbles up (goes to Sentry in our case). This is what we want.

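When we send a message to the asyncLowPriority queue however, we would like messages that fail on the last retry to:

- not be stored in the failure transport (the MessengerMessages table)
- not let the exception bubble up (so nothing is sent to Sentry)

Basically, the exception should be dropped.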

Is this possible, and how?

The reason is that we're using this queue for downloading images asynchronously, and we already log each failure in a dedicated database table in the command handler.


Solution

  • I managed to do this with a middleware:

    
    use Symfony\Component\Messenger\Envelope;
    use Symfony\Component\Messenger\Exception\HandlerFailedException;
    use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
    use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
    use Symfony\Component\Messenger\Middleware\StackInterface;
    use Symfony\Component\Messenger\Stamp\ReceivedStamp;
    use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
    use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
    use Throwable;
    
    /**
     * Messenger middleware that swallows the handler exception of a message
     * received from one specific transport once that message has failed for
     * the last time, instead of letting the exception propagate.
     *
     * Every other failure (different transport, retries still remaining, or
     * more than one failed handler) is rethrown untouched.
     */
    final class BypassFailureTransportMiddleware implements MiddlewareInterface
    {
        /**
         * @param string $transportName Name of the transport whose final failures are swallowed.
         * @param int    $maxRetries    The max_retries value configured for that transport.
         */
        public function __construct(
            private string $transportName,
            private int $maxRetries,
        ) {
        }

        /**
         * Passes the envelope down the stack and intercepts handler failures.
         *
         * @return Envelope The envelope returned by the rest of the stack, or, when a
         *                  final failure is swallowed, the incoming envelope with a
         *                  SentToFailureTransportStamp added.
         *
         * @throws HandlerFailedException When the failure must not be swallowed.
         */
        public function handle(Envelope $envelope, StackInterface $stack): Envelope
        {
            try {
                return $stack->next()->handle($envelope, $stack);
            } catch (HandlerFailedException $exception) {
                $nestedException = $this->getNestedException($exception);

                // Zero or several wrapped exceptions: not a case this middleware handles.
                if ($nestedException === null) {
                    throw $exception;
                }

                /** @var ReceivedStamp|null $receivedStamp */
                $receivedStamp = $envelope->last(ReceivedStamp::class);

                // Only messages consumed from the configured transport are affected.
                if ($receivedStamp === null || $receivedStamp->getTransportName() !== $this->transportName) {
                    throw $exception;
                }

                // Retries remain: rethrow so the failure is processed as usual.
                if (!$this->isLastRetry($envelope, $nestedException)) {
                    throw $exception;
                }

                // Returning instead of rethrowing drops the exception.
                // NOTE(review): the stamp presumably marks the envelope as already
                // dealt with for failure-transport purposes; confirm against Messenger.
                return $envelope->with(new SentToFailureTransportStamp($receivedStamp->getTransportName()));
            }
        }

        /**
         * Returns the single exception wrapped by the handler failure, or null
         * when the failure wraps zero or several exceptions.
         */
        private function getNestedException(HandlerFailedException $exception): ?Throwable
        {
            // getNestedExceptions() is deprecated since Symfony 6.4 and removed in 7.0;
            // prefer its replacement when the installed version provides it.
            $nestedExceptions = method_exists($exception, 'getWrappedExceptions')
                ? $exception->getWrappedExceptions()
                : $exception->getNestedExceptions();

            if (count($nestedExceptions) !== 1) {
                return null;
            }

            // array_values() so the lookup does not depend on the array being 0-indexed.
            return array_values($nestedExceptions)[0];
        }

        /**
         * Tells whether the current failure is the final one for this message.
         *
         * An UnrecoverableMessageHandlingException always counts as final.
         * Otherwise the retry count carried by the RedeliveryStamp is compared
         * with the configured maximum.
         */
        private function isLastRetry(Envelope $envelope, Throwable $nestedException): bool
        {
            if ($nestedException instanceof UnrecoverableMessageHandlingException) {
                return true;
            }

            /** @var RedeliveryStamp|null $redeliveryStamp */
            $redeliveryStamp = $envelope->last(RedeliveryStamp::class);

            // No stamp means the message has not been redelivered yet, i.e. 0 retries.
            // Treating it as 0 keeps a transport configured with max_retries: 0 working.
            $retryCount = $redeliveryStamp === null ? 0 : $redeliveryStamp->getRetryCount();

            // ">=" rather than "===": an envelope already past the limit (e.g. the
            // limit was lowered while messages were in flight) is also final.
            return $retryCount >= $this->maxRetries;
        }
    }
    

    It must be configured with the name of the transport and the configured max_retries of this transport:

    # Values shared between the middleware service and the transport definition,
    # so the transport name and retry limit are declared only once.
    parameters:
        async_allow_failure_transport_name: 'asyncAllowFailure'
        async_allow_failure_max_retries: 5

    services:
        # The middleware receives the transport name and that transport's max_retries.
        command.bus.bypass_failure_transport_middleware:
            class: App\Infrastructure\CommandBus\Middleware\BypassFailureTransportMiddleware
            arguments:
                $transportName: '%async_allow_failure_transport_name%'
                $maxRetries: '%async_allow_failure_max_retries%'

    framework:
        messenger:
            transports:
                # List form with an explicit `name`, which lets the name be
                # supplied by the parameter declared above.
                - name: '%async_allow_failure_transport_name%'
                  dsn: '...'
                  retry_strategy:
                      delay: 1000
                      multiplier: 2
                      max_delay: 0
                      max_retries: '%async_allow_failure_max_retries%'

            buses:
                command.bus:
                    # Registers the middleware service on this bus.
                    middleware:
                        - 'command.bus.bypass_failure_transport_middleware'