
Running Python 3.12 asyncio tasks in parallel


What specifically needs to change in the Python 3.12 code below in order for each and every one of the calls to the write_to_file(linesBuffer) function to run in parallel instead of running sequentially?

In other words,

  1. We want program execution to continue without waiting for write_to_file(linesBuffer) to return,
  2. But we also want to make sure that each call to write_to_file(linesBuffer) does eventually return.

Each call to write_to_file(linesBuffer) should start at a different time, and return after whatever different duration might be required in order for each call to successfully complete its work. And there should never be delays waiting for one call to write_to_file(linesBuffer) to complete before the next call to write_to_file(linesBuffer) is initiated.

When we remove await from the write_to_file(linesBuffer) line, the result is that none of the print commands inside the write_to_file(linesBuffer) function ever get executed. So we cannot simply change await write_to_file(linesBuffer) to write_to_file(linesBuffer).

The problem in the code is that awaiting each call to write_to_file(linesBuffer) one after another makes the program very slow.

Here is the code:

import os
import platform
import asyncio

numLines = 10

def get_source_file_path():
    if platform.system() == 'Windows':
        return 'C:\\path\\to\\sourceFile.txt'
    else:
        return '/path/to/sourceFile.txt'

async def write_to_file(linesBuffer):
    print("inside Writing to file...")
    with open('newFile.txt', 'a') as new_destination_file:
        for line in linesBuffer:
            new_destination_file.write(line)
    #get the name of the directory in which newFile.txt is located. Then print the name of the directory.
    directory_name = os.path.dirname(os.path.abspath('newFile.txt'))
    print("directory_name: ", directory_name)
    linesBuffer.clear()
    #print every 1 second for 2 seconds.
    for i in range(2):
        print("HI HO, HI HO.  IT'S OFF TO WORK WE GO...")
        await asyncio.sleep(1)
    print("inside done Writing to file...")

async def read_source_file():
    source_file_path = get_source_file_path()
    linesBuffer = []
    counter = 0
    print("Reading source file...")
    print("source_file_path: ", source_file_path)
    #Detect the size of the file located at source_file_path and store it in the variable file_size.
    file_size = os.path.getsize(source_file_path)
    print("file_size: ", file_size)
    with open(source_file_path, 'r') as source_file:
        source_file.seek(0, os.SEEK_END)
        while True:
            line = source_file.readline()
            new_file_size = os.path.getsize(source_file_path)
            if new_file_size < file_size:
                print("The file has been truncated.")
                source_file.seek(0, os.SEEK_SET)
                file_size = new_file_size
                linesBuffer.clear()
                counter = 0
                print("new_file_size: ", new_file_size)
            if len(line) > 0:
              new_line = str(counter) + " line: " + line
              print(new_line)
              linesBuffer.append(new_line)
              print("len(linesBuffer): ", len(linesBuffer))
              if len(linesBuffer) >= numLines:
                print("Writing to file...")
                await write_to_file(linesBuffer) #When we remove await from this line, the function never runs.    
                print("awaiting Writing to file...")
                linesBuffer.clear()
              counter += 1
              print("counter: ", counter)
            if not line:
                await asyncio.sleep(0.1)
                continue
            #detect whether or not the present line is the last line in the file.  If it is the last line in the file, then write the line to the file.
            if source_file.tell() == file_size:
                print("LAST LINE IN FILE FOUND.  Writing to file...")
                await write_to_file(linesBuffer)
                print("awaiting Writing to file...")
                linesBuffer.clear()
                counter = 0
        
async def main():
    await read_source_file()

if __name__ == '__main__':
    asyncio.run(main())

Solution

  • A couple of points:

    First, as has been noted in the comments, the file I/O you are doing is not asynchronous, and asyncio itself does not provide asynchronous file I/O. For this I would suggest installing the aiofiles module from the PyPI repository.
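If adding a dependency is not an option, a standard-library alternative is to push the blocking write onto a worker thread with asyncio.to_thread (available since Python 3.9). This is a swapped-in technique rather than what the code in this answer uses, and the helper name append_lines and the temporary path are assumptions made only for the sketch:

```python
import asyncio
import os
import tempfile

def append_lines(path, lines):
    # Ordinary blocking file I/O; this function runs in a worker thread.
    with open(path, "a") as f:
        f.writelines(lines)

async def write_to_file(path, lines):
    # Hand the blocking call to the default thread pool so the
    # event loop stays free to run other coroutines meanwhile.
    await asyncio.to_thread(append_lines, path, lines)

async def main():
    # Temporary directory used only so the sketch is self-contained.
    path = os.path.join(tempfile.mkdtemp(), "newFile.txt")
    await write_to_file(path, ["0 line: a\n", "1 line: b\n"])
    with open(path) as f:
        return f.read()

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The trade-off is that each write occupies a thread for its duration, which is usually acceptable for occasional batch writes.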

    Second, you have the following:

    await write_to_file(linesBuffer) #When we remove await from this line, the function never runs.
    

    Actually, without the await, the body of write_to_file never runs. The expression write_to_file(linesBuffer) only creates and returns a coroutine object, which must be awaited (or wrapped in a task) before its body executes. With the await, as you are currently doing, the call is effectively sequential: the caller is suspended, the coroutine runs, and once it completes and returns a value (the implicit None if there is no return statement) the caller resumes, with await write_to_file(linesBuffer) evaluating to that return value.
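A minimal sketch of that behaviour, using a stand-in write_to_file that only appends a marker to a list (the list and the return value are assumptions for illustration):

```python
import asyncio

async def write_to_file(lines):
    # Stand-in for the real coroutine; it only records that it ran.
    lines.append("written")
    return len(lines)

async def main():
    log = []
    coro = write_to_file(log)       # creates a coroutine object; the body has not run
    ran_before_await = bool(log)    # False: nothing has been appended yet
    result = await coro             # the body runs to completion here
    return ran_before_await, result, asyncio.iscoroutine(coro)

if __name__ == "__main__":
    print(asyncio.run(main()))      # (False, 1, True)
```

If the coroutine object were never awaited, Python would emit a "coroutine ... was never awaited" RuntimeWarning when it is discarded, which is a useful hint when a function appears not to run.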

    But you want write_to_file to run asynchronously (concurrently) with your read_source_file coroutine. For that, you need to create a separate task. See asyncio.create_task for details. Pay particular attention to saving the task instance returned by this call: the event loop keeps only a weak reference to a task, so a task that nothing else references can be garbage collected before it finishes.
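As a small sketch of the scheduling order, the following records when each task is scheduled, started, and finished. The events list and the batch_id parameter are hypothetical additions used only to make the ordering visible:

```python
import asyncio

async def write_to_file(batch_id, events):
    # Stand-in writer: logs its start and end around a short pause.
    events.append(f"start {batch_id}")
    await asyncio.sleep(0.01)
    events.append(f"done {batch_id}")

async def main():
    events = []
    background_tasks = set()
    for batch_id in range(2):
        task = asyncio.create_task(write_to_file(batch_id, events))
        background_tasks.add(task)                        # strong reference
        task.add_done_callback(background_tasks.discard)  # drop it when finished
        events.append(f"scheduled {batch_id}")            # the caller continues at once
    await asyncio.sleep(0.2)  # yield to the event loop so the tasks can run
    return events

if __name__ == "__main__":
    for event in asyncio.run(main()):
        print(event)
```

Both "scheduled" entries appear before either "start" entry: a task begins running only once the caller reaches an await and gives control back to the event loop.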

    So basically your modified code would be as follows (I have not verified that its overall logic is correct):

    import os
    import platform
    import asyncio
    import aiofiles
    
    numLines = 10
    
    def get_source_file_path():
        if platform.system() == 'Windows':
            return 'C:\\path\\to\\sourceFile.txt'
        else:
            return '/path/to/sourceFile.txt'
    
    async def write_to_file(linesBuffer):
        print("inside Writing to file...")
        async with aiofiles.open('newFile.txt', 'a') as new_destination_file:
            for line in linesBuffer:
                await new_destination_file.write(line)
        #get the name of the directory in which newFile.txt is located. Then print the name of the directory.
        directory_name = os.path.dirname(os.path.abspath('newFile.txt'))
        print("directory_name: ", directory_name)
        linesBuffer.clear()
        #print every 1 second for 2 seconds.
        for i in range(2):
            print("HI HO, HI HO.  IT'S OFF TO WORK WE GO...")
            await asyncio.sleep(1)
        print("inside done Writing to file...")
    
    async def read_source_file():
        source_file_path = get_source_file_path()
        linesBuffer = []
        counter = 0
        print("Reading source file...")
        print("source_file_path: ", source_file_path)
        #Detect the size of the file located at source_file_path and store it in the variable file_size.
        file_size = os.path.getsize(source_file_path)
        print("file_size: ", file_size)
        
        background_tasks = set()
        
        async with aiofiles.open(source_file_path, 'r') as source_file:
            await source_file.seek(0, os.SEEK_END)
            while True:
                line = await source_file.readline()
                new_file_size = os.path.getsize(source_file_path)
                if new_file_size < file_size:
                    print("The file has been truncated.")
                    await source_file.seek(0, os.SEEK_SET)
                    file_size = new_file_size
                    linesBuffer.clear()
                    counter = 0
                    print("new_file_size: ", new_file_size)
                if len(line) > 0:
                  new_line = str(counter) + " line: " + line
                  print(new_line)
                  linesBuffer.append(new_line)
                  print("len(linesBuffer): ", len(linesBuffer))
                  if len(linesBuffer) >= numLines:
                    print("Writing to file...")
                    task = asyncio.create_task(write_to_file(linesBuffer))
                    background_tasks.add(task)
                    task.add_done_callback(background_tasks.discard)
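                    # Allocate a new list instead of clearing this one: the task has
                    # not started yet and still needs the lines in the old list.
                    linesBuffer = []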
                  counter += 1
                  print("counter: ", counter)
                if not line:
                    await asyncio.sleep(0.1)
                    continue
                #detect whether or not the present line is the last line in the file.  If it is the last line in the file, then write the line to the file.
                if await source_file.tell() == file_size:
                    print("LAST LINE IN FILE FOUND.  Writing to file...")
                    task = asyncio.create_task(write_to_file(linesBuffer))
                    background_tasks.add(task)
                    task.add_done_callback(background_tasks.discard)
                    # Allocate a new list so the pending task keeps its lines.
                    linesBuffer = []
                    counter = 0
            
    async def main():
        await read_source_file()
    
    if __name__ == '__main__':
        asyncio.run(main())
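
One detail worth checking whenever a list is handed to a task: asyncio.create_task only schedules the coroutine, so the task sees the list as it is when the task actually starts, not as it was when the task was created. The sketch below contrasts clearing the shared list with rebinding the name to a new list; the sink lists are hypothetical stand-ins for the output file:

```python
import asyncio

async def write_to_file(lines, sink):
    # Stand-in writer: copies whatever the list holds when the task runs.
    sink.extend(lines)

async def main():
    written_after_clear, written_after_rebind = [], []

    buffer = ["a\n", "b\n"]
    t1 = asyncio.create_task(write_to_file(buffer, written_after_clear))
    buffer.clear()   # runs before the task starts, so the task sees an empty list

    buffer = ["a\n", "b\n"]
    t2 = asyncio.create_task(write_to_file(buffer, written_after_rebind))
    buffer = []      # rebinding the name leaves the task's list untouched

    await asyncio.gather(t1, t2)
    return written_after_clear, written_after_rebind

if __name__ == "__main__":
    print(asyncio.run(main()))   # ([], ['a\n', 'b\n'])
```

Passing a copy, for example list(buffer), would work equally well; the point is that the task and the reader must not share one mutable list.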
    

    Update

    Maybe you want something like this. This code essentially checks whether lines have been appended to an input file and, if so, accumulates them in a batch and sends the batch to another task, which appends it to an output file.

    If you see that the input file has been truncated, you execute:

    await source_file.seek(0, os.SEEK_SET)
    

    That positions you to the beginning of the file. Is that what you really want? I don't get it. Since you clearly know what it is you want, you will be in a better position to make adjustments to this code. If it's not even in the ballpark, then I surrender.

    import os
    import platform
    import asyncio
    import aiofiles
    
    BATCH_SIZE = 10
    
    def get_source_file_path():
        if platform.system() == 'Windows':
            return 'C:\\path\\to\\sourceFile.txt'
        else:
            return '/path/to/sourceFile.txt'
    
    async def write_to_file(queue):
        async with aiofiles.open('newFile.txt', 'a') as new_destination_file:
            while True:
                lines = await queue.get()  # queue.get() is a coroutine and must be awaited
                print("Writing to file...")
                for line in lines:
                    await new_destination_file.write(line)
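                # The file stays open for the life of this task, so flush each batch.
                await new_destination_file.flush()
                print("Done Writing to file...")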
    
    async def read_source_file():
        source_file_path = get_source_file_path()
        counter = 0
        print("Reading source file...")
        print("source_file_path: ", source_file_path)
        #Detect the size of the file located at source_file_path and store it in the variable file_size.
        file_size = os.path.getsize(source_file_path)
        print("file_size: ", file_size)
    
        queue = asyncio.Queue()
        # Call the coroutine function with the queue; create_task needs a coroutine object.
        write_task = asyncio.create_task(write_to_file(queue))
    
        async with aiofiles.open(source_file_path, 'r') as source_file:
            await source_file.seek(0, os.SEEK_END)
            linesBuffer = []
            while True:
                # Always make sure that file_size is the current size:
                old_file_size = file_size
                file_size = os.path.getsize(source_file_path)
                if file_size < old_file_size:
                    print("The file has been truncated.")
                    await source_file.seek(0, os.SEEK_SET)
                    # Allocate a new list instead of clearing the current one
                    linesBuffer = []
                    counter = 0
                    print("new_file_size: ", file_size)
                    continue
    
                line = await source_file.readline()
                if line:
                    new_line = str(counter) + " line: " + line
                    print(new_line)
                    linesBuffer.append(new_line)
                    print("len(linesBuffer): ", len(linesBuffer))
    
                    if len(linesBuffer) == BATCH_SIZE:
                        print("Writing batch to file...")
                        await queue.put(linesBuffer)
                        linesBuffer = []
                        counter += 1
                        print("counter: ", counter)
    
                    #detect whether or not the present line is the last line in the file.
                    # If it is the last line in the file, then write whatever batch
                    # we have even if it is not complete.
                    if await source_file.tell() == file_size:
                        print("LAST LINE IN FILE FOUND.")
                        if linesBuffer:
                            # Write even though it's not a full batch:
                            await queue.put(linesBuffer)
                            linesBuffer = []
                        counter = 0
                else:
                    await asyncio.sleep(0.1)
    
    async def main():
        await read_source_file()
    
    if __name__ == '__main__':
        asyncio.run(main())
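
The queue-based version above runs forever. If the reader is ever given an exit condition, one common way to let the writer finish cleanly is to send a sentinel value through the same queue. This is a sketch under that assumption; the STOP sentinel, the writer name, and the sink list standing in for the output file are all hypothetical:

```python
import asyncio

STOP = None  # hypothetical sentinel meaning "no more batches"

async def writer(queue, sink):
    # Single consumer: batches are handled strictly in arrival order.
    while True:
        batch = await queue.get()
        if batch is STOP:
            break
        sink.extend(batch)

async def main():
    queue = asyncio.Queue()
    sink = []  # stands in for the output file
    writer_task = asyncio.create_task(writer(queue, sink))

    for batch in (["0 line: a\n"], ["1 line: b\n", "2 line: c\n"]):
        await queue.put(batch)   # returns at once on an unbounded queue

    await queue.put(STOP)        # tell the writer nothing else is coming
    await writer_task            # wait until every batch has been handled
    return sink

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because there is only one writer task, output order matches input order, which matters when appending to a single file.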
    

    If you are actually sending your batches to an API and you want those calls to run concurrently, then:

    import os
    import platform
    import asyncio
    import aiofiles
    
    BATCH_SIZE = 10
    
    def get_source_file_path():
        if platform.system() == 'Windows':
            return 'C:\\path\\to\\sourceFile.txt'
        else:
            return '/path/to/sourceFile.txt'
    
    async def send_to_api(linesBuffer):
        ...
    
    async def read_source_file():
        source_file_path = get_source_file_path()
        counter = 0
        print("Reading source file...")
        print("source_file_path: ", source_file_path)
        #Detect the size of the file located at source_file_path and store it in the variable file_size.
        file_size = os.path.getsize(source_file_path)
        print("file_size: ", file_size)
    
        background_tasks = set()
    
        async with aiofiles.open(source_file_path, 'r') as source_file:
            await source_file.seek(0, os.SEEK_END)
            linesBuffer = []
            while True:
                # Always make sure that file_size is the current size:
                old_file_size = file_size
                file_size = os.path.getsize(source_file_path)
                if file_size < old_file_size:
                    print("The file has been truncated.")
                    await source_file.seek(0, os.SEEK_SET)
                    # Allocate a new list instead of clearing the current one
                    linesBuffer = []
                    counter = 0
                    print("new_file_size: ", file_size)
                    continue
    
                line = await source_file.readline()
                if line:
                    new_line = str(counter) + " line: " + line
                    print(new_line)
                    linesBuffer.append(new_line)
                    print("len(linesBuffer): ", len(linesBuffer))
    
                    if len(linesBuffer) == BATCH_SIZE:
                        print("sending to api...")
                        task = asyncio.create_task(send_to_api(linesBuffer))
                        background_tasks.add(task)
                        task.add_done_callback(background_tasks.discard)
                        # Do not clear the buffer; allocate a new one:
                        linesBuffer = []
                        counter += 1
                        print("counter: ", counter)
    
                    #detect whether or not the present line is the last line in the file.
                    # If it is the last line in the file, then write whatever batch
                    # we have even if it is not complete.
                    if await source_file.tell() == file_size:
                        print("LAST LINE IN FILE FOUND.")
                        if linesBuffer:
                            # Send even though it's not a full batch:
                            task = asyncio.create_task(send_to_api(linesBuffer))
                            background_tasks.add(task)
                            task.add_done_callback(background_tasks.discard)
                            linesBuffer = []
                        counter = 0
                else:
                    await asyncio.sleep(0.1)
    
    async def main():
        await read_source_file()
    
    if __name__ == '__main__':
        asyncio.run(main())
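
The loop above never ends, so the background tasks always get a chance to finish. If the reader is ever changed to stop, the outstanding tasks should be awaited before main returns, otherwise asyncio.run cancels whatever is still pending. A minimal sketch, assuming a stand-in send_to_api that sleeps briefly and rejects empty batches:

```python
import asyncio

async def send_to_api(batch):
    # Hypothetical stand-in for a real API call.
    await asyncio.sleep(0.01)
    if not batch:
        raise ValueError("empty batch")
    return len(batch)

async def main():
    background_tasks = set()
    for batch in (["a"], [], ["b", "c"]):
        task = asyncio.create_task(send_to_api(batch))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    # Snapshot the set first: the done callbacks remove tasks as they finish.
    pending = list(background_tasks)
    # return_exceptions=True collects failures instead of raising the first one.
    return await asyncio.gather(*pending, return_exceptions=True)

if __name__ == "__main__":
    for outcome in asyncio.run(main()):
        print(repr(outcome))
```

The results come back in the order of the snapshot, which for a set is arbitrary; keep the tasks in a list instead if the order of outcomes matters.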