asynchronous, python-asyncio, isort

How can I utilize asyncio to make third party file operations faster?


I am using a third-party library called isort. isort provides a function, isort.check_file, that opens and reads a file. To speed this up I attempted to make isort.check_file run asynchronously. The method takes a file path, but my attempt below does not work.

    ...
    coroutines = [self.check_file('c:\\example1.py'), self.check_file('c:\\example2.py')]
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(asyncio.gather(*coroutines))
    ...

    async def check_file(self, changed_file):
        return isort.check_file(changed_file)

However, this does not seem to work. How can I make the library call isort.check_file work correctly with asyncio.gather?


Solution

  • Better understanding of IO Bottleneck and GIL

    What your async function check_file is doing is exactly the same as it would be without async in front. To get any meaningful performance gain asynchronously, you must be awaiting some sort of awaitable - which requires the await keyword.

    So basically what you did is:

    import time
    
    async def wait(n):
        time.sleep(n)  # blocks the whole event loop; no other task can run meanwhile
    

    Which does absolutely no good for asynchronous operations. To make such a synchronous function asynchronous - assuming it is mostly IO-bound - you can use asyncio.to_thread instead.

    import asyncio
    import time
    
    
    async def task():
        await asyncio.to_thread(time.sleep, 10)  # <- await + something that's awaitable
        # similar to await asyncio.sleep(10) now
    
    
    async def main():
        tasks = [task() for _ in range(10)]
        await asyncio.gather(*tasks)
    
    
    asyncio.run(main())
    

    That essentially moves the IO-bound operation out of the main thread, so the main thread can do its work without waiting for IO.

    But there's a catch - Python's Global Interpreter Lock (GIL).

    Due to a limitation of CPython - the official Python implementation - only one Python interpreter thread can run at any given moment, stalling all others.

    So how do we achieve better performance in Python? Simply by releasing the GIL during IO operations.

    IO operations basically go like this:

    "Hey OS, please do this IO works for me. Wake me up when it's done."
    Thread 1 goes to sleep

    Some time later, OS punches Thread 1
    "Your IO Operation is done, take this and get back to work."

    So, while waiting, the thread does nothing at all. For such cases - aka IO-bound stuff - the GIL can be safely released, letting other threads run. Built-in functions like time.sleep, open(), etc. implement such GIL-release logic in their C code.
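
    To see that GIL release in action, here is a minimal sketch: two threads each sleep for one second, yet the total wall time stays near one second, because time.sleep drops the GIL while waiting on the OS timer.

    import time
    from concurrent.futures import ThreadPoolExecutor


    def blocking_io():
        # time.sleep releases the GIL while waiting on the OS timer
        time.sleep(1)


    start = time.perf_counter()

    # both sleeps overlap, since neither thread holds the GIL while sleeping;
    # exiting the with-block waits for both to finish
    with ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(blocking_io)
        executor.submit(blocking_io)

    # prints roughly 1.0, not 2.0
    print(f"elapsed: {time.perf_counter() - start:.2f}s")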

    This doesn't change much in asyncio, which is internally a bunch of event checks and callbacks. Each asyncio.Task works like a thread to some degree - tasks ask the main loop to wake them up when their IO operation is done.
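
    As a minimal sketch of that wake-me-up dance: each await hands control back to the event loop, which resumes the task when its timer fires, so the two tasks' output interleaves on a single thread.

    import asyncio


    async def worker(name):
        for step in range(3):
            # await yields to the event loop; this task resumes when its timer fires
            await asyncio.sleep(0.1)
            print(f"task {name}: step {step}")


    async def main():
        # both tasks make progress concurrently on one thread
        await asyncio.gather(worker("A"), worker("B"))


    asyncio.run(main())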

    Now, with these simplified basics sorted out, we can go back to your question.


    CPU Bottleneck and IO Bottleneck

    Basically, what you're up against is not an IO bottleneck. It's mostly a CPU bottleneck.

    Loading a mere few KB of text from a local drive and then running tons of intense Python code afterward doesn't count as an IO-bound operation.
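
    You can sanity-check that claim by timing the raw file read separately from the full check - a rough sketch, assuming some Python file exists at the hypothetical path example.py (exact numbers will vary by machine):

    import time

    import isort

    path = "example.py"  # hypothetical: any existing Python file

    start = time.perf_counter()
    with open(path) as f:
        f.read()
    io_time = time.perf_counter() - start

    start = time.perf_counter()
    isort.check_file(path)
    check_time = time.perf_counter() - start

    # on a fast drive, the read is a tiny fraction of the full check
    print(f"read only : {io_time:.6f}s")
    print(f"full check: {check_time:.6f}s")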


    Testing

    Let's consider the following test case, with a folder structure of:

    ├─ main.py
    └─ import_messes
         ├─ lib_0.py
         ├─ lib_1.py
         ├─ lib_2.py
         ├─ lib_3.py
         ├─ lib_4.py
         ├─ lib_5.py
         ├─ lib_6.py
         ├─ lib_7.py
         ├─ lib_8.py
         └─ lib_9.py
    

    We'll check each of those ten files 1000 times, making a total of 10,000 checks.

    Each of those files is filled with random imports I grabbed from asyncio:

    from asyncio.base_events import *
    from asyncio.coroutines import *
    from asyncio.events import *
    from asyncio.exceptions import *
    from asyncio.futures import *
    from asyncio.locks import *
    from asyncio.protocols import *
    from asyncio.runners import *
    from asyncio.queues import *
    from asyncio.streams import *
    from asyncio.subprocess import *
    from asyncio.tasks import *
    from asyncio.threads import *
    from asyncio.transports import *
    
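    If you want to reproduce that layout, a small throwaway helper along these lines can generate the ten files (a convenience sketch, not part of the original test):

    import pathlib

    MODULES = (
        "base_events coroutines events exceptions futures locks protocols "
        "runners queues streams subprocess tasks threads transports"
    ).split()

    # each lib_N.py gets the same wildcard-import block shown above
    CONTENT = "\n".join(f"from asyncio.{mod} import *" for mod in MODULES) + "\n"

    TARGET = pathlib.Path("./import_messes")
    TARGET.mkdir(exist_ok=True)

    for idx in range(10):
        (TARGET / f"lib_{idx}.py").write_text(CONTENT)
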

    Source code (main.py):

    """
    asynchronous isort demo
    """
    
    import pathlib
    import asyncio
    import itertools
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    from timeit import timeit
    
    import isort
    from isort import format
    
    
    # target dir with modules
    FILE = pathlib.Path("./import_messes")
    
    
    # Monkey-patching isort.format.create_terminal_printer to suppress Terminal bombarding.
    # Totally not required nor recommended for normal use
    class SuppressionPrinter:
        def __init__(self, *_, **__):
            pass
    
        def success(self, *_):
            pass
    
        def error(self, *_):
            pass
    
        def diff_line(self, *_):
            pass
    
    
    isort.format.create_terminal_printer = SuppressionPrinter
    
    
    # -----------------------------
    # Test functions
    
    def filelist_gen():
        """Chain directory list multiple times to get meaningful difference"""
        yield from itertools.chain.from_iterable([FILE.iterdir() for _ in range(1000)])
    
    
    def isort_synchronous(path_iter):
        """Synchronous usual isort use-case"""
    
        # return list of results
        return [isort.check_file(file) for file in path_iter]
    
    
    def isort_thread(path_iter):
        """Threading isort"""
    
        # prepare thread pool
        with ThreadPoolExecutor(max_workers=2) as executor:
            # start loading
            futures = [executor.submit(isort.check_file, file) for file in path_iter]
    
            # return list of results
            return [fut.result() for fut in futures]
    
    
    def isort_multiprocess(path_iter):
        """Multiprocessing isort"""
    
        # prepare process pool
        with ProcessPoolExecutor(max_workers=2) as executor:
            # start loading
            futures = [executor.submit(isort.check_file, file) for file in path_iter]
    
            # return list of results
            return [fut.result() for fut in futures]
    
    
    async def isort_asynchronous(path_iter):
        """Asyncio isort using to_thread"""
    
        # create coroutines that delegate sync funcs to threads
        coroutines = [asyncio.to_thread(isort.check_file, file) for file in path_iter]
    
        # run coroutines and wait for results
        return await asyncio.gather(*coroutines)
    
    
    if __name__ == '__main__':
        # run once, no repetition
        n = 1
    
        # synchronous runtime
        print(f"Sync func.: {timeit(lambda: isort_synchronous(filelist_gen()), number=n):.4f}")
    
        # threading demo
        print(f"Threading : {timeit(lambda: isort_thread(filelist_gen()), number=n):.4f}")
    
        # multiprocessing demo
        print(f"Multiproc.: {timeit(lambda: isort_multiprocess(filelist_gen()), number=n):.4f}")
    
        # asyncio to_thread demo
        print(f"to_thread : {timeit(lambda: asyncio.run(isort_asynchronous(filelist_gen())), number=n):.4f}")
    

    Run results

    Sync func.: 18.1764
    Threading : 18.3138
    Multiproc.: 9.5206
    to_thread : 27.3645
    

    (the above results were run on an NVMe SSD)

    You can see isort.check_file is not an IO-bound operation on fast IO devices. Therefore, if you really need a speedup on such fast drives, your best bet is multiprocessing.

    If the number of files is low - say a hundred or fewer - in the above 'fast IO device' situation, multiprocessing will suffer even more than asyncio.to_thread, because the cost of spawning, communicating with, and killing processes overwhelms multiprocessing's benefits.
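
    One way to soften that per-task overhead is batching: ProcessPoolExecutor.map accepts a chunksize argument, so each worker receives a batch of paths per round trip instead of one. A sketch, as a variation on isort_multiprocess above (the chunk size of 64 is an arbitrary starting point):

    from concurrent.futures import ProcessPoolExecutor

    import isort


    def isort_multiprocess_chunked(path_iter):
        """Multiprocessing isort, batching paths to cut inter-process overhead"""

        with ProcessPoolExecutor(max_workers=2) as executor:
            # chunksize=64 ships paths to workers in batches of 64
            return list(executor.map(isort.check_file, path_iter, chunksize=64))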

    However - with slow IO devices like HDDs - threading/async is a totally valid idea and will give a great boost in performance.

    Experiment with your use case, and adjust the core/thread count (max_workers) to best fit your environment.
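
    For instance, a quick sweep over worker counts will show where the returns diminish on your machine - a sketch reusing filelist_gen from main.py above, with max_workers made a parameter:

    from concurrent.futures import ProcessPoolExecutor
    from timeit import timeit

    import isort


    def isort_multiprocess(path_iter, max_workers):
        """Same as the benchmark version, but with a tunable worker count"""

        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(isort.check_file, file) for file in path_iter]
            return [fut.result() for fut in futures]


    if __name__ == '__main__':
        for workers in (1, 2, 4, 8):
            elapsed = timeit(lambda: isort_multiprocess(filelist_gen(), workers), number=1)
            print(f"{workers} worker(s): {elapsed:.4f}")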