I have 4 message queues that use Redis as the transport, each consumed by its own worker (run via Supervisor), but every time a message lands in one of the queues, it gets processed by all of them. The documentation is clear on this point: it explains that the environment variable MESSENGER_CONSUMER_NAME must be set in each worker so that a message is processed only by the worker for its own queue.
The problem is that I can't work out how to apply this. With Doctrine as the transport it is simple, because setting the queue name is enough to tell the messages apart.
#messenger.yaml:
framework:
    messenger:
        # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
        failure_transport: failed
        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            audit:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 3
            async_backup_task:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 0
            failed:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
            sync: 'sync://'
        routing:
            Symfony\Component\Mercure\Update: async
            App\Message\UsuarioTraceMessage: audit
            App\Message\BackupDirectoryToRemoteStorageMessage: async_backup_task
And this is the Supervisor configuration file:
[program:squidmgr-messenger-async]
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume async --time-limit=3600 --env=prod
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log
[program:squidmgr-messenger-audit]
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume audit --time-limit=3600 --env=prod
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log
[program:squidmgr-messenger-backup-task]
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume async_backup_task --time-limit=3600 --env=prod
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log
[program:squidmgr-messenger-failed]
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
command=php -d memory_limit=256M /var/www/html/squidmgr/bin/console messenger:consume failed --time-limit=3600 --env=prod
user=www-data
numprocs=1
startsecs=0
autostart=true
autorestart=true
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/var/log/supervisor/squidmgr_messenger_out.log
stderr_logfile=/var/log/supervisor/squidmgr_messenger_err.log
The environment variable that is set in the worker may need to be referenced in the DSN of each queue, but that's not clear to me.
You also need to set the consumer name in your messenger configuration:
#messenger.yaml:
framework:
    messenger:
        # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
        failure_transport: failed
        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    # Quoted: a YAML plain scalar cannot start with '%'.
                    consumer: '%env(MESSENGER_CONSUMER_NAME)%'
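Note that, as far as I understand it, the consumer name only tells workers apart inside one consumer group. If every transport uses the same DSN, they all still read from the same Redis stream, which would explain why each message shows up in every worker. A minimal sketch of giving each transport its own stream, assuming the Redis transport's `stream` option (default `messages`); the stream names below are hypothetical, so pick whatever fits your setup:

```yaml
# messenger.yaml (sketch; stream names are hypothetical, not from the question)
framework:
    messenger:
        failure_transport: failed
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    stream: async                  # hypothetical stream name
                    consumer: '%env(MESSENGER_CONSUMER_NAME)%'
            audit:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    stream: audit                  # hypothetical stream name
                    consumer: '%env(MESSENGER_CONSUMER_NAME)%'
                retry_strategy:
                    max_retries: 3
            async_backup_task:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    stream: async_backup_task      # hypothetical stream name
                    consumer: '%env(MESSENGER_CONSUMER_NAME)%'
                retry_strategy:
                    max_retries: 0
            failed:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    stream: failed                 # hypothetical stream name
                    consumer: '%env(MESSENGER_CONSUMER_NAME)%'
            sync: 'sync://'
```

With this, the Supervisor programs from the question can stay as they are: each worker still gets its own MESSENGER_CONSUMER_NAME, and each `messenger:consume <transport>` command only reads the stream that belongs to its transport.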