python, python-3.x, python-asyncio, python-decorators, process-pool

How to properly transform a sync function to an async one?


I'm writing a Telegram bot and I need the bot to be available to users even when it is processing some previous request. My bot downloads some videos and compresses them if they exceed the size limit, so it takes some time to process a request. I want to turn my sync functions into async ones and handle them within another process to make this happen.

I found a way to do this using this article, but it doesn't work for me. Here is my code to test the solution:

import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import wraps, partial

executor = ProcessPoolExecutor()

def async_wrap(func):
    @wraps(func)
    async def run(*args, **kwargs):
        loop = asyncio.get_running_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    
    return run 

@async_wrap
def sync_func(a):
    import time
    time.sleep(10)

if __name__ == "__main__":
    asyncio.run(sync_func(4))

As a result, I've got the following error message:

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/multiprocessing/queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function sync_func at 0x7f2e333625f0>: it's not the same object as __main__.sync_func
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/mikhail/Projects/social_network_parsing_bot/processes.py", line 34, in <module>
    asyncio.run(sync_func(4))
  File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/home/mikhail/Projects/social_network_parsing_bot/processes.py", line 18, in run
    return await loop.run_in_executor(executor, pfunc)
  File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/multiprocessing/queues.py", line 245, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/mikhail/.pyenv/versions/3.10.4/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function sync_func at 0x7f2e333625f0>: it's not the same object as __main__.sync_func

As I understand it, the error arises because the decorator changes the function and, as a result, returns a new object. What do I need to change in my code to make it work? Maybe I don't understand some crucial concept and there is some simple method to achieve what I want. Thanks for the help.


Solution

  • The article runs a nice experiment, but it is really only meant to work with a thread-pool executor - not a multiprocessing one.

    If you look at its code, at some point it passes executor=None to the .run_in_executor call, and asyncio creates a default executor, which is a ThreadPoolExecutor.
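    For instance, a minimal sketch of the thread-based variant (illustrative, not the article's exact code) - nothing crosses a process boundary here, so no pickling is involved:

    import asyncio
    from functools import partial

    async def demo():
        loop = asyncio.get_running_loop()
        # executor=None -> asyncio's default ThreadPoolExecutor
        await loop.run_in_executor(None, partial(print, "runs in a thread"))

    asyncio.run(demo())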

    The main difference from a ProcessPoolExecutor is that all data moved across processes (and therefore all data sent to the workers, including the target functions) has to be serialized - and that is done via Python's pickle.

    Now, pickle serialization of a function does not really send the function object, along with its bytecode, down the wire: rather, it just sends the function's qualified name, and it is expected that the function with the same qualname on the other end is the same as the original function.
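    You can see this by disassembling the pickled payload of a module-level function (some_func here is just a stand-in name):

    import pickle, pickletools

    def some_func():
        pass

    # the opcodes show only the module and qualname being stored -
    # no bytecode travels down the wire:
    pickletools.dis(pickle.dumps(some_func))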

    In the case of your code, the func which is the target for the executor pool is the function as declared, prior to it being wrapped by the decorator (__main__.sync_func). But what exists under that name in the target process is the post-decoration function. So, if Python did not block this due to the functions not being the same object, you'd get into an infinite loop, creating hundreds of nested subprocesses and never actually calling your function - as the entry point in the target would be the wrapped function. That is simply an error in the article you read.
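    A minimal sketch of that name mismatch (deco, plain and original are illustrative names, not from your code) reproduces the exact error from your traceback:

    import pickle
    from functools import wraps

    def deco(func):
        @wraps(func)  # copies __name__/__qualname__ onto the wrapper
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper

    def plain():
        pass

    original = plain       # keep a reference to the raw function
    plain = deco(plain)    # rebind the name, as the @ syntax would do

    pickle.dumps(plain)    # fine: the qualname lookup finds this same object
    try:
        pickle.dumps(original)  # the lookup finds the wrapper instead
    except pickle.PicklingError as exc:
        print(exc)  # Can't pickle <function plain ...>: it's not the same object as __main__.plain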

    All this said, the simpler way to make this work is, instead of applying the decorator with the usual @ syntax, to keep the original, undecorated function in the module namespace and bind the wrapped function to a new name - this way, the "raw" function can be the target for the executor:

    (...)  # imports, executor and async_wrap as in the question
    def sync_func(a):
        import time
        time.sleep(2)
        print(f"finished {a}")
    
    # this creates the decorated function with a new name,
    # instead of replacing the original:
    wrapped_sync = async_wrap(sync_func)
    
    if __name__ == "__main__":
        asyncio.run(wrapped_sync("go go go"))
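    And since the raw sync_func stays picklable, several requests can now run in parallel worker processes while the event loop stays free for other handlers - a rough sketch of how that could look (the request strings are just illustrative):

    async def main():
        # three "requests" processed concurrently in the process pool
        await asyncio.gather(
            wrapped_sync("request 1"),
            wrapped_sync("request 2"),
            wrapped_sync("request 3"),
        )

    if __name__ == "__main__":
        asyncio.run(main())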