I'm looking for a thread-safe implementation of a Semaphore I can use in Python.
The standard libraries asyncio.Semaphore isn't thread-safe.
The standard libraries threading.Semaphore doesn't have awaitable
interface.
I am using sanic which has multiple threads (workers) but also an asynchronous loop on each thread. I want to be able to yield execution back to the event loop on each of the workers whenever it encounters a blocked semaphore, while it waits.
UPDATE: I meant to say process here, not threads. So these should be that Sanic splits across processes, and multiprocessing.Semaphore. I believe the answer given is still relevant to where I can apply a similar solution.
If you are saying that you need a single semaphore instance to be used across all the threads, then create a threading.Semaphore
instance and am async function acquire_semaphore
such as the following:
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor()
semaphore = threading.Semaphore()
async def acquire_semaphore():
"""Make a threading.Semaphore awaitable."""
def acquirer():
semaphore.acquire()
loop = asyncio.get_running_loop()
await loop.run_in_executor(executor, acquirer)
async def test():
"""Acquire and release a semaphore sharable across threads."""
await acquire_semaphore()
print('acquired')
await asyncio.sleep(1)
semaphore.release() # This never blocks
print('released')
def worker():
asyncio.run(test())
def main():
threads = [
threading.Thread(target=worker)
for _ in range(3)
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
if __name__ == '__main__':
main()
Prints:
acquired
released
acquired
released
acquired
released