The use case is to federate all Celery messages to a second RabbitMQ node for backup (in case of disaster) using a federated exchange.
I have set up a federated exchange between 2 RabbitMQ nodes. My Django app (https://medium.com/@techWithAditya/building-scalable-applications-with-django-celery-and-rabbitmq-a-step-by-step-guide-fc58bccc8cad) connects to node 1 (which is the upstream in this setup). When I publish a task, it gets queued on the upstream but does not get federated; however, when I publish to the celery exchange manually (via a standalone publisher), it does get federated.
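For reference, the task is published from Django in the usual way (the task and module names here are just placeholders):

from myapp.tasks import my_task
my_task.delay()  # gets queued on node 1 but is not federated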
The problem occurs only when the message is published via Celery. Federation works fine when I publish directly to the celery exchange, and it also works with a standalone MQ producer written with Kombu.
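For comparison, this is roughly the kind of standalone Kombu producer that does get federated. A minimal sketch, with the broker URL and credentials as placeholders; the exchange and queue match Celery's defaults (direct exchange):

from kombu import Connection, Exchange, Queue

celery_exchange = Exchange('celery', type='direct')
celery_queue = Queue('celery', celery_exchange, routing_key='celery')

with Connection('amqp://user:password@node1-hostname:5672/celery_vhost') as conn:
    producer = conn.Producer()
    # Declare the exchange and queue if missing, then publish a test message
    producer.publish(
        {'hello': 'world'},
        exchange=celery_exchange,
        routing_key='celery',
        declare=[celery_queue],
    )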
The upstream was set up with the following command:
rabbitmqctl set_parameter federation-upstream origin '{"uri":"amqps://user:password@upstream-hostname:5671?cacertfile=<path-to-pem>"}' -p celery_vhost
The policy was applied with the command below:
rabbitmqctl set_policy exchange-federation "^celery$" '{"federation-upstream-set":"all"}' --apply-to exchanges -p celery_vhost
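To confirm that both took effect on the vhost, the parameter and policy can be listed:

rabbitmqctl list_parameters -p celery_vhost
rabbitmqctl list_policies -p celery_vhost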
Finally got the federation to work!
Although I did not find the exact root cause, the issue seems to be related to the exchange type that Celery uses. Changing the exchange type from direct to topic worked for me.
To use the custom queue, I added the lines below to the Celery settings in my Django app:
from kombu import Exchange, Queue

celery_app.conf.task_queues = (Queue('celery', Exchange('celery', type='topic'), routing_key='celery'),)
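If tasks are still routed through Celery's default direct exchange, the default routing settings may also need to line up with the topic exchange. A hedged sketch; the setting names are standard Celery options, and the values are assumed to mirror the queue above:

# Keep default task routing consistent with the topic exchange (assumed values)
celery_app.conf.task_default_queue = 'celery'
celery_app.conf.task_default_exchange = 'celery'
celery_app.conf.task_default_exchange_type = 'topic'
celery_app.conf.task_default_routing_key = 'celery'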