python, python-asyncio, producer-consumer, python-aiofiles

Asyncio: Task was destroyed but it is pending


I am making requests to an external API, getting the responses back, and writing them to a file. Everything runs fine; however, I receive the "Task was destroyed but it is pending!" warning, which I'd like to clean up.

I have emulated the process below. I receive items from a source (e.g. a list), and as I receive them I put them into a queue, signaling the end of the list by putting in a sentinel None value.

The write_task consumer keeps pulling items from this queue until it receives the sentinel, at which point it breaks out of its loop.

Below is code that shows my write_task() being cancelled before it has completed. What is the proper design to handle this?

import asyncio
import aiofiles
import aiocsv
import json

async def task(l: list,  write_q: asyncio.Queue) -> None:

    # Read tasks from source of data
    for i in l:
        # Put a request task into the queue
        req: dict = {
            "headers": {"Accept": "application/json"},
            "url": "https://httpbin.org/post",
            "data": i
        }
        await write_q.put(req)

    # Sentinel value to signal we are done receiving from source
    await write_q.put(None)

async def write_task(write_q: asyncio.Queue) -> None:

    headers: bool = True
    while True:
        async with aiofiles.open("file.csv", mode="a+", newline='') as f:
            w = aiocsv.AsyncWriter(f)
            # Get data out of the queue to write it
            data = await write_q.get()
            if not data:
                write_q.task_done()
                await f.flush()
                break

            if headers:
                await w.writerow([
                    "status",
                    "data",
                ])
                headers = False

            # Write the data from the response
            await w.writerow([
                "200",
                json.dumps(data)
            ])
            write_q.task_done()

async def main() -> None:

    # Create fake data to POST
    items: list[list[str]] = [["hello", "world"], ["asyncio", "test"]] * 5

    # Queue for orchestrating the producer and consumer
    write_q = asyncio.Queue()
    producer = asyncio.create_task(
        task(items, write_q)
    )
    consumer = asyncio.create_task(
        write_task(write_q)
    )

    errors = await asyncio.gather(producer, return_exceptions=True)
    print(f"INFO: Producer has completed! exceptions: {errors}")

    # Wait for queue to empty and cancel the consumer
    await write_q.join()
    consumer.cancel()
    print("INFO: write consumer has completed! ")
    print("INFO: Complete!")

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())

Solution

  • You should run

    await consumer

    instead of

    consumer.cancel()

    to wait for the consumer task to finish. consumer.cancel() only requests cancellation: main() then returns and the event loop stops before write_task() ever gets to process the CancelledError, so the task is garbage-collected while it is still pending, which is exactly what the warning reports. Since write_task() already exits on its own when it sees the None sentinel, awaiting it is all that is needed. (A minimal reproduction of the warning and a cancel-based alternative are sketched after the full listing below.)


    import asyncio
    import aiofiles
    import aiocsv
    import json
    
    async def task(l: list,  write_q: asyncio.Queue) -> None:
    
        # Read tasks from source of data
        for i in l:
            # Put a request task into the queue
            req: dict = {
                "headers": {"Accept": "application/json"},
                "url": "https://httpbin.org/post",
                "data": i
            }
            await write_q.put(req)
    
        # Sentinel value to signal we are done receiving from source
        await write_q.put(None)
    
    async def write_task(write_q: asyncio.Queue) -> None:
    
        headers: bool = True
        while True:
            async with aiofiles.open("file.csv", mode="a+", newline='') as f:
                w = aiocsv.AsyncWriter(f)
                # Get data out of the queue to write it
                data = await write_q.get()
                if not data:
                    write_q.task_done()
                    await f.flush()
                    break
    
                if headers:
                    await w.writerow([
                        "status",
                        "data",
                    ])
                    headers = False
    
                # Write the data from the response
                await w.writerow([
                    "200",
                    json.dumps(data)
                ])
                write_q.task_done()
    
    async def main() -> None:
    
        # Create fake data to POST
        items: list[list[str]] = [["hello", "world"], ["asyncio", "test"]] * 5

        # Queue for orchestrating the producer and consumer
        write_q = asyncio.Queue()
        producer = asyncio.create_task(
            task(items, write_q)
        )
        consumer = asyncio.create_task(
            write_task(write_q)
        )
    
        errors = await asyncio.gather(producer, return_exceptions=True)
        print(f"INFO: Producer has completed! exceptions: {errors}")
    
        # Wait for the queue to empty, then let the consumer finish on its own
        await write_q.join()
        await consumer
        #consumer.cancel()
        print("INFO: write consumer has completed! ")
        print("INFO: Complete!")
    
    if __name__ == "__main__":
        loop = asyncio.new_event_loop()
        loop.run_until_complete(main())
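
    To see the mechanism in isolation, here is a minimal sketch that should reproduce the warning (the worker() coroutine is only illustrative; it uses the same run_until_complete entry point as the question):

    import asyncio

    async def worker() -> None:
        # Stand-in for a long-running consumer
        await asyncio.sleep(3600)

    async def main() -> None:
        t = asyncio.create_task(worker())
        await asyncio.sleep(0)   # let the worker start and suspend
        t.cancel()               # only *requests* cancellation
        # main() returns here, so the loop stops before worker()
        # ever receives the CancelledError

    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    # When the still-pending worker task is garbage-collected, asyncio
    # logs "Task was destroyed but it is pending!"

    If you would rather keep the cancel-based shutdown and drop the None sentinel entirely, the same idea still applies: the consumer has to be awaited so it is no longer pending when the loop stops. A sketch of that variant (not part of the original answer; the shutdown() helper name is only illustrative) cancels the consumer after write_q.join() and then awaits it, suppressing the resulting CancelledError:

    import asyncio
    import contextlib

    async def shutdown(write_q: asyncio.Queue, consumer: asyncio.Task) -> None:
        # Wait until every queued item has been marked task_done()
        await write_q.join()
        # Ask the consumer to stop, then wait until it has actually finished
        consumer.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await consumer

    Either way, the fix is the same: the consumer task must be awaited (after the sentinel, or after cancellation) before main() returns, so the event loop never shuts down with the task still pending.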