pythonmultithreadingfilefwriteseek

Python - Concurrent file seek and write


I'm making a concurrent file downloader, which downloads in segment files and merges them afterwards. I want to improve it by pre-creating a big file with the whole size, then each thread will seek its corresponding start offset and write.

I've googled and tried different ways. None of them worked.

Simplified code:

from random import shuffle
from threading import Lock, Thread

lock = Lock()

def writer(i):
    with lock:
        with open("2.txt", "wb") as f:
            f.seek(i)
            f.write(f"{i}".encode("utf-8"))
            f.flush()

def test():
    # Pre-create the file
    with open("2.txt", "wb") as f:
        f.seek(7)
        f.write(b"\0")
        f.flush()

    # Spawn and execute the threads in randomized order
    # to mimic the concurrent downloader
    threads = [Thread(target = writer, args = (i, )) for i in range(8)]
    shuffle(threads)
    [t.start() for t in threads]
    [t.join() for t in threads]
    # Read the result
    with open("2.txt", "rb") as f:
        print(f.read())

if __name__ == "__main__":
    for _ in range(5):
        test()

This code writes the file with "wb" in the writer function, and gives this result:

[pairman@pairface bw]$ python test.py 
b'\x00\x00\x00\x00\x005'
b'\x00\x00\x00\x00\x005'
b'\x00\x00\x00\x00\x00\x00\x007'
b'\x00\x002'
b'\x001'

As seen, "wb" overwrites the file if existing, not satisfying my need. If I change "wb" to "ab", the seek won't work and the writer threads just append in random order:

[pairman@pairface bw]$ python test.py 
b'\x00\x00\x00\x00\x00\x00\x00\x0062045173'
b'\x00\x00\x00\x00\x00\x00\x00\x0073502146'
b'\x00\x00\x00\x00\x00\x00\x00\x0005432716'
b'\x00\x00\x00\x00\x00\x00\x00\x0024036751'
b'\x00\x00\x00\x00\x00\x00\x00\x0052046317'

So is there any way to open, seek and write to a file truly concurrently?


Solution

  • There is no need to pre-create the file with content because you can seek and write beyond the current length of a file. The file will be extended and filled (where necessary) with binary zero.

    Your calls to flush() are unnecessary.

    Rather than shuffling a list of threads, just randomize the values in a range.

    from random import sample
    from threading import Lock, Thread
    from pathlib import Path
    
    def execute(i: int, lock: Lock, file: Path) -> None:
        with lock:
            with file.open("rb+") as data:
                data.seek(i)
                data.write(i.to_bytes())
    
    def main(file: Path, nthreads: int) -> None:
        assert nthreads <= 256
        with file.open("wb"):
            pass
    
        lock = Lock()
        threads = []
    
        for i in sample(range(nthreads), nthreads):
            thread = Thread(target=execute, args=(i, lock, file))
            threads.append(thread)
            thread.start()
    
        for thread in threads:
            thread.join()
    
        with file.open("rb") as data:
            content = data.read()
            print(*map(int, content))
    
    if __name__ == "__main__":
        # cannot be more than 256 because of the way the output file is written
        nthreads = 10 
        main(Path("foo.bin"), nthreads)
    

    Output:

    0 1 2 3 4 5 6 7 8 9