pythonmultithreadingpython-asyncio

How to use threading.Lock in async function while object can be accessed from multiple thread


I want to use threading.Lock() in a async function, asyncio.Lock() is not thread safe so I cannot do with await asyncio.Lock():. The reason I need to use threading.Lock() is because this object may be accessed by multiple treads, hence it is used in a web app and the server running it can spin up many threads. What is an effective way of doing so ? So far I've tried a simple function that uses the lock:

1)

async def main():
    with await threading.Lock():
        a = 6
    return a

TypeError: object _thread.lock can't be used in 'await' expression
async def main():
    async with threading.Lock():
            a = 1564666546
    return a

AttributeError: __aexit__

Solution

  • You can't pass a threading.Lock to async with because it is not designed for async usage, it's a blocking primitive. More importantly, async with threading.Lock() doesn't make sense even if it did work because you would be acquiring a brand new lock, which would always succeed. For locking to work, you must share a lock between multiple threads, e.g. by storing it in an object's attribute. The rest of this answer will assume that you have a threading.Lock shared between threads.

    Since threading.Lock always blocks, the only way you can use it from asyncio is to wait to acquire it in a separate thread, and suspend the execution of the current coroutine until the lock is successfully acquired. The functionality of running a blocking function in a different thread is already covered by the run_in_executor event loop method, which you can apply:

    _pool = concurrent.futures.ThreadPoolExecutor()
    
    async def work(lock, other_args...):
        # lock is a threading.Lock shared between threads
    
        loop = asyncio.get_event_loop()
        # Acquire the lock in a worker thread, suspending us while waiting.
        await loop.run_in_executor(_pool, lock.acquire)
    
        ... access the object with the lock held ...
    
        # Can release directly because release() doesn't block and a
        # threading.Lock can be released from any thread.
        lock.release()
    

    You can make this more elegant to use (and exception-safe) by creating an async context manager:

    _pool = concurrent.futures.ThreadPoolExecutor()
    
    @contextlib.asynccontextmanager
    async def async_lock(lock):
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(_pool, lock.acquire)
        try:
            yield  # the lock is held
        finally:
            lock.release()
    

    Then you can use it as follows:

    # lock is a threading.Lock shared between threads
    async with async_lock(lock):
        ... access the object with the lock held ...
    

    Of course, in code not running in the asyncio event loop you'd just acquire the lock directly:

    # lock is a threading.Lock shared between threads
    with lock:
       ... access the object ...
    

    Note that we use a separate thread pool instead of passing None to run_in_executor() to reuse the default pool. This is to avoid deadlock in situations where the function that holds the lock itself needs access to the thread pool for other uses of run_in_executor(). By keeping the thread pool private, we avoid the possibility of deadlocking through the use of the same pool by others.