pythonqueuepython-asyncioproducer-consumer

Program using Asyncio with queue ends before consumer finishes


I have the following producer pattern where I obtain data from a source, place it in queue, then consume it and write it to a CSV file. However, the program seems to end early because my tasks are all marked as done before they are actually wrote.

My hunch is because it is due to the fact I am yielding data from the queue so the queue sees that it is finished before the items that are being yielded get processed.

How can I ensure that my queue items are all consumed before ending the program? For example in the following program if ran, the final tag in my list is not finished writing before program exits.

import asyncio
import random
import aiocsv
import aiofiles
from pathlib import Path

# puts random data in a queue with a tag
async def produce(tag: str, q: asyncio.Queue) -> None:
    data: float = random.random()
    await q.put({
        "tag": tag,
        "data": data
    })

async def read_items(q: asyncio.Queue) -> dict:
    # generate from queue
    while True:
        item = await q.get()
        print(f"retrieved item: {item}")
        q.task_done()
        yield item
        

# consumes from queue and writes to CSV
async def consume(q: asyncio.Queue, base_dir: Path) -> str:
    async for item in read_items(q):
        file: str = await write_csv(item, base_dir)
        print(f"wrote to file: {file}")

async def write_csv(item: dict, base_dir: Path) -> str:

    # file path to write data to for 
    file_path: str = base_dir.joinpath(str(item["tag"])+".csv")
    print(f"writing to {file_path}")
    async with aiofiles.open(file_path, mode="a+", newline='') as f:
        w: aiocsv.AsyncWriter = aiocsv.AsyncWriter(f)

        # write the core data
        try:
            await w.writerow([item["data"]])
        except Exception as err:
            print(err)
    return file_path

# runs our end to end pattern
async def run() -> None:

    q: asyncio.Queue = asyncio.Queue()

    # finite list to simulate tags for production
    tags: list[str] = [
        "foo",
        "bar",
        "baz",
        "foobington",
        "barrington",
        "bazzington"
    ]

    # make directory we will write to if it does not yet exist
    base_path: Path = Path("../data")
    try:
        base_path.mkdir(parents=True, exist_ok=True)
    except Exception as err:
        print(err)

    # make producers
    producers: list[asyncio.Task] = [
        asyncio.create_task(produce(t, q))
        for t in tags
    ]

    # make a consumer to read data from queue and write to CSV
    consumer = asyncio.create_task(consume(q, base_path))

    # start producing
    await asyncio.gather(*producers)

    # wait for all tasks to be consumed from q
    await q.join()

    # cancel our consumer
    consumer.cancel()

def main() -> None:

    # boilerplate
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(run())

if __name__ == "__main__":
    main()

Solution

  • A useful pattern is to use a sentinel value to explicitly close the application. For example, a tag value of '' (empty string) indicates that there are no more items and the consumer can be closed.

    Modify the tags list as follows:

    tags: list[str] = [
            "foo",
            "bar",
            "baz",
            "foobington",
            "barrington",
            "bazzington",
            ''
        ]
    

    Modify the consumer as follows:

    async def read_items(q: asyncio.Queue) -> dict:
        # generate from queue
        while True:
            item = await q.get()
            print(f"retrieved item: {item}")
            q.task_done()
            if not item['tag']:
                break   # Quit when the sentinel is encountered
            yield item
    

    Then replace this line:

    # cancel our consumer
    consumer.cancel()
    

    with:

    await consumer
    

    The program will not exit until the sentinel is processed and the consumer has shut down gracefully.