rabbitmq pika aio

aio_pika: messages are randomly lost from the queue


I have an asynchronous generator for consuming messages.

import asyncio

# Internal buffer; the methods below access it as self.queue.
queue = asyncio.Queue()


async def consume_gen(
        self,
        consume_from,
        prefetch_count,
        priority=None
):
    async with self.channel_pool.acquire() as channel:
        # Note: the value is hard-coded; the prefetch_count argument is unused.
        await channel.set_qos(prefetch_count=5)
        self.amqp_queue = await channel.declare_queue(
            'queue_name_for_consuming',
            durable=True,
            auto_delete=False
        )
        # Register the callback; with no_ack=False messages need an explicit ack.
        await self.amqp_queue.consume(
            self.get_message, no_ack=False
        )
        await asyncio.sleep(0)

        while True:
            try:
                message = self.queue.get_nowait()
                yield message
            except asyncio.QueueEmpty:
                # Nothing buffered yet: wait, then signal "no message" to the caller.
                await asyncio.sleep(1)
                yield None
            except GeneratorExit:
                return

This is the callback function for my generator. It is responsible for receiving each message from the AMQP queue and putting it into the internal asyncio queue.

async def get_message(self, message):
    await self.queue.put(message)
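
The handoff between the callback and the generator can be checked without a broker. The following is only a minimal sketch, not the actual consumer from the question: the `Consumer` class, `fake_broker`, `demo`, `run_demo`, and the `poll_interval` parameter are hypothetical names, and the broker is simulated with a plain asyncio task. It suggests that polling an `asyncio.Queue` in this way does not by itself drop messages.

```python
import asyncio


class Consumer:
    """Hypothetical stand-in for the class that owns consume_gen."""

    def __init__(self):
        # Internal buffer between the broker callback and the generator.
        self.queue = asyncio.Queue()

    async def get_message(self, message):
        # Same role as the callback in the question: buffer the message.
        await self.queue.put(message)

    async def consume_gen(self, poll_interval=0.01):
        # Yield buffered messages; yield None after an idle poll.
        while True:
            try:
                yield self.queue.get_nowait()
            except asyncio.QueueEmpty:
                await asyncio.sleep(poll_interval)
                yield None


async def demo(n=5):
    consumer = Consumer()

    async def fake_broker():
        # Simulates a broker delivering n messages to the callback.
        for i in range(n):
            await consumer.get_message(f"msg-{i}")
            await asyncio.sleep(0)

    producer = asyncio.create_task(fake_broker())
    received = []
    gen = consumer.consume_gen()
    async for message in gen:
        if message is None:
            # Stop only when the producer is finished and the buffer is drained.
            if producer.done() and consumer.queue.empty():
                break
            continue
        received.append(message)
    await gen.aclose()
    await producer
    return received


def run_demo(n=5):
    return asyncio.run(demo(n))
```

Calling `run_demo(5)` returns the five simulated messages in the order they were delivered.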

My problem is that some messages randomly disappear from the queue. They are not acked and are not stuck in any consumer, because I have logs for every step. I also know that these messages do arrive in the queue they should be consumed from. I would be grateful for any help in resolving my problem.


Solution

  • Resolved. The problem was simpler than it seemed: my logger app's log level was not verbose enough.
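
To illustrate how a log level that is too restrictive can make processed messages look lost, here is a small sketch using Python's standard `logging` module. The logger name, the `ListHandler` class, the `logged_lines` function, and the log texts are all made up for this example; the actual logger app from the answer is not shown in the post.

```python
import logging


class ListHandler(logging.Handler):
    """Collects log lines in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(f"{record.levelname}:{record.getMessage()}")


def logged_lines(level):
    # "consumer-demo-..." is a hypothetical logger name used only here.
    logger = logging.getLogger(f"consumer-demo-{level}")
    logger.propagate = False
    logger.handlers.clear()
    handler = ListHandler()
    logger.addHandler(handler)
    logger.setLevel(level)

    # Hypothetical log calls a consumer might make for one message.
    logger.debug("received message 1")
    logger.info("acked message 1")
    logger.warning("consumer reconnecting")
    return handler.lines
```

With `logged_lines(logging.WARNING)` only the warning is recorded, so the receive and ack steps never appear in the log even though they ran. With `logged_lines(logging.DEBUG)` all three lines are recorded.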