Tags: python, multithreading, python-asyncio, contextmanager, locks

I want to lock part of an async method in Python. It should be a name-based lock, where the name is passed as an argument to the method.


In the code below, test() waits for a request from the UI. The request arrives as JSON, and test() creates a task for every request by calling handle():

import asyncio
import json

# receiver and logger are defined elsewhere in the application

async def test():
    loop = asyncio.get_running_loop()
    req = await receiver.recv_string()
    logger.debug(f"Request received {req}")
    req_json = json.loads(req)
    logger.debug("Await create_task")
    loop.create_task(handle(req_json))

async def handle(req_json_):
    req_name = req_json_.get("req_name")
    # Acquire a lock here based on req_name. A request with a different
    # name should acquire its own lock right away. A request with the same
    # name should block until the earlier request for that name has
    # completed, and acquire the lock only after that.
    logger.info(f"Request finished with req name {req_name} for action patch stack")

How can I achieve this with the asyncio module, or in any other way in Python?


Solution

  • It seems to me that all you need is a dictionary of locks whose keys are the names held in req_name and whose values are the corresponding locks. If the key req_name is not already in the dictionary, a new lock is created and stored under that key:

    import asyncio
    import logging
    from collections import defaultdict
    
    logger = logging.getLogger(__name__)
    
    # Dictionary of locks: one asyncio.Lock per request name
    locks = defaultdict(asyncio.Lock)
    
    async def handle(req_json_):
        req_name = req_json_.get("req_name")
        # Get the lock stored under req_name. If it does not exist yet,
        # defaultdict creates a new lock, stores it under that key and
        # returns it:
        lock = locks[req_name]
        # A request with a different name gets its own lock and proceeds
        # immediately. A request with the same name waits here until the
        # earlier request for that name has finished:
        async with lock:
            # do whatever needs to be done:
            ...
        logger.info(f"Request finished with req name {req_name} for action patch stack")
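
To see the effect of the per-name locks, here is a minimal, self-contained sketch. The events list, the 0.05 second sleep that stands in for the real work, and the request names "a" and "b" are made up for illustration; they are not part of the original code.

```python
import asyncio
from collections import defaultdict

# One lock per request name, created lazily on first use.
locks = defaultdict(asyncio.Lock)

# Hypothetical log of what happened, so the ordering can be inspected.
events = []

async def handle(req_json_):
    req_name = req_json_.get("req_name")
    async with locks[req_name]:
        events.append(("start", req_name))
        await asyncio.sleep(0.05)  # stand-in for the real work
        events.append(("end", req_name))

async def main():
    # Two requests share the name "a"; the third uses "b".
    await asyncio.gather(
        handle({"req_name": "a"}),
        handle({"req_name": "a"}),
        handle({"req_name": "b"}),
    )

asyncio.run(main())
print(events)
```

The two "a" requests run one after the other, while the "b" request starts before the first "a" request has finished. Because defaultdict creates each lock the first time it is looked up, the locks are created inside the running event loop, which avoids binding a lock to the wrong loop on Python versions before 3.10.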
    

    Update

    If you need to time out the attempt to acquire a lock, create a coroutine that acquires the lock passed to it, and await that coroutine through asyncio.wait_for with a suitable timeout argument:

    import asyncio
    import logging
    from collections import defaultdict
    
    logger = logging.getLogger(__name__)
    
    async def acquire_lock(lock):
        await lock.acquire()
    
    # Dictionary of locks: one asyncio.Lock per request name
    locks = defaultdict(asyncio.Lock)
    
    async def handle(req_json_):
        req_name = req_json_.get("req_name")
        lock = locks[req_name]
        # Try to acquire the lock but time out after 1 second:
        try:
            await asyncio.wait_for(acquire_lock(lock), timeout=1.0)
        except asyncio.TimeoutError:
            # Here if the lock could not be acquired in time:
            ...
        else:
            # Do whatever needs to be done.
            # The lock must now be explicitly released:
            try:
                ...
                logger.info(f"Request finished with req name {req_name} for action patch stack")
            finally:
                lock.release()
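
If you would rather keep the async with syntax when using a timeout, one option is to wrap the acquire and release in an async context manager. This is only a sketch: named_lock, the results list, the work_time parameter and the timeout values are hypothetical names and numbers chosen for the demonstration, not part of the original code.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager

locks = defaultdict(asyncio.Lock)

@asynccontextmanager
async def named_lock(name, timeout):
    """Hold the lock for `name`; raise asyncio.TimeoutError if it
    cannot be acquired within `timeout` seconds."""
    lock = locks[name]
    await asyncio.wait_for(lock.acquire(), timeout=timeout)
    try:
        yield
    finally:
        # Runs only if the lock was acquired.
        lock.release()

# Hypothetical record of outcomes for the demonstration.
results = []

async def handle(req_json_, work_time):
    req_name = req_json_.get("req_name")
    # Note: this except clause also catches a TimeoutError raised by
    # the body of the async with block, not only by the acquisition.
    try:
        async with named_lock(req_name, timeout=0.05):
            await asyncio.sleep(work_time)  # stand-in for the real work
            results.append((req_name, "done"))
    except asyncio.TimeoutError:
        results.append((req_name, "timed out"))

async def main():
    # The first request holds lock "a" for 0.2 s, so the second one
    # gives up after waiting 0.05 s.
    await asyncio.gather(
        handle({"req_name": "a"}, 0.2),
        handle({"req_name": "a"}, 0.0),
    )

asyncio.run(main())
print(results)
```

The try/finally inside named_lock plays the same role as the explicit lock.release() above, so the caller cannot forget to release the lock.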