pythonpython-3.xmultithreadinglockingpython-asyncio

Python asyncio Lock.release(): object NoneType can't be used in 'await' expression


Code & Context

I'm building a multithreaded program to automate API calls, to retrieve all the information I need based off of a list of unique IDs.

To do this, I ended up creating my own Lock class to allow threads either concurrent read access to specific variables, or one single thread to read and write.

# Read-Write asynchronous & multithreaded locking.
class AsyncReadWriteLock:
    def __init__(self):
        self._readers = 0  # Number of active readers
        self._writer_lock = asyncio.Lock()  # Lock for writers
        self._readers_lock = asyncio.Lock()  # Lock to protect the readers counter
        self._readers_wait = asyncio.Condition(self._readers_lock)  # Condition for readers

    async def acquire_read(self):
        """Acquire the lock for reading."""
        # First wait for any active writer to complete
        async with self._writer_lock:
            # Wait for other readers' actions
            async with self._readers_lock:
                self._readers += 1 # Ensures readers count is accurate

    async def release_read(self):
        """Release the lock for reading."""
        # Wait for other readers' actions
        async with self._readers_lock:
            self._readers -= 1

            # No readers are left
            if self._readers == 0:
                # Notify writers waiting for all readers to finish
                self._readers_wait.notify_all()

    async def acquire_write(self):
        """Acquire the lock for writing."""
        # First acquire the writer lock to block other writers
        await self._writer_lock.acquire()

        # Now wait for all readers to finish
        async with self._readers_lock:
            while self._readers > 0:
                await self._readers_wait.wait()

    async def release_write(self):
        """Release the lock for writing."""
        # Release the writer lock, only if locked
        if self._writer_lock.locked():
            await self._writer_lock.release()

To provide an example, lets say we have the following variables:

CHECKPOINT_FILE = "./myCheckpoint.txt"
latest_checkpoint = 0 # Default value
checkpoint_lock = AsyncReadWriteLock() # Lock for checkpoint

This is the function where checkpoint_lock is first interacted with. It's also worth noting that it is the first lock variable in my program to be interacted with.

async def load_checkpoint() -> int:
    """
    Load the last completed batch index from checkpoint file
    Returns the index of the last completed batch, or -1 if no checkpoint exists
    """
    global latest_checkpoint
    await checkpoint_lock.acquire_write()
    try:
        with open(CHECKPOINT_FILE, 'r') as f:
            line = f.readline().strip()
            if line and line.isdigit():
                latest_checkpoint = int(line)
                logging.info(f"Loaded checkpoint: last completed index = {latest_checkpoint}")
                return max_completion
            else:
                logging.info("Checkpoint file exists but contains no valid index")
                return -1
    except FileNotFoundError:
        logging.info("No checkpoint file found, starting from the beginning")
        return -1
    except Exception as e:
        logging.error(f"Error loading checkpoint: {str(e)}")
        return -1
    finally:
        await checkpoint_lock.release_write()

The Idea

I should be able to have multiple concurrent threads calling checkpoint_lock.acquire_read(), such that they all access the values without error. Because none of the "reader" threads modify the values, it causes no problem.

If a thread wants to write, checkpoint_lock.acquire_write() is called, and it waits on all readers to finish, then acquires the write lock.

The Problem

What I'm currently experiencing is that, when the write section finishes, checkpoint_lock.release_write() is called. For some reason, within this function, I get the error: TypeError: object NoneType can't be used in 'await' expression

For the line await self._writer_lock.release() in release_write()

This is what I really can't understand. To get to this point, self._writer_lock cannot be None because self._writer_lock.locked() is first checked. Though if actually None, it should throw an error earlier (unless locked is a default method of python objects?)

Printing the variable proves its existence as an object: <asyncio.locks.Lock object at 0x70204edacbf0 [locked]>

If anyone can help figure out what is going on here to explain and fix this error, I'd appreciate the help.


Solution

    1. In the function release_write, you've mentioned asyncio.Lock.release(). asyncio.Lock.release() is a synchronous method, but you're trying to await it. When you await it, Python tries to treat the return value (which is None) as an awaitable, causing TypeError. So remove await from release_write()

    2. At present, you're allowing readers to block writers indefinitely(the line async with self._writer_lock: in function acquire_read). When a reader acquires _writer_lock, it blocks all writers till reader releases it. So, the purpose of a read-write lock is not fulfilled (readers should not block writers).

    class AsyncReadWriteLock:
        def __init__(self):
            self._readers = 0
            self._writer_lock = asyncio.Lock()
            self._readers_lock = asyncio.Lock()
            self._no_readers = asyncio.Event()
            self._no_readers.set()  # Initially, no readers
    
        async def acquire_read(self):
            async with self._readers_lock:
                self._readers += 1
                if self._readers == 1:
                    # First reader blocks writers
                    await self._writer_lock.acquire()
            self._no_readers.clear()
    
        async def release_read(self):
            async with self._readers_lock:
                self._readers -= 1
                if self._readers == 0:
                    # Last reader allows writers
                    self._writer_lock.release()
                    self._no_readers.set()
    
        async def acquire_write(self):
            await self._writer_lock.acquire()
            # Wait for existing readers to finish
            await self._no_readers.wait()
    
        async def release_write(self):
            self._writer_lock.release()
    

    New Usage of load_checkpoint() you've given in example:

    async def load_checkpoint() -> int:
        global latest_checkpoint
        await checkpoint_lock.acquire_write()
        try:
            # ... (file operations)
        finally:
            await checkpoint_lock.release_write()  # Now works without error