I'm making a concurrent file downloader, which downloads segments to separate files and merges them afterwards. I want to improve it by pre-creating one file of the full size, then having each thread seek to its corresponding start offset and write.
I've googled and tried different ways. None of them worked.
Simplified code:
from random import shuffle
from threading import Lock, Thread

lock = Lock()

def writer(i):
    with lock:
        with open("2.txt", "wb") as f:
            f.seek(i)
            f.write(f"{i}".encode("utf-8"))
            f.flush()

def test():
    # Pre-create the file
    with open("2.txt", "wb") as f:
        f.seek(7)
        f.write(b"\0")
        f.flush()

    # Spawn and execute the threads in randomized order
    # to mimic the concurrent downloader
    threads = [Thread(target=writer, args=(i,)) for i in range(8)]
    shuffle(threads)
    [t.start() for t in threads]
    [t.join() for t in threads]

    # Read the result
    with open("2.txt", "rb") as f:
        print(f.read())

if __name__ == "__main__":
    for _ in range(5):
        test()
This code opens the file with mode "wb" in the writer function, and gives this result:
[pairman@pairface bw]$ python test.py
b'\x00\x00\x00\x00\x005'
b'\x00\x00\x00\x00\x005'
b'\x00\x00\x00\x00\x00\x00\x007'
b'\x00\x002'
b'\x001'
As seen, "wb" truncates the file if it already exists, which does not satisfy my need. If I change "wb" to "ab", the seek has no effect and the writer threads just append in random order:
[pairman@pairface bw]$ python test.py
b'\x00\x00\x00\x00\x00\x00\x00\x0062045173'
b'\x00\x00\x00\x00\x00\x00\x00\x0073502146'
b'\x00\x00\x00\x00\x00\x00\x00\x0005432716'
b'\x00\x00\x00\x00\x00\x00\x00\x0024036751'
b'\x00\x00\x00\x00\x00\x00\x00\x0052046317'
So is there any way to open, seek and write to a file truly concurrently?
There is no need to pre-create the file with content, because you can seek and write beyond the current end of a file; the file is extended and any gap is filled with binary zeros.
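A minimal sketch demonstrating this (the file name `sparse_demo.bin` is my own choice): seeking past the end of a fresh, empty file and writing one byte extends the file, with the skipped range zero-filled.

```python
import os
import tempfile

path = os.path.join(tempfile.gettempdir(), "sparse_demo.bin")

# The file starts empty; seek to offset 7 and write a single byte.
with open(path, "wb") as f:
    f.seek(7)          # offset beyond the current length (0)
    f.write(b"\xff")   # extends the file; bytes 0..6 become \x00

with open(path, "rb") as f:
    data = f.read()

print(data)  # b'\x00\x00\x00\x00\x00\x00\x00\xff'
os.remove(path)
```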
Your calls to flush() are unnecessary.
Rather than shuffling a list of threads, just randomize the values in a range.
from random import sample
from threading import Lock, Thread
from pathlib import Path

def execute(i: int, lock: Lock, file: Path) -> None:
    with lock:
        with file.open("rb+") as data:
            data.seek(i)
            # Explicit length and byte order so this also runs
            # on Python versions before 3.11
            data.write(i.to_bytes(1, "big"))

def main(file: Path, nthreads: int) -> None:
    assert nthreads <= 256
    # Create (or truncate) the file so it can be opened with "rb+"
    with file.open("wb"):
        pass
    lock = Lock()
    threads = []
    for i in sample(range(nthreads), nthreads):
        thread = Thread(target=execute, args=(i, lock, file))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    with file.open("rb") as data:
        content = data.read()
    print(*map(int, content))

if __name__ == "__main__":
    # cannot be more than 256 because each value is written as one byte
    nthreads = 10
    main(Path("foo.bin"), nthreads)
Output:
0 1 2 3 4 5 6 7 8 9