I have the following producer pattern where I obtain data from a source, place it in queue, then consume it and write it to a CSV file. However, the program seems to end early because my tasks are all marked as done before they are actually wrote.
My hunch is because it is due to the fact I am yielding data from the queue so the queue sees that it is finished before the items that are being yielded get processed.
How can I ensure that my queue items are all consumed before ending the program? For example in the following program if ran, the final tag in my list is not finished writing before program exits.
import asyncio
import random
import aiocsv
import aiofiles
from pathlib import Path
# puts random data in a queue with a tag
async def produce(tag: str, q: asyncio.Queue) -> None:
data: float = random.random()
await q.put({
"tag": tag,
"data": data
})
async def read_items(q: asyncio.Queue) -> dict:
# generate from queue
while True:
item = await q.get()
print(f"retrieved item: {item}")
q.task_done()
yield item
# consumes from queue and writes to CSV
async def consume(q: asyncio.Queue, base_dir: Path) -> str:
async for item in read_items(q):
file: str = await write_csv(item, base_dir)
print(f"wrote to file: {file}")
async def write_csv(item: dict, base_dir: Path) -> str:
# file path to write data to for
file_path: str = base_dir.joinpath(str(item["tag"])+".csv")
print(f"writing to {file_path}")
async with aiofiles.open(file_path, mode="a+", newline='') as f:
w: aiocsv.AsyncWriter = aiocsv.AsyncWriter(f)
# write the core data
try:
await w.writerow([item["data"]])
except Exception as err:
print(err)
return file_path
# runs our end to end pattern
async def run() -> None:
q: asyncio.Queue = asyncio.Queue()
# finite list to simulate tags for production
tags: list[str] = [
"foo",
"bar",
"baz",
"foobington",
"barrington",
"bazzington"
]
# make directory we will write to if it does not yet exist
base_path: Path = Path("../data")
try:
base_path.mkdir(parents=True, exist_ok=True)
except Exception as err:
print(err)
# make producers
producers: list[asyncio.Task] = [
asyncio.create_task(produce(t, q))
for t in tags
]
# make a consumer to read data from queue and write to CSV
consumer = asyncio.create_task(consume(q, base_path))
# start producing
await asyncio.gather(*producers)
# wait for all tasks to be consumed from q
await q.join()
# cancel our consumer
consumer.cancel()
def main() -> None:
# boilerplate
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run())
if __name__ == "__main__":
main()
A useful pattern is to use a sentinel value to explicitly close the application. For example, a tag value of '' (empty string) indicates that there are no more items and the consumer can be closed.
Modify the tags list as follows:
tags: list[str] = [
"foo",
"bar",
"baz",
"foobington",
"barrington",
"bazzington",
''
]
Modify the consumer as follows:
async def read_items(q: asyncio.Queue) -> dict:
# generate from queue
while True:
item = await q.get()
print(f"retrieved item: {item}")
q.task_done()
if not item['tag']:
break # Quit when the sentinel is encountered
yield item
Then replace this line:
# cancel our consumer
consumer.cancel()
with:
await consumer
The program will not exit until the sentinel is processed and the consumer has shut down gracefully.