I have set up a RabbitMQ consumer as follows:
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
import pika
import datetime
import logging
import json
from logging import StreamHandler
from time import sleep
from random import randint
from pika import SelectConnection
logging.basicConfig(handlers=[StreamHandler()],
level=logging.INFO,
format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)
class QueueConsumer(object):
"""The consumer class to manage connections to the AMQP server/queue"""
def __init__(self, queue, logger, parameters, thread_id=0):
self.channel = None
self.connection = None
self.queue_name = queue
self.logger = logger
self.consumer_id = 'Thread: %d' % (thread_id,)
self.parameters = pika.ConnectionParameters(**parameters)
def _on_queue_declared(self, frame):
self.logger.debug('{} ... declaring queue'.format(self.consumer_id))
self.channel.basic_qos(prefetch_count=1)
try:
self.channel.basic_consume(self.handle_delivery, queue=self.queue_name, no_ack=True)
self.logger.info("{} Declared queue...".format(self.consumer_id))
except Exception as e:
self.logger.error('{} crashing:--> {}'.format(self.consumer_id, str(e)))
def _on_channel_open(self, channel):
self.channel = channel
try:
self.channel.queue_declare(queue=self.queue_name,
exclusive=False,
durable=True,
auto_delete=False,
callback=self._on_queue_declared)
self.logger.info("{} Opened Channel....".format(self.consumer_id))
except Exception as e:
self.logger.error('{} {}'.format(self.consumer_id, str(e)))
def _on_connected(self, connection):
connection.channel(self._on_channel_open)
def consume(self):
try:
self.connection = SelectConnection(self.parameters,
self._on_connected)
self.connection.ioloop.start()
except Exception as e:
self.logger.error('{} {}'.format(self.consumer_id, str(e)))
self.connection.close()
self.connection.ioloop.start()
def decode(self, body):
try:
_body = body.decode('utf-8')
except AttributeError:
_body = body
return _body
def handle_delivery(self, channel, method, header, body):
try:
start_time = datetime.datetime.now()
_logger.info("Received...")
_logger.info("Content: %s" % body)
req = json.loads(self.decode(body))
# Do something
sleep(randint(10, 100))
time_taken = datetime.datetime.now() - start_time
_logger.info("[{}] Time Taken: {}.{}".format(
req.get("to_num"), time_taken.seconds, time_taken.microseconds))
except Exception as err:
_logger.exception(err)
if __name__ == "__main__":
workers = 3
pika_parameters = OrderedDict([('host', '127.0.0.1'), ('port', 5672), ('virtual_host', '/')])
try:
pool = ThreadPoolExecutor(max_workers=workers)
start = 1
for thread_id in range(start, (workers + start)):
pool.submit(QueueConsumer('test_queue', _logger, pika_parameters, thread_id).consume)
except Exception as err:
_logger.exception(err)
I, also, have a queue publisher as below:
import uuid
import pika
import logging
import json
from logging import StreamHandler
from pika import SelectConnection
logging.basicConfig(handlers=[StreamHandler()],
level=logging.DEBUG,
format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)
class QueuePublisherClient(object):
def __init__(self, queue, request):
self.queue = queue
self.response = None
self.channel = None
self.request = request
self.corrId = str(uuid.uuid4())
self.callBackQueue = None
self.connection = None
parameters = pika.ConnectionParameters(host="0.0.0.0")
self.connection = SelectConnection(
parameters, self.on_response_connected
)
self.connection.ioloop.start()
def on_response(self, ch, method, props, body):
if self.corrId == props.correlation_id:
self.response = body
self.connection.close()
self.connection.ioloop.start()
def on_response_connected(self, connection):
_logger.info("Connected...\t(%s)" % self.queue)
self.connection = connection
self.connection.channel(self.on_channel_open)
def on_connected(self, connection):
self.connection = connection
self.connection.channel(self.on_channel_open)
def on_channel_open(self, channel):
# _logger.info("Channel Opened...\t(%s)" % self.queue)
self.channel = channel
self.channel.queue_declare(queue=self.queue,
durable=True,
exclusive=False,
auto_delete=False,
callback=self.on_queue_declared)
def on_queue_declared(self, frame):
self.channel.basic_publish(exchange="",
routing_key=self.queue,
properties=pika.BasicProperties(),
body=str(self.request))
self.connection.close()
_logger.info("Message Published...\t(%s)" % self.queue)
if __name__ == "__main__":
data = {
'text': 'This is a sample text',
'to_num': '+2547xxxxxxxx'
}
count = 10000
for index in range(count):
data['index'] = index
QueuePublisherClient("test_queue", json.dumps(data))
When I publish 10000 messages to the queue and the consumer is not started, via rabbitmqctl list_queues
I am able to see that test_queue has 10000 messages. When I start the consumer, I run rabbitmqctl list_queues
and I see that the queue has 0 messages. However, the consumer is still consuming the messages from the queue. The problem is, when I stop the consumer after a few seconds then restart it, I am unable to recover my messages. How can I evade this?
This is just a simulation of an actual situation whereby the consumer process is restarted by monit and I suffer the loss of messages.
You should be using the latest version of Pika, to start with.
When you set no_ack=True
(auto_ack=True
in Pika 1.0) RabbitMQ considers the message acknowledged when it is delivered. This means that every message that your consumer has in memory (or in the TCP stack) when you stop it will be lost because RabbitMQ considers it acknowledged.
You should use no_ack=False
(the default) and acknowledge messages in handle_delivery
once your work is done. Please note that if your work takes a long time you should do it in another thread to prevent blocking Pika's I/O loop.
See the following documentation: https://www.rabbitmq.com/confirms.html