In the code below, test() waits for a request from the UI. The request arrives as JSON, and for each request test() creates a task by calling handle():
async def test():
    loop = asyncio.get_running_loop()
    req = await receiver.recv_string()
    logger.debug(f"Request received {req}")
    req_json = json.loads(req)
    logger.debug("Await create_task")
    loop.create_task(handle(req_json))
async def handle(req_json_):
    req_name = req_json_.get("req_name")
    # Acquire a lock here based on req_name. If a request comes in with a
    # different name, it acquires its own lock and proceeds. If a request
    # comes in with the same name, it is blocked until the earlier request
    # for that name has completed; once that request is done, the new one
    # acquires the lock for that name.
    logger.info(f"Request finished with req name {req_name} for action patch stack")
How can I achieve this with the asyncio module, or in any other way, in Python?
It seems to me that all you need to do is maintain a dictionary of locks whose keys are the names given by the variable req_name and whose values are the corresponding locks. If the key req_name is not already in the dictionary, a new lock for that key is added:
import asyncio
from collections import defaultdict

# dictionary of locks, keyed by request name
locks = defaultdict(asyncio.Lock)

async def handle(req_json_):
    req_name = req_json_.get("req_name")
    # Get the lock stored in the locks dictionary under req_name. If it
    # does not exist, defaultdict creates a new lock, stores it under
    # req_name and returns it:
    lock = locks[req_name]
    # A request with a different name uses a different lock and is not
    # blocked. A request with the same name waits here until the earlier
    # request for that name has released the lock:
    async with lock:
        # do whatever needs to be done:
        ...
        logger.info(f"Request finished with req name {req_name} for action patch stack")
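To see the per-name locking in action, here is a minimal, self-contained sketch. The sleep stands in for the real work, and the "id" field and the events list are hypothetical additions used only to record the order of execution; they are not part of the original code.

```python
import asyncio
from collections import defaultdict

# One lock per request name, created on first use.
locks = defaultdict(asyncio.Lock)
events = []  # hypothetical log of when each request starts and ends


async def handle(req_json_):
    req_name = req_json_.get("req_name")
    async with locks[req_name]:
        events.append(("start", req_name, req_json_["id"]))
        await asyncio.sleep(0.05)  # stand-in for the real work
        events.append(("end", req_name, req_json_["id"]))


async def main():
    # Two requests share the name "a"; one uses the name "b".
    await asyncio.gather(
        handle({"req_name": "a", "id": 1}),
        handle({"req_name": "a", "id": 2}),
        handle({"req_name": "b", "id": 3}),
    )


asyncio.run(main())
print(events)
```

The second "a" request should start only after the first "a" request has ended, while the "b" request starts without waiting for either of them.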
Update
If you need to time out the attempt to acquire a lock, create a coroutine that acquires the lock passed to it, and use it in conjunction with a call to asyncio.wait_for with a suitable timeout argument:
import asyncio
from collections import defaultdict

async def acquire_lock(lock):
    await lock.acquire()

# dictionary of locks, keyed by request name
locks = defaultdict(asyncio.Lock)

async def handle(req_json_):
    req_name = req_json_.get("req_name")
    lock = locks[req_name]
    # Try to acquire the lock but time out after 1 second:
    try:
        await asyncio.wait_for(acquire_lock(lock), timeout=1.0)
    except asyncio.TimeoutError:
        # Here if the lock could not be acquired:
        ...
    else:
        # Do whatever needs to be done.
        # The lock must now be explicitly released:
        try:
            ...
            logger.info(f"Request finished with req name {req_name} for action patch stack")
        finally:
            lock.release()
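A small runnable sketch of the timeout behaviour follows. It is an illustration under assumptions: the timeout and sleep durations are arbitrary, the string return values are hypothetical, and lock.acquire() is passed directly to asyncio.wait_for instead of going through a wrapper coroutine, which should behave the same way since acquire() is itself a coroutine.

```python
import asyncio
from collections import defaultdict

locks = defaultdict(asyncio.Lock)


async def handle(req_json_, timeout=0.05):
    req_name = req_json_.get("req_name")
    lock = locks[req_name]
    try:
        # Give up if the lock is not acquired within `timeout` seconds.
        await asyncio.wait_for(lock.acquire(), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"  # hypothetical result for a rejected request
    try:
        await asyncio.sleep(0.2)  # stand-in for the real work
        return "done"
    finally:
        # Not using "async with", so the lock is released explicitly.
        lock.release()


async def main():
    return await asyncio.gather(
        handle({"req_name": "a"}),
        handle({"req_name": "a"}),  # same name: waits, then times out
        handle({"req_name": "b"}),  # different name: not blocked
    )


results = asyncio.run(main())
print(results)
```

Because the first "a" request holds its lock for longer than the timeout, the second "a" request gives up, while the "b" request completes normally.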