I tried to implement a simple read-preferring readers–writer lock using two mutexes (built on redis.lock.Lock), as described in this article: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock
In the [End Read] steps, I encountered this problem:
If b = 0, unlock g #(a write lock).
Because this READ process is not the one that acquired the write lock, the release fails with an error. I think the lock's token is stored somewhere, and that I could fetch it and use it to release the lock, but I am not sure.
Could someone give me a hint? Thanks.
from enum import Enum
from redis import StrictRedis, lock
# data in Redis cache:
# "read_counter_lock_name" : 0
# "read_lock_lock_name" -> read lock, protect "read_counter_lock_name"
# "write_lock_lock_name" -> write lock, protect write data section
class Prefix(Enum):
    """Prefixes prepended to the lock name to build the Redis key names."""

    READ = 'read_lock_'              # key of the lock guarding the reader counter
    WRITE = 'write_lock_'            # key of the lock guarding the written data
    READ_COUNTER = 'read_counter_'   # key of the integer counting active readers
class RedisLockParams(Enum):
    """Settings passed to every redis-py lock created in this file.

    NOTE: Enum members whose values compare equal become aliases of one
    another, and ``True == 1`` in Python. Setting TIMEOUT to 1 would therefore
    turn BLOCKING into an alias of TIMEOUT; keep the values distinct.
    """

    # Maximum life of a lock, in seconds.
    TIMEOUT = 60
    # Seconds to sleep per loop iteration inside the lock's acquire()
    # (sleep, then retry).
    SLEEP_TIME = 0.1
    # acquire() should block until the lock has been acquired.
    BLOCKING = True
    # Maximum time in seconds to spend trying to acquire the lock.
    BLOCKING_TIMEOUT = None
class ReadWriteLock:
    """Read-preferring readers-writer lock shared through Redis.

    Two redis-py locks are used, following the two-mutex scheme:

    * the *read* lock guards the shared reader counter;
    * the *write* lock is held either by a single writer or collectively by
      all active readers (the first reader takes it, the last one frees it).
    """

    def __init__(self, lock_name: str, redis_host: str, redis_port: int, redis_key: str):
        """Connect to Redis and prepare the counter and the two locks.

        Args:
            lock_name: Logical name; prefixed to build the Redis key names.
            redis_host: Redis server host name.
            redis_port: Redis server port.
            redis_key: Password used to authenticate against Redis.
        """
        self.__read_lock_name = Prefix.READ.value + lock_name
        self.__write_lock_name = Prefix.WRITE.value + lock_name
        self.__read_counter_key = Prefix.READ_COUNTER.value + lock_name
        self.__cache = StrictRedis(host=redis_host,
                                   port=redis_port,
                                   db=0,  # up to 16 logical databases
                                   password=redis_key,
                                   ssl=True)
        # Round-trip to the server once so connection problems surface here
        # rather than on the first lock operation.
        self.__cache.ping()
        # Set the read counter to 0 if it does not exist yet.
        self.__cache.setnx(self.__read_counter_key, 0)
        self.__read_lock = self.__new_lock(self.__read_lock_name)
        self.__write_lock = self.__new_lock(self.__write_lock_name)

    def __new_lock(self, name: str):
        """Build a redis-py lock on *name* using the shared RedisLockParams."""
        return lock.Lock(self.__cache,
                         name,
                         timeout=RedisLockParams.TIMEOUT.value,
                         sleep=RedisLockParams.SLEEP_TIME.value,
                         blocking=RedisLockParams.BLOCKING.value,
                         blocking_timeout=RedisLockParams.BLOCKING_TIMEOUT.value)

    def acquire_read_lock(self) -> bool:
        """Register the caller as a reader.

        The first reader also takes the write lock on behalf of all readers.

        Returns:
            True once read access is granted, False if it could not be.
        """
        if not self.__read_lock.acquire():
            return False
        try:
            readers = self.__cache.incr(self.__read_counter_key)
            granted = False
            try:
                # Only the first reader has to shut writers out.
                granted = readers != 1 or bool(self.__write_lock.acquire())
            finally:
                if not granted:
                    # Undo the registration so the counter stays accurate.
                    self.__cache.decr(self.__read_counter_key)
            return granted
        finally:
            # Always free the counter guard, even if something above raised.
            self.__read_lock.release()

    def release_read_lock(self) -> None:
        """Unregister the caller as a reader; the last reader frees the write lock."""
        if not self.__read_lock.acquire():
            return
        try:
            remaining = self.__cache.decr(self.__read_counter_key)
            if remaining == 0:
                self.__release_reader_held_write_lock()
        finally:
            self.__read_lock.release()

    def __release_reader_held_write_lock(self) -> None:
        """Release the write lock even when another reader acquired it."""
        if not self.__write_lock.owned():
            # A redis-py lock can only be released with the token that was
            # stored when it was acquired. The first reader (possibly another
            # process) stored it, so adopt the token currently held in Redis.
            token = self.__cache.get(self.__write_lock_name)
            if token is None:
                # The lock has already expired; there is nothing to release.
                return
            self.__write_lock.local.token = token
        self.__write_lock.release()

    def acquire_write_lock(self) -> bool:
        """Take exclusive (writer) access; returns the lock's acquire() result."""
        return self.__write_lock.acquire()

    def release_write_lock(self) -> None:
        """Give up exclusive (writer) access."""
        self.__write_lock.release()
I figured out how to release a Redis lock that the current process does not own by reading the source code of the Redis Python library (redis-py). Below is the modified version of the multiple-reader/single-writer lock class.
# read_write_lock.py
from enum import Enum
from redis import StrictRedis, lock
# data in Redis cache:
# "read_counter_lock_name" : 0
# "read_lock_lock_name" -> read lock, protect "read_counter_lock_name"
# "write_lock_lock_name" -> write lock, protect write data section
class Prefix(Enum):
    """Prefixes prepended to the lock name to build the Redis key names."""

    READ = 'read_lock_'              # key of the lock guarding the reader counter
    WRITE = 'write_lock_'            # key of the lock guarding the written data
    READ_COUNTER = 'read_counter_'   # key of the integer counting active readers
class RedisLockParams(Enum):
    """Settings passed to every redis-py lock created in this file.

    NOTE: Enum members whose values compare equal become aliases of one
    another, and ``True == 1`` in Python. Setting TIMEOUT to 1 would therefore
    turn BLOCKING into an alias of TIMEOUT; keep the values distinct.
    """

    # Maximum life of a lock, in seconds.
    TIMEOUT = 60
    # Seconds to sleep per loop iteration inside the lock's acquire()
    # (sleep, then retry).
    SLEEP_TIME = 0.1
    # acquire() should block until the lock has been acquired.
    BLOCKING = True
    # Maximum time in seconds to spend trying to acquire the lock.
    BLOCKING_TIMEOUT = None
class ReadWriteLock:
    """Multiple-reader / single-writer lock shared through Redis.

    Two redis-py locks are used, following the two-mutex scheme:

    * the *read* lock guards the shared reader counter;
    * the *write* lock is held either by a single writer or collectively by
      all active readers (the first reader takes it, the last one frees it).
    """

    def __init__(self, lock_name: str, redis_host: str, redis_port: int, redis_key: str):
        """Connect to Redis and prepare the counter and the two locks.

        Args:
            lock_name: Logical name; prefixed to build the Redis key names.
            redis_host: Redis server host name.
            redis_port: Redis server port.
            redis_key: Password used to authenticate against Redis.
        """
        self.__read_lock_name = Prefix.READ.value + lock_name
        self.__write_lock_name = Prefix.WRITE.value + lock_name
        self.__read_counter_key = Prefix.READ_COUNTER.value + lock_name
        self.__cache = StrictRedis(host=redis_host,
                                   port=redis_port,
                                   db=0,  # up to 16 logical databases
                                   password=redis_key,
                                   ssl=True)
        # Set the read counter to 0 if it does not exist yet.
        self.__cache.setnx(self.__read_counter_key, 0)
        self.__read_lock = self.__make_lock(self.__read_lock_name)
        self.__write_lock = self.__make_lock(self.__write_lock_name)

    def __make_lock(self, name: str):
        """Build a redis-py lock on *name* using the shared RedisLockParams."""
        return lock.Lock(self.__cache,
                         name,
                         timeout=RedisLockParams.TIMEOUT.value,
                         sleep=RedisLockParams.SLEEP_TIME.value,
                         blocking=RedisLockParams.BLOCKING.value,
                         blocking_timeout=RedisLockParams.BLOCKING_TIMEOUT.value)

    def acquire_read_lock(self) -> bool:
        """Register the caller as a reader.

        The first reader also takes the write lock on behalf of all readers.

        Returns:
            True once read access is granted, False if it could not be.
        """
        if not self.__read_lock.acquire():
            return False
        try:
            reader_count = self.__cache.incr(self.__read_counter_key)
            granted = False
            try:
                # Only the first reader has to shut writers out.
                granted = reader_count != 1 or bool(self.__write_lock.acquire())
            finally:
                if not granted:
                    # Undo the registration so the counter stays accurate.
                    self.__cache.decr(self.__read_counter_key)
            return granted
        finally:
            # Always free the counter guard, even if something above raised.
            self.__read_lock.release()

    def release_read_lock(self):
        """Unregister the caller as a reader; the last reader frees the write lock."""
        if not self.__read_lock.acquire():
            return
        try:
            reader_count = self.__cache.decr(self.__read_counter_key)
            if reader_count == 0:
                self.__release_shared_write_lock()
        finally:
            self.__read_lock.release()

    def __release_shared_write_lock(self) -> None:
        """Release the write lock even when another reader acquired it."""
        if not self.__write_lock.owned():
            # The lock was taken by another reader: adopt the token currently
            # stored in Redis so that release() accepts this caller.
            write_lock_token = self.__cache.get(self.__write_lock_name)
            if write_lock_token is None:
                # The lock has already expired; there is nothing to release.
                return
            self.__write_lock.local.token = write_lock_token
        self.__write_lock.release()

    def acquire_write_lock(self) -> bool:
        """Take exclusive (writer) access; returns the lock's acquire() result."""
        return self.__write_lock.acquire()

    def release_write_lock(self) -> None:
        """Give up exclusive (writer) access."""
        self.__write_lock.release()