symfonyredissymfony-messenger

Symfony Messenger component and redis transport: messages are processed on all queues


I have 4 message queues that use Redis as transport and are each fired in their respective worker (via Supervisor), but every time a message lands in one of the queues, it gets processed in all of them. The documentation is clear in this regard, it explains that the environment variable MESSENGER_CONSUMER_NAME must be configured in the worker to differentiate which message will be processed in its respective queue.

The problem is that I can't find the solution. In the case of doctrine as a transport, it is simple because by configuring the name of the queue, it is possible to differentiate the messages.

#messenger.yaml:
framework:
    messenger:
        # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
        failure_transport: failed
        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            audit: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 3
            async_backup_task:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 0
            failed: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            sync: 'sync://'
        routing:
            Symfony\Component\Mercure\Update: async
            App\Message\UsuarioTraceMessage: audit
            App\Message\BackupDirectoryToRemoteStorageMessage: async_backup_task

And this is the Supervisor configuration file:

[program:squidmgr-messenger-async]
    environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
    command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume async --time-limit=3600 --env=prod
    user=www-data
    numprocs=1
    startsecs=0
    autostart=true
    autorestart=true
    process_name=%(program_name)s_%(process_num)02d
    stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
    stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log
    
    [program:squidmgr-messenger-audit]
    environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
    command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume audit --time-limit=3600 --env=prod
    user=www-data
    numprocs=1
    startsecs=0
    autostart=true
    autorestart=true
    process_name=%(program_name)s_%(process_num)02d
    stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
    stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log
    
    [program:squidmgr-messenger-backup-task]
    environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
    command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume async_backup_task --time-limit=3600 --env=prod
    user=www-data
    numprocs=1
    startsecs=0
    autostart=true
    autorestart=true
    process_name=%(program_name)s_%(process_num)02d
    stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
    stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log
[program:squidmgr-messenger-failed]
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume failed --time-limit=3600 --env=prod
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log

The environment variable that is set in the worker may need to be referenced in the DSN of each queue, but that's not clear to me.


Solution

  • You also need to set the consumer name in your messenger configuration:

    #messenger.yaml:
    framework:
        messenger:
            # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
            failure_transport: failed
            transports:
                # https://symfony.com/doc/current/messenger.html#transport-configuration
                async: 
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    options:
                      consumer: %env(MESSENGER_CONSUMER_NAME)%