python, async-await, queue, python-asyncio

Communicating between Queues in Asyncio not Working as Expected


I'm working on a pattern where multiple queues pass items along a pipeline, using sentinel values to signal when each stage should stop. However, the following code produces results that confuse me.

When reading from write_q in write_task(), the first value I see is the sentinel None rather than the results in the order they were put there by request_task(). If I understand correctly, write_task() should receive and process the items in the order the tasks were created.

Also, when I print qsize() in write_task() after finding the sentinel, it reports 0 items, yet when I print it back in main() it seems that write_q still has 2 items. I've read somewhere that aiofiles uses run_in_executor(), which means there might be a divergence in where the queue is handled.

Most of the code below is boilerplate to illustrate the actual scenario in which my code blocks indefinitely.

import asyncio
import aiofiles
import aiocsv
import json

async def fetch(t: float) -> float:
    print(f"INFO: Sleeping for {t}s")
    await asyncio.sleep(t)
    return t

async def task(l: list, request_q: asyncio.Queue) -> None:

    # Read tasks from source of data
    for i in l:
        await request_q.put(
            asyncio.create_task(fetch(i))
        )

    # Sentinel value to signal we are done receiving from source
    await request_q.put(None)

async def request_task(request_q: asyncio.Queue, write_q: asyncio.Queue) -> None:
    while True:
        req = await request_q.get()

        # If we received sentinel for tasks, pass message to next queue
        if not req:
            print("INFO: received sentinel from request_q")
            request_q.task_done()
            await request_q.put(None) # put back into the queue to signal to other consumers we are done
            break
        
        # Make the request
        resp = await req
        await write_q.put(resp)
        request_q.task_done()

async def write_task(write_q: asyncio.Queue) -> None:

    headers: bool = True
    async with aiofiles.open("file.csv", mode="w+", newline='') as f:
        w = aiocsv.AsyncWriter(f)
        while True:
            # Get data out of the queue to write it
            data = await write_q.get()
            print(data)

            # if not data:
            #     print(f"INFO: Found sentinel in write_task, queue size was: {write_q.qsize()}")
            #     write_q.task_done()
            #     await f.flush()
            #     break

            if headers:
                await w.writerow([
                    "status",
                    "data",
                ])
                headers = False

            # Write the data from the response
            await w.writerow([
                "200",
                json.dumps(data)
            ])
            await f.flush()
            write_q.task_done()

async def main() -> None:

    # Create fake data to POST
    items: list[float] = [.2, .5, 1]

    # Queues for orchestrating the pipeline
    request_q = asyncio.Queue()
    write_q = asyncio.Queue()

    # one producer
    producer = asyncio.create_task(
        task(items, request_q)
    )

    # 2 request consumers
    request_consumers = [
        asyncio.create_task(
            request_task(request_q, write_q)
        )
        for _ in range(2)
    ]

    # one write consumer
    write_consumer = asyncio.create_task(
        write_task(write_q)
    )

    errors = await asyncio.gather(producer, return_exceptions=True)
    print(f"INFO: Producer has completed! exceptions: {errors}")

    await request_q.join()
    for c in request_consumers:
        c.cancel()
    print("INFO: request consumer has completed! ")
    print(f"INFO: write_q in main qsize: {write_q.qsize()}")
    
    await write_q.join()
    print("INFO: write queue has completed! ")
    # await write_consumer
    write_consumer.cancel()
    print("INFO: Complete!")

if __name__ == "__main__":
    # loop = asyncio.new_event_loop()
    # loop.run_until_complete(main())
    asyncio.run(main())

Solution

  • My first comment is that your code is far more complicated than it needs to be. But the real problem is that you have N fetches to do (where N is the length of your items list) and M tasks (where M is the number of request consumer tasks, 2 in your code) performing these N fetches concurrently, and these tasks cannot be assumed to complete in an order corresponding to your items list.

    I believe the simplest solution is to pre-allocate a results list of length N and pass each fetch request this list together with the index at which its result should be stored. If you want the file rows to correspond to the items input list, you cannot start writing your CSV file until all fetches have completed. You only need one queue!
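
    As an aside: if you are able to create all the fetch coroutines up front, plain asyncio.gather already guarantees this ordering, because it returns results in the order of its arguments regardless of completion order. A minimal sketch (fetch_one is a hypothetical stand-in for your real request coroutine):

    import asyncio
    import random

    async def fetch_one(data):
        # Sleep a random amount to force out-of-order completion
        await asyncio.sleep(random.random())
        return data

    async def main():
        data_list = [[f"hello{i}", f"world{i}"] for i in range(10)]
        # gather returns results in argument order, so no index bookkeeping is needed
        results = await asyncio.gather(*(fetch_one(d) for d in data_list))
        print(results)

    asyncio.run(main())

    The pre-allocated list used below achieves the same ordering while still letting a fixed pool of consumer tasks cap how many fetches run concurrently.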

    For demo purposes I have renamed items to the more descriptive data_list and have initialized it as:

    data_list: list[list[str]] = [[f"hello{i}", f"world{i}"] for i in range(10)]
    

    I have also modified fetch to simulate fetching data by just returning req["data"]. Consequently, when the program completes, the contents of file.csv should be:

    status,data
    200,"[""hello0"", ""world0""]"
    200,"[""hello1"", ""world1""]"
    ...
    200,"[""hello9"", ""world9""]"

    I have also renamed some of your functions and variables to more descriptive ones:

    import asyncio
    import aiohttp
    import aiofiles
    import aiocsv
    import json
    
    N_REQUEST_TASKS = 5
    
    async def fetch(req: dict, results: list, idx: int) -> None:
        # Make the request
    
        # For demo purposes:
        import random
    
        await asyncio.sleep(random.random())
        result = req['data']
        print(f"INFO returning {result} at index {idx}")
        results[idx] = result
        return
    
        # NOTE: unreachable in this demo because of the early return above;
        # kept to show what the real request logic would look like:
        async with aiohttp.ClientSession() as session:
            try:
                async with session.request("POST", url=req["url"], data=json.dumps(req["data"]), headers=req["headers"]) as response:
                    result = await response.json()
                    response.raise_for_status()
                    print(f"INFO: response status was: {response.status}")
    
                # The result is recorded in the finally block below
            except Exception as err:
                print(f"ERROR: error making request: {err}")
                result = err
            finally:
                print(f"INFO returning {result} at index {idx}")
                results[idx] = result
    
    
    async def create_requests(data_list: list, results: list, request_q: asyncio.Queue) -> None:
        # Read tasks from source of data
        for idx, data in enumerate(data_list):
            # Put a request task into the queue
            req: dict = {
                "headers": {"Accept": "application/json"},
                "url": "https://httpbin.org/post",
                "data": data
            }
            await request_q.put(
                (req, results, idx)
            )
    
        for _ in range(N_REQUEST_TASKS):
            # One sentinel for each request task:
            await request_q.put(None)
    
    
    async def request_task(request_q: asyncio.Queue) -> None:
        while True:
            # Retrieve necessary data to make request
            request = await request_q.get()
    
            # Sentinel?
            if not request:
                print("INFO: received sentinel from request_q")
                break
    
            # Make the request which will put data into the response queue
            # Unpack:
            req, results, idx = request
            print(f"INFO: request in request_task: {req['data']}")
            await fetch(req, results, idx)
    
    
    async def writer(results: list) -> None:
        async with aiofiles.open("file.csv", mode="w", newline='') as f:
            w = aiocsv.AsyncWriter(f)
    
            await w.writerow([
                "status",
                "data",
            ])
    
            for result in results:
                print(f"INFO: data in write_task: {result}")
                if isinstance(result, Exception):
                    continue
    
                # Write the data from the response
                await w.writerow([
                    "200",
                    json.dumps(result)
                ])
                await f.flush()
    
    async def main() -> None:
    
        # Create fake data to POST
        data_list: list[list[str]] = [[f"hello{i}", f"world{i}"] for i in range(10)]
    
        # Preallocate results list so that results will be in correct order
        results = [None] * len(data_list)
    
        # Request queue
        request_q = asyncio.Queue()
    
        tasks = []
    
        # one producer
        tasks.append(
            asyncio.create_task(
                create_requests(data_list, results, request_q)
            )
        )
    
        # N_REQUEST_TASKS consumers
        for _ in range(N_REQUEST_TASKS):
            tasks.append(
                asyncio.create_task(
                    request_task(request_q)
                )
            )
    
        await asyncio.gather(*tasks)
        print(f"INFO: Results have been produced")
    
        await writer(results)
        print("INFO: writer has completed! ")
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    Prints:

    INFO: request in request_task: ['hello0', 'world0']
    INFO: request in request_task: ['hello1', 'world1']
    INFO: request in request_task: ['hello2', 'world2']
    INFO: request in request_task: ['hello3', 'world3']
    INFO: request in request_task: ['hello4', 'world4']
    INFO returning ['hello1', 'world1'] at index 1
    INFO: request in request_task: ['hello5', 'world5']
    INFO returning ['hello0', 'world0'] at index 0
    INFO: request in request_task: ['hello6', 'world6']
    INFO returning ['hello6', 'world6'] at index 6
    INFO: request in request_task: ['hello7', 'world7']
    INFO returning ['hello2', 'world2'] at index 2
    INFO: request in request_task: ['hello8', 'world8']
    INFO returning ['hello3', 'world3'] at index 3
    INFO: request in request_task: ['hello9', 'world9']
    INFO returning ['hello4', 'world4'] at index 4
    INFO: received sentinel from request_q
    INFO returning ['hello9', 'world9'] at index 9
    INFO: received sentinel from request_q
    INFO returning ['hello5', 'world5'] at index 5
    INFO: received sentinel from request_q
    INFO returning ['hello7', 'world7'] at index 7
    INFO: received sentinel from request_q
    INFO returning ['hello8', 'world8'] at index 8
    INFO: received sentinel from request_q
    INFO: Results have been produced
    INFO: data in write_task: ['hello0', 'world0']
    INFO: data in write_task: ['hello1', 'world1']
    INFO: data in write_task: ['hello2', 'world2']
    INFO: data in write_task: ['hello3', 'world3']
    INFO: data in write_task: ['hello4', 'world4']
    INFO: data in write_task: ['hello5', 'world5']
    INFO: data in write_task: ['hello6', 'world6']
    INFO: data in write_task: ['hello7', 'world7']
    INFO: data in write_task: ['hello8', 'world8']
    INFO: data in write_task: ['hello9', 'world9']
    INFO: writer has completed!
    

    Update

    If you do not care about the completion order of fetch tasks and you want to append rows as soon as data has been fetched, then using two queues is the simplest approach as follows:

    import asyncio
    import aiohttp
    import aiofiles
    import aiocsv
    import json
    
    N_REQUEST_TASKS = 5
    
    async def fetch(req: dict) -> dict:
        # Make the request
    
        # For demo purposes:
        import random
    
        await asyncio.sleep(random.random())
        result = req['data']
        print(f"INFO returning {result}")
        return result
    
        # NOTE: unreachable in this demo because of the early return above;
        # kept to show what the real request logic would look like:
        async with aiohttp.ClientSession() as session:
            try:
                async with session.request("POST", url=req["url"], data=json.dumps(req["data"]), headers=req["headers"]) as response:
                    result = await response.json()
                    response.raise_for_status()
                    print(f"INFO: response status was: {response.status}")
    
                # The result is returned from the finally block below; the caller queues it for writing
            except Exception as err:
                print(f"ERROR: error making request: {err}")
                result = err
            finally:
                print(f"INFO returning {result}")
                return result
    
    
    async def create_requests(data_list: list, request_q: asyncio.Queue) -> None:
        # Read tasks from source of data
        for data in data_list:
            # Put a request task into the queue
            req: dict = {
                "headers": {"Accept": "application/json"},
                "url": "https://httpbin.org/post",
                "data": data
            }
            await request_q.put(
                req
            )
    
        for _ in range(N_REQUEST_TASKS):
            # One sentinel for each request task:
            await request_q.put(None)
    
    
    async def request_task(request_q: asyncio.Queue, writer_q: asyncio.Queue) -> None:
        while True:
            # Retrieve necessary data to make request
            req = await request_q.get()
    
            # Sentinel?
            if not req:
                print("INFO: received sentinel from request_q")
                break
    
            # Make the request which will put data into the response queue
            print(f"INFO: request in request_task: {req['data']}")
            result = await fetch(req)
            await writer_q.put(result)
    
    
    async def writer(writer_q: asyncio.Queue) -> None:
        async with aiofiles.open("file.csv", mode="w", newline='') as f:
            w = aiocsv.AsyncWriter(f)
    
            await w.writerow([
                "status",
                "data",
            ])
    
            while True:
                result = await writer_q.get()
                if result is None:
                    break
    
                print(f"INFO: data in write_task: {result}")
                if isinstance(result, Exception):
                    continue
    
                # Write the data from the response
                await w.writerow([
                    "200",
                    json.dumps(result)
                ])
                await f.flush()
    
    async def main() -> None:
    
        # Create fake data to POST
        data_list: list[list[str]] = [[f"hello{i}", f"world{i}"] for i in range(10)]
    
        # Request queue
        request_q = asyncio.Queue()
        # Writer queue
        writer_q = asyncio.Queue()
    
        tasks = []
    
        # one producer
        tasks.append(
            asyncio.create_task(
                create_requests(data_list, request_q)
            )
        )
    
        # N_REQUEST_TASKS consumers
        for _ in range(N_REQUEST_TASKS):
            tasks.append(
                asyncio.create_task(
                    request_task(request_q, writer_q)
                )
            )
    
        writer_task = asyncio.create_task(
            writer(writer_q)
        )
    
        await asyncio.gather(*tasks)
        print(f"INFO: Results have been produced")
    
        # Put sentinel to get the writer to quit:
        await writer_q.put(None)
        await writer_task
        print("INFO: writer has completed! ")
    
    if __name__ == "__main__":
        asyncio.run(main())
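
    One last detail worth flagging: request_task tests for the sentinel with if not req:. That works here because every real payload is a truthy dict, but it would silently treat a falsy-but-valid item (an empty list, 0, an empty string) as the sentinel. If such values can ever flow through a queue, compare against the sentinel by identity instead; a small runnable sketch of the pitfall:

    import asyncio

    async def demo():
        q: asyncio.Queue = asyncio.Queue()
        await q.put([])      # falsy but valid payload
        await q.put(None)    # the actual sentinel
        while True:
            item = await q.get()
            # "if not item:" would wrongly stop on the empty list;
            # an identity test matches only the real sentinel
            if item is None:
                break
            print("processing:", item)

    asyncio.run(demo())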