I have a large number of asyncio tasks that consume data from a queue and write to separate files. However, some of the files will be written to multiple times via mode a+. I have written some code to simulate some random processing, in a similar way to my real-world example.
I am using asyncio.Lock() in the following fashion to protect each file from whatever task takes ownership of writing to it, but I am still getting CSV results that are misaligned and/or corrupted. Also, the header seems to be getting written multiple times, even though the size of the file shouldn't be 0 after the header is first written.
What am I missing?
import asyncio
import aiofiles
import aiofiles.os
import aiocsv
import uuid
import random
import json
from pathlib import Path
from datetime import datetime, timezone


async def write_csv(item: list, load_id: str, prefix: str) -> None:
    Path("./test_files").mkdir(parents=True, exist_ok=True)
    file_path = Path("./test_files").joinpath(f"{prefix}_{load_id}.csv")
    # Asynchronously write to our file
    async with aiofiles.open(file_path, mode="a+", newline="") as f:
        print(f"INFO: writing file: {Path(file_path).resolve()}")
        w: aiocsv.AsyncWriter = aiocsv.AsyncWriter(f)
        print(f"file size: {await aiofiles.os.path.getsize(file_path)}")
        # If the file is empty, write the header
        if await aiofiles.os.path.getsize(file_path) == 0:
            print("file was empty! writing header")
            # Write the header
            async with asyncio.Lock():
                await w.writerow([
                    "response",
                    "load_id",
                    "last_updated_timestamp_utc"
                ])
        # do something special for specific file name
        # I am just trying to simulate more random data processing
        if prefix == "file_one":
            # Shuffle the chunks again
            item = random.shuffle(item)
        # Write the data
        for chunk in item:
            async with asyncio.Lock():
                await w.writerow([
                    json.dumps(chunk),
                    load_id,
                    datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
                ])


async def main() -> None:
    # Create fake data
    items: list[str] = [["hello", "world"], ["asyncio", "test"]] * 500
    # Possible file prefixes
    prefixes: list[str] = ["file_one", "file_two"]
    tasks: list = []
    load_id = str(uuid.uuid4())
    for i in items:
        # Randomly assign which file we will write to
        task = asyncio.create_task(write_csv(i, load_id, random.choice(prefixes)))
        tasks.append(task)
    errors = await asyncio.gather(*tasks, return_exceptions=True)
    # print(errors)


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
If you want to serialize access to a resource using an asyncio.Lock instance, then all of the code that accesses that resource must try to acquire the same lock instance. But in your code you have:
...
        # Write the data
        for chunk in item:
            async with asyncio.Lock():
                ...
Consequently, each task is repeatedly creating brand-new lock instances that are not shared with the other tasks, so acquiring one never actually blocks anything.
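To see why sharing matters, here is a minimal, self-contained sketch (a plain counter rather than your CSV writer) comparing one lock created once and passed to every task against the pattern in your code; only the shared instance serializes the tasks:

import asyncio

counter = 0

async def worker(lock: asyncio.Lock) -> None:
    global counter
    async with lock:             # every task waits on the *same* lock instance
        current = counter
        await asyncio.sleep(0)   # yield to the event loop mid read-modify-write
        counter = current + 1

async def main() -> None:
    lock = asyncio.Lock()        # created once, shared by all tasks
    await asyncio.gather(*(worker(lock) for _ in range(100)))
    # Prints 100. With `async with asyncio.Lock():` inside worker() instead,
    # each task acquires its own, always-free lock and updates get lost.
    print(counter)

asyncio.run(main())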
A second problem is that you have:
...
        if prefix == "file_one":
            # Shuffle the chunks again
            item = random.shuffle(item)
...
random.shuffle shuffles a sequence in place and returns None. You should have instead:
...
        if prefix == "file_one":
            # Shuffle the chunks again
            random.shuffle(item)
...
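You can see that second bug in isolation with a couple of lines (a standalone snippet, separate from the CSV code):

import random

item = [["hello", "world"], ["asyncio", "test"]]
print(random.shuffle(item))   # None -- shuffle has no useful return value
print(item)                   # the same list, shuffled in place

item = random.shuffle(item)   # the bug: item is rebound to None
for chunk in item:            # TypeError: 'NoneType' object is not iterable
    print(chunk)

In your script those TypeErrors go unnoticed because asyncio.gather(..., return_exceptions=True) collects them and the print(errors) line is commented out.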
I would also recommend that if you have an open file f, then the size can be most easily determined with:

    size = f.seek(0, 2)  # Seek to the end of file and return the offset
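(With aiofiles the call is awaited, as in the code below.) Note also that a row written through the csv writer may still be sitting in the file object's buffer, so aiofiles.os.path.getsize() can keep reporting 0 on disk; that is part of why each write below is followed by a flush() while the lock is held. As a small synchronous sketch of the idea, separate from the async code that follows:

with open("demo.csv", "a+", newline="") as f:
    size = f.seek(0, 2)      # offset of the end of file, i.e. the current size
    if size == 0:
        f.write("response,load_id,last_updated_timestamp_utc\n")
    f.flush()                # push the buffered write out to disk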
In the following code, each task opens the output CSV file once, but before writing a row it makes sure it is positioned correctly by seeking to the current end of the file. The row is then written and the file flushed before the lock is released:
import asyncio
import aiofiles
import aiofiles.os
import aiocsv
import uuid
import random
import json
from pathlib import Path
from datetime import datetime, timezone


async def write_csv(lock: asyncio.Lock, item: list, load_id: str, prefix: str) -> None:
    Path("./test_files").mkdir(parents=True, exist_ok=True)
    file_path = Path("./test_files").joinpath(f"{prefix}_{load_id}.csv")
    async with aiofiles.open(file_path, mode="a+", newline="") as f:
        print(f"INFO: writing file: {Path(file_path).resolve()}")
        w: aiocsv.AsyncWriter = aiocsv.AsyncWriter(f)
        async with lock:
            # Asynchronously write to our file
            size = await f.seek(0, 2)  # Seek to end
            # If the file is empty, write the header
            if size == 0:
                print("file was empty! writing header")
                # Write the header
                await w.writerow([
                    "response",
                    "load_id",
                    "last_updated_timestamp_utc"
                ])
                await f.flush()
        # Release the lock implicitly
        # do something special for specific file name
        # I am just trying to simulate more random data processing
        if prefix == "file_one":
            # Shuffle the chunks again
            random.shuffle(item)
        # Write the data
        for chunk in item:
            async with lock:
                size = await f.seek(0, 2)  # Seek to end
                print(f"{file_path} file size: {size}")
                await w.writerow([
                    json.dumps(chunk),
                    load_id,
                    datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
                ])
                await f.flush()


async def main() -> None:
    # Create fake data
    items: list[str] = [["hello", "world"], ["asyncio", "test"]] * 500
    # Possible file prefixes
    prefixes: list[str] = ["file_one", "file_two"]
    tasks: list = []
    load_id = str(uuid.uuid4())
    lock = asyncio.Lock()
    for i in items:
        # Randomly assign which file we will write to
        task = asyncio.create_task(write_csv(lock, i, load_id, random.choice(prefixes)))
        tasks.append(task)
    errors = await asyncio.gather(*tasks, return_exceptions=True)
    # print(errors)


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
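As a quick sanity check after the script finishes, something along these lines (an untested sketch) reads each generated file back and confirms that the header appears exactly once and that every row has the expected three columns:

import csv
from pathlib import Path

HEADER = ["response", "load_id", "last_updated_timestamp_utc"]

for path in sorted(Path("./test_files").glob("*.csv")):
    with open(path, newline="") as f:
        rows = list(csv.reader(f))
    headers = sum(1 for row in rows if row == HEADER)
    malformed = [row for row in rows if len(row) != len(HEADER)]
    print(f"{path.name}: {len(rows)} rows, {headers} header(s), {len(malformed)} malformed")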