I have a task queue in RabbitMQ with multiple producers (12) and a single consumer for heavy tasks in a webapp. When I run the consumer, it dequeues a few messages and then crashes with this error:
    Traceback (most recent call last):
      File "jobs.py", line 42, in <module>
        jobs[job](config)
      File "/home/ec2-user/project/queue.py", line 100, in init_queue
        channel.start_consuming()
      File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming
        self.connection.process_data_events(time_limit=None)
      File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events
        self._flush_output(common_terminator)
      File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
        result.reason_text)
    pika.exceptions.ConnectionClosed: (-1, "error(104, 'Connection reset by peer')")
The producers' code is:
    import json

    import pika

    message = {'image_url': image_url, 'image_name': image_name, 'notes': notes}

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='tasks_queue')
    channel.basic_publish(exchange='', routing_key='tasks_queue', body=json.dumps(message))
    connection.close()
And the only consumer's code (the one that is crashing):
    import json
    import sys

    import pika

    def callback(self, ch, method, properties, body):
        """Callback invoked when a message is received."""
        message = json.loads(body)
        try:
            image = _get_image(message['image_url'])
        except Exception:
            sys.stderr.write('Error getting image %s\n' % message['image_url'])
            return
        # Crop image with PIL. Not so expensive.
        # (`box` comes from the surrounding class context, not shown here.)
        box_path = _crop(image, message['image_name'], box)
        # API call. Long-running function.
        result = long_api_call(box_path)
        if result is None:
            sys.stderr.write('Error processing image %s\n' % message['image_name'])
            return
        # Update the db.
        db.update_record(result)

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='tasks_queue')
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback_obj.callback, queue='tasks_queue', no_ack=True)
    channel.start_consuming()
As you can see, there are three expensive operations per message: a crop task, an API call, and a database update. Without the API call, the consumer runs smoothly.
Thanks in advance
Your RabbitMQ log shows a message that I thought we might see:
    missed heartbeats from client, timeout: 60s
What's happening is that your long_api_call blocks Pika's I/O loop. Pika is a very lightweight library and does not start threads in the background for you, so you must code in such a way as to not block Pika's I/O loop longer than the heartbeat interval. RabbitMQ thinks your client has died or is unresponsive and forcibly closes the connection.
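Here is a minimal sketch of that pattern, assuming pika 0.12 or later (where BlockingConnection.add_callback_threadsafe() exists) and keeping the pre-1.0 basic_consume argument order from your snippet; do_work is a hypothetical stand-in for your crop / API / db steps:

    import functools
    import threading

    import pika

    def ack_message(channel, delivery_tag):
        # Runs on the connection's thread, scheduled via add_callback_threadsafe().
        if channel.is_open:
            channel.basic_ack(delivery_tag)

    def do_work(connection, channel, delivery_tag, body):
        # The long-running work happens here, off the I/O loop thread,
        # so heartbeats keep flowing while it runs.
        result = long_api_call(body)  # stand-in for your real processing
        # BlockingConnection is not thread-safe: hand the ack back to
        # the connection's thread instead of calling basic_ack here.
        cb = functools.partial(ack_message, channel, delivery_tag)
        connection.add_callback_threadsafe(cb)

    def on_message(channel, method, properties, body, args):
        connection, threads = args
        t = threading.Thread(target=do_work,
                             args=(connection, channel, method.delivery_tag, body))
        t.start()
        threads.append(t)

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='tasks_queue')
    channel.basic_qos(prefetch_count=1)

    threads = []
    channel.basic_consume(functools.partial(on_message, args=(connection, threads)),
                          queue='tasks_queue')

    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    for t in threads:
        t.join()
    connection.close()

Note that this consumer uses explicit acks, which is what makes prefetch_count=1 meaningful: the broker will not deliver the next message until the previous one has been acked.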
Please see my answer here, which links to this example code showing how to properly execute a long-running task in a separate thread. You can still use no_ack=True; you will just skip the ack_message call.
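In that case the worker thread never needs to touch the connection at all, since there is nothing to ack afterwards. A sketch under the same assumptions, with do_work again standing in for your real processing:

    import json
    import threading

    def do_work(body):
        # With no_ack=True there is nothing to ack, so this thread never
        # has to touch the (non-thread-safe) connection or channel.
        message = json.loads(body)
        long_api_call(message['image_url'])

    def on_message(channel, method, properties, body):
        # Hand the message off immediately so the I/O loop stays responsive.
        threading.Thread(target=do_work, args=(body,)).start()

    channel.basic_consume(on_message, queue='tasks_queue', no_ack=True)
    channel.start_consuming()

The trade-off is that with no_ack=True the broker considers a message delivered the moment it is sent, so a crash mid-task loses that message.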