I have asyncronus generator for messeging consuming.
import asyncio
queue = asyncio.Queue()
async def consume_gen(
self,
consume_from,
prefetch_count,
priority=None
):
async with self.channel_pool.acquire() as channel:
await channel.set_qos(prefetch_count=5)
self.amqp_queue = await channel.declare_queue(
'queue_name_for_consuming',
durable=True,
auto_delete=False
)
await self.amqp_queue.consume(
self.get_message, no_ack=False
)
await asyncio.sleep(0)
while True:
try:
message = self.queue.get_nowait()
yield message
except asyncio.queues.QueueEmpty:
await asyncio.sleep(1)
yield None
except GeneratorExit:
return
else:
return
This is callback function for my generator which response for getting message for queue and put it to intenal asyncio queue.
async def get_message(self, message):
await self.queue.put(message)
My problem is that some messages randomly disappear from the queue. They not ack, or stuck in any consumer, becouse I have logs for all steps. Also, i know, what this messages come to queue, from which the should be consumed. Will be gratful for any help in resolve my problem
Resolve it. Probles was more easy. My logger app has not enought log level)