I'm building a multithreaded program that automates API calls to retrieve all the information I need for a list of unique IDs.
To do this, I created my own lock class that allows either several threads concurrent read access to specific variables, or a single thread exclusive read/write access.
import asyncio

# Read-Write asynchronous & multithreaded locking.
class AsyncReadWriteLock:
    def __init__(self):
        self._readers = 0  # Number of active readers
        self._writer_lock = asyncio.Lock()  # Lock for writers
        self._readers_lock = asyncio.Lock()  # Lock to protect the readers counter
        self._readers_wait = asyncio.Condition(self._readers_lock)  # Condition for readers

    async def acquire_read(self):
        """Acquire the lock for reading."""
        # First wait for any active writer to complete
        async with self._writer_lock:
            # Wait for other readers' actions
            async with self._readers_lock:
                self._readers += 1  # Ensures readers count is accurate

    async def release_read(self):
        """Release the lock for reading."""
        # Wait for other readers' actions
        async with self._readers_lock:
            self._readers -= 1
            # No readers are left
            if self._readers == 0:
                # Notify writers waiting for all readers to finish
                self._readers_wait.notify_all()

    async def acquire_write(self):
        """Acquire the lock for writing."""
        # First acquire the writer lock to block other writers
        await self._writer_lock.acquire()
        # Now wait for all readers to finish
        async with self._readers_lock:
            while self._readers > 0:
                await self._readers_wait.wait()

    async def release_write(self):
        """Release the lock for writing."""
        # Release the writer lock, only if locked
        if self._writer_lock.locked():
            await self._writer_lock.release()
For example, let's say we have the following variables:
CHECKPOINT_FILE = "./myCheckpoint.txt"
latest_checkpoint = 0 # Default value
checkpoint_lock = AsyncReadWriteLock() # Lock for checkpoint
This is the function where checkpoint_lock is first used. It's also worth noting that checkpoint_lock is the first lock in my program to be used at all.
import logging

async def load_checkpoint() -> int:
    """
    Load the last completed batch index from checkpoint file
    Returns the index of the last completed batch, or -1 if no checkpoint exists
    """
    global latest_checkpoint
    await checkpoint_lock.acquire_write()
    try:
        with open(CHECKPOINT_FILE, 'r') as f:
            line = f.readline().strip()
            if line and line.isdigit():
                latest_checkpoint = int(line)
                logging.info(f"Loaded checkpoint: last completed index = {latest_checkpoint}")
                return latest_checkpoint
            else:
                logging.info("Checkpoint file exists but contains no valid index")
                return -1
    except FileNotFoundError:
        logging.info("No checkpoint file found, starting from the beginning")
        return -1
    except Exception as e:
        logging.error(f"Error loading checkpoint: {str(e)}")
        return -1
    finally:
        await checkpoint_lock.release_write()
Multiple concurrent threads should be able to call checkpoint_lock.acquire_read() and all read the values without error. Because none of the "reader" threads modify the values, this causes no problem.
If a thread wants to write, it calls checkpoint_lock.acquire_write(), which takes the writer lock (keeping out other writers and new readers) and then waits for all active readers to finish.
What I'm currently seeing is that when the write section finishes and checkpoint_lock.release_write() is called, I get this error:
TypeError: object NoneType can't be used in 'await' expression
on the line await self._writer_lock.release() in release_write().
This is what I really can't understand. To get to this point, self._writer_lock cannot be None, because self._writer_lock.locked() is checked first. If it actually were None, that check should throw an error earlier (unless locked is a default method of all Python objects?).
Printing the variable confirms that it is an object: <asyncio.locks.Lock object at 0x70204edacbf0 [locked]>
If anyone can explain what is going on here and how to fix this error, I'd appreciate the help.
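In release_write, you call asyncio.Lock.release(). asyncio.Lock.release() is a synchronous method, but you're awaiting it. Python evaluates self._writer_lock.release() first, which releases the lock and returns None, and then tries to await that None, which raises the TypeError. So the None in the error message is the return value of release(), not self._writer_lock itself, and the lock has already been released by the time the exception is raised. The fix is simply to remove await from that line in release_write().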
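Separately, a note on the line async with self._writer_lock: in acquire_read. A reader holds _writer_lock only while it increments the counter, so the lock works as a gate: once a writer holds _writer_lock, new readers wait, and the writer then waits for the readers already inside to leave. That is a valid writer-preferring design, so removing await is the only change it needs. If you would rather use the classic reader-preferring pattern, where the first reader takes _writer_lock on behalf of all readers and the last reader releases it (readers then never queue behind a waiting writer, but a steady stream of overlapping readers can starve writers), it could look like this: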
import asyncio

class AsyncReadWriteLock:
    def __init__(self):
        self._readers = 0
        self._writer_lock = asyncio.Lock()
        self._readers_lock = asyncio.Lock()
        self._no_readers = asyncio.Event()
        self._no_readers.set()  # Initially, no readers

    async def acquire_read(self):
        async with self._readers_lock:
            self._readers += 1
            if self._readers == 1:
                # First reader blocks writers
                await self._writer_lock.acquire()
                self._no_readers.clear()

    async def release_read(self):
        async with self._readers_lock:
            self._readers -= 1
            if self._readers == 0:
                # Last reader allows writers. asyncio.Lock has no owner, so the
                # last reader may release a lock the first reader acquired.
                self._writer_lock.release()
                self._no_readers.set()

    async def acquire_write(self):
        await self._writer_lock.acquire()
        # Defensive check: holding _writer_lock already implies no active readers
        await self._no_readers.wait()

    async def release_write(self):
        self._writer_lock.release()  # Synchronous call, so no await
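To check how the class above behaves, one option is to run a couple of readers and a writer against it. The sketch below repeats the class so it runs on its own, then records the highest number of readers inside at once and how many readers were active whenever the writer ran. run_demo() and the sleep durations are arbitrary illustration choices, not part of the answer's code:

```python
import asyncio


class AsyncReadWriteLock:
    # Same reader-preferring lock as above.
    def __init__(self):
        self._readers = 0
        self._writer_lock = asyncio.Lock()
        self._readers_lock = asyncio.Lock()
        self._no_readers = asyncio.Event()
        self._no_readers.set()

    async def acquire_read(self):
        async with self._readers_lock:
            self._readers += 1
            if self._readers == 1:
                await self._writer_lock.acquire()
                self._no_readers.clear()

    async def release_read(self):
        async with self._readers_lock:
            self._readers -= 1
            if self._readers == 0:
                self._writer_lock.release()
                self._no_readers.set()

    async def acquire_write(self):
        await self._writer_lock.acquire()
        await self._no_readers.wait()

    async def release_write(self):
        self._writer_lock.release()


async def run_demo() -> tuple[int, list[int]]:
    lock = AsyncReadWriteLock()
    active_readers = 0
    max_concurrent_readers = 0
    readers_seen_by_writer: list[int] = []

    async def reader() -> None:
        nonlocal active_readers, max_concurrent_readers
        await lock.acquire_read()
        try:
            active_readers += 1
            max_concurrent_readers = max(max_concurrent_readers, active_readers)
            await asyncio.sleep(0.05)  # hold the read lock for a while
            active_readers -= 1
        finally:
            await lock.release_read()

    async def writer() -> None:
        await asyncio.sleep(0.01)  # start after both readers are inside
        await lock.acquire_write()
        try:
            readers_seen_by_writer.append(active_readers)
        finally:
            await lock.release_write()

    await asyncio.gather(reader(), reader(), writer())
    return max_concurrent_readers, readers_seen_by_writer


print(asyncio.run(run_demo()))  # (2, [0])
```

Both readers are inside at the same time, and the writer only runs once the count has dropped back to zero.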
Your load_checkpoint() example stays the same; release_write() is still an async def, so it is still awaited:
async def load_checkpoint() -> int:
    global latest_checkpoint
    await checkpoint_lock.acquire_write()
    try:
        ...  # (file operations)
    finally:
        await checkpoint_lock.release_write()  # Now works without error
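Because every acquire_*() has to be paired with the matching release_*(), one option is to wrap each pair in an async context manager so the release can't be forgotten. The sketch below uses the question's original writer-preferring class with only the stray await removed; read_locked() and write_locked() are hypothetical helper methods added here, built on contextlib.asynccontextmanager, and the reader/writer tasks are illustration only:

```python
import asyncio
from contextlib import asynccontextmanager


class AsyncReadWriteLock:
    """The question's writer-preferring lock, with `await` removed from release_write()."""

    def __init__(self):
        self._readers = 0
        self._writer_lock = asyncio.Lock()
        self._readers_lock = asyncio.Lock()
        self._readers_wait = asyncio.Condition(self._readers_lock)

    async def acquire_read(self):
        # Passing through _writer_lock: a writer holding it keeps new readers out.
        async with self._writer_lock:
            async with self._readers_lock:
                self._readers += 1

    async def release_read(self):
        async with self._readers_lock:
            self._readers -= 1
            if self._readers == 0:
                self._readers_wait.notify_all()

    async def acquire_write(self):
        await self._writer_lock.acquire()
        async with self._readers_lock:
            while self._readers > 0:
                await self._readers_wait.wait()

    async def release_write(self):
        if self._writer_lock.locked():
            self._writer_lock.release()  # synchronous: no await

    # Hypothetical convenience wrappers (not in the original code).
    @asynccontextmanager
    async def read_locked(self):
        await self.acquire_read()
        try:
            yield
        finally:
            await self.release_read()

    @asynccontextmanager
    async def write_locked(self):
        await self.acquire_write()
        try:
            yield
        finally:
            await self.release_write()


async def main() -> list[str]:
    lock = AsyncReadWriteLock()
    events: list[str] = []

    async def reader(name: str) -> None:
        async with lock.read_locked():
            events.append(f"{name} start")
            await asyncio.sleep(0.05)
            events.append(f"{name} end")

    async def writer() -> None:
        await asyncio.sleep(0.01)  # let both readers get in first
        async with lock.write_locked():
            events.append("writer")

    await asyncio.gather(reader("r1"), reader("r2"), writer())
    return events


print(asyncio.run(main()))
```

With these wrappers, load_checkpoint() could use async with checkpoint_lock.write_locked(): instead of the explicit try/finally. In the printed list, both readers start before either ends, and "writer" appears only after both readers have finished.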