I have a RabbitMQ consumer. I would like that consumer to do some message processing, simulated by time.sleep(10), and then publish a message to a different queue. I know the consumer callback receives a channel that could in theory be used for the publish, but this seems like a bad design: if basic_publish() somehow manages to force-close the channel, the consumer dies. What is the best way to handle this?
import time
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='original_queue', exclusive=True)
channel.queue_bind(exchange='logs', queue='original_queue')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    time.sleep(10)
    ch.basic_publish(exchange='logs', routing_key='different_queue', body='hello_world')

channel.basic_consume(
    queue='original_queue', on_message_callback=callback, auto_ack=True)

channel.start_consuming()
You can implement your consumer so that it automatically reconnects to the RabbitMQ server if the connection gets closed. Hope this helps (I didn't put much thought into the design, so feel free to suggest improvements!)
import time
import pika
import pika.exceptions

reconnect_on_failure = True

def consumer(connection, channel):
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    result = channel.queue_declare(queue='original_queue', exclusive=True)
    channel.queue_bind(exchange='logs', queue='original_queue')
    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        time.sleep(10)
        ch.basic_publish(exchange='logs', routing_key='different_queue', body='hello_world')

    channel.basic_consume(
        queue='original_queue', on_message_callback=callback, auto_ack=True)
    channel.start_consuming()

def get_connection_and_channel():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    return connection, channel

def start(reconnect_on_failure):
    # loop instead of recursing, so repeated reconnects cannot exhaust the stack
    while True:
        connection, channel = get_connection_and_channel()
        try:
            consumer(connection, channel)
        except pika.exceptions.AMQPError as error:
            # start_consuming() raises when the channel or connection gets closed
            print(' [!] Consumer stopped: %r' % (error,))
        finally:
            # cleanly close the connection; this also closes its channels
            # (is_closed is a property, not a method)
            if not connection.is_closed:
                connection.close()
        if not reconnect_on_failure:
            break
        time.sleep(1)  # short pause before reconnecting

start(reconnect_on_failure)
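As a complement to reconnecting, you could also publish on a channel of its own, so that a broker-side channel close triggered by the publish does not touch the channel that is consuming. Below is a minimal sketch of that idea, not a definitive implementation. `SafePublisher`, `make_callback`, `process` and `main` are hypothetical names introduced here for illustration. The sketch also assumes that `different_queue` already exists and publishes through the default exchange (`exchange=''`), which routes by queue name; publishing to the fanout exchange `logs` would deliver the message to every queue bound to it, including `original_queue`.

```python
import time


class SafePublisher:
    """Hypothetical helper: publishes on its own channel, separate from the consumer's."""

    def __init__(self, open_channel):
        # open_channel: zero-argument callable returning a new channel,
        # e.g. the bound method connection.channel of a pika.BlockingConnection
        self._open_channel = open_channel
        self._channel = None

    def publish(self, exchange, routing_key, body):
        # Lazily (re)open the publishing channel if it is missing or was closed
        if self._channel is None or self._channel.is_closed:
            self._channel = self._open_channel()
        try:
            self._channel.basic_publish(
                exchange=exchange, routing_key=routing_key, body=body)
            return True
        except Exception as error:
            # Assumption: broad catch to keep the sketch free of a pika import;
            # in real code, narrow this to pika.exceptions.AMQPChannelError
            print(' [!] Publish failed: %r' % (error,))
            self._channel = None  # force a fresh channel on the next publish
            return False


def simulate_processing(body):
    time.sleep(10)  # stands in for the real message processing


def make_callback(publisher, process=simulate_processing):
    def callback(ch, method, properties, body):
        process(body)
        # ch (the consumer's channel) is deliberately not used for publishing
        publisher.publish(
            exchange='', routing_key='different_queue', body='hello_world')
    return callback


def main():
    import pika  # imported here so the helpers above work without pika installed

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    consume_channel = connection.channel()
    consume_channel.exchange_declare(exchange='logs', exchange_type='fanout')
    consume_channel.queue_declare(queue='original_queue', exclusive=True)
    consume_channel.queue_bind(exchange='logs', queue='original_queue')

    # Second channel on the same connection, opened on first publish
    publisher = SafePublisher(connection.channel)
    consume_channel.basic_consume(
        queue='original_queue',
        on_message_callback=make_callback(publisher),
        auto_ack=True)
    consume_channel.start_consuming()
```

Calling `main()` starts the consumer. Note that this only isolates channel-level failures: if the whole connection goes down, both channels are lost, so the reconnect loop above is still needed. Also, with `auto_ack=True` a message whose publish fails is not redelivered.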