I want to use threading.Lock() in an async function. asyncio.Lock() is not thread-safe, so I cannot write with await asyncio.Lock():. I need threading.Lock() because this object may be accessed by multiple threads: it is used in a web app, and the server running it can spin up many threads. What is an effective way of doing this? So far I've tried a simple function that uses the lock:
1)
async def main():
    with await threading.Lock():
        a = 6
    return a
TypeError: object _thread.lock can't be used in 'await' expression
2)
async def main():
    async with threading.Lock():
        a = 1564666546
    return a
AttributeError: __aexit__
You can't pass a threading.Lock to async with because it is not designed for async usage; it's a blocking primitive. More importantly, async with threading.Lock() wouldn't make sense even if it did work, because you would be acquiring a brand-new lock, which would always succeed. For locking to work, you must share a lock between multiple threads, e.g. by storing it in an object's attribute. The rest of this answer assumes that you have a threading.Lock shared between threads.
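As a minimal sketch of what "sharing a lock" means, the example below stores the lock in an attribute of the object it protects, so every thread that touches the object goes through the same lock. The Counter class and the run_threads helper are hypothetical names made up for illustration; they are not part of the question.

```python
import threading


class Counter:
    """Hypothetical shared object; the lock lives on the instance."""

    def __init__(self):
        self.lock = threading.Lock()  # created once, shared by every caller
        self.value = 0

    def increment(self):
        with self.lock:  # all threads contend for the same lock
            self.value += 1


def run_threads(counter, n_threads=4, n_increments=1000):
    """Increment the counter from several threads and return the final value."""

    def worker():
        for _ in range(n_increments):
            counter.increment()

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value
```

By contrast, calling threading.Lock() inside increment() would hand each call its own fresh lock and protect nothing.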
Since threading.Lock always blocks, the only way you can use it from asyncio is to wait to acquire it in a separate thread, and suspend the execution of the current coroutine until the lock is successfully acquired. The functionality of running a blocking function in a different thread is already covered by the run_in_executor event loop method, which you can apply:
import asyncio
import concurrent.futures

_pool = concurrent.futures.ThreadPoolExecutor()

async def work(lock, *other_args):
    # lock is a threading.Lock shared between threads
    loop = asyncio.get_running_loop()
    # Acquire the lock in a worker thread, suspending us while waiting.
    await loop.run_in_executor(_pool, lock.acquire)
    ...  # access the object with the lock held
    # Can release directly because release() doesn't block and a
    # threading.Lock can be released from any thread.
    lock.release()
You can make this more elegant to use (and exception-safe) by creating an async context manager:
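import asyncio
import concurrent.futures
import contextlib

_pool = concurrent.futures.ThreadPoolExecutor()

@contextlib.asynccontextmanager
async def async_lock(lock):
    loop = asyncio.get_running_loop()
    # Wait for the lock in a worker thread so the event loop stays responsive.
    await loop.run_in_executor(_pool, lock.acquire)
    try:
        yield  # the lock is held
    finally:
        lock.release()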
Then you can use it as follows:
# lock is a threading.Lock shared between threads
async with async_lock(lock):
    ...  # access the object with the lock held
Of course, in code not running in the asyncio event loop you'd just acquire the lock directly:
# lock is a threading.Lock shared between threads
with lock:
    ...  # access the object
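To show both access styles working against the same lock, here is a small end-to-end sketch: one plain thread uses with lock, while a coroutine uses an async_lock helper written as described above. The Shared class, the two worker functions, and demo are hypothetical names introduced only for this illustration.

```python
import asyncio
import concurrent.futures
import contextlib
import threading

# Private pool used only for waiting on the lock.
_pool = concurrent.futures.ThreadPoolExecutor()


@contextlib.asynccontextmanager
async def async_lock(lock):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(_pool, lock.acquire)
    try:
        yield  # the lock is held
    finally:
        lock.release()


class Shared:
    """Hypothetical object accessed from both a thread and a coroutine."""

    def __init__(self):
        self.lock = threading.Lock()
        self.items = []


def thread_worker(shared, n):
    # Ordinary threaded code acquires the lock directly.
    for i in range(n):
        with shared.lock:
            shared.items.append(("thread", i))


async def coro_worker(shared, n):
    # Coroutine code waits for the same lock without blocking the event loop.
    for i in range(n):
        async with async_lock(shared.lock):
            shared.items.append(("coro", i))


def demo(n=100):
    shared = Shared()
    t = threading.Thread(target=thread_worker, args=(shared, n))
    t.start()
    asyncio.run(coro_worker(shared, n))
    t.join()
    return len(shared.items)
```

Calling demo() runs both workers concurrently and returns the number of items appended, which should be 2 * n because every append happens with the lock held.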
Note that we use a separate thread pool instead of passing None to run_in_executor(), which would reuse the default pool. This avoids deadlock in situations where the function that holds the lock itself needs the thread pool for other uses of run_in_executor(). By keeping the thread pool private, we avoid the possibility of deadlocking through the use of the same pool by others.
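The starvation scenario can be reproduced on a small scale. The sketch below is an illustration under stated assumptions, not part of the original answer: the pools are limited to one worker (max_workers=1) so that a single waiter is enough to exhaust a pool, and a one-second timeout stands in for the deadlock so that the demonstration terminates. The names holder, waiter, scenario, and compare are hypothetical.

```python
import asyncio
import concurrent.futures
import threading


async def holder(lock, lock_pool, work_pool, holding):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(lock_pool, lock.acquire)
    try:
        holding.set()
        await asyncio.sleep(0.1)  # let the waiter queue its acquire() first
        try:
            # Unrelated blocking work performed while the lock is held.
            await asyncio.wait_for(
                loop.run_in_executor(work_pool, sum, [1, 2, 3]), timeout=1.0
            )
            return True
        except asyncio.TimeoutError:
            return False  # the work never got a worker thread
    finally:
        lock.release()


async def waiter(lock, lock_pool, holding):
    loop = asyncio.get_running_loop()
    await holding.wait()
    # Occupies a lock_pool worker until the holder releases the lock.
    await loop.run_in_executor(lock_pool, lock.acquire)
    lock.release()


async def scenario(share_pool):
    lock = threading.Lock()
    holding = asyncio.Event()
    lock_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    work_pool = (
        lock_pool if share_pool
        else concurrent.futures.ThreadPoolExecutor(max_workers=1)
    )
    try:
        finished, _ = await asyncio.gather(
            holder(lock, lock_pool, work_pool, holding),
            waiter(lock, lock_pool, holding),
        )
        return finished
    finally:
        lock_pool.shutdown()
        work_pool.shutdown()


def compare():
    # (work finished with a shared pool, work finished with a private pool)
    return asyncio.run(scenario(True)), asyncio.run(scenario(False))
```

With a shared pool, the only worker is stuck inside lock.acquire() on behalf of the waiter, so the holder's own work is never scheduled and the holder never reaches the point where it releases the lock; without the timeout this would hang forever. With a private lock pool, the holder's work runs on a different worker and completes. The default executor has more workers, so it takes more simultaneous waiters to exhaust it, but the mechanism is the same.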