I have this Python class and want your opinion on whether the cls._instance._cache = {} dictionary is thread safe for Tornado. If it is not, how can I make this cache thread safe?
import logging
import aiohttp
import time

# Constants
DEFAULT_TIMEOUT = 20
MAX_ERRORS = 3
HTTP_READ_TIMEOUT = 1

class HTTPRequestCache:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # TODO: check whether it's thread safe with the tornado event loop
            cls._instance._cache = {}
            cls._instance._time_out = DEFAULT_TIMEOUT
            cls._instance._http_read_timeout = HTTP_READ_TIMEOUT
            cls._instance._loop = None
        return cls._instance

    async def _fetch_update(self, url):
        try:
            async with aiohttp.ClientSession() as session:
                logging.info(f"Fetching {url}")
                async with session.get(url, timeout=self._http_read_timeout) as resp:
                    resp.raise_for_status()
                    resp_data = await resp.json()
                    cached_at = time.time()
                    self._cache[url] = {
                        "cached_at": cached_at,
                        "config": resp_data,
                        "errors": 0
                    }
                    logging.info(f"Updated cache for {url}")
        except aiohttp.ClientError as e:
            logging.error(f"Error occurred while updating cache for {url}: {e}")

    async def get(self, url):
        if url not in self._cache or self._cache[url]["cached_at"] < time.time() - self._time_out:
            await self._fetch_update(url)
        return self._cache.get(url, {}).get("config")
Python threading gets a degree of safety "for free" from the infamous global interpreter lock (GIL), which prevents more than one thread from executing Python bytecode at the same time (threading still brings big gains in network-bound code such as yours). Asyncio code like this is in even better shape: within a single event loop, tasks only switch at explicit await points, so plain data-structure updates cannot be interrupted half-way, and we can ignore thread-level concurrency entirely. It is possible to run multiple threads, each with its own asyncio loop; in that case the code would have to be protected against both kinds of concurrency.
So the part that matters, the update of the cache dict, happens atomically in Python, and there is no chance of mixed or corrupted results.
There is just one window for a race condition, which in the worst case means the same URL gets requested concurrently by two tasks. There would be no program failure: whichever request resolved last would be the "winner" and its data would end up in the cache, but you would pay for duplicate fetches.
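To make that window concrete, here is a small sketch (the URL is made up) where two tasks call get() on your original class before either fetch completes, so both see a cache miss and _fetch_update runs twice:

import asyncio

async def main():
    cache = HTTPRequestCache()
    # both calls see an empty cache before either fetch finishes,
    # so the same URL is requested twice over the network
    await asyncio.gather(
        cache.get("https://example.com/config.json"),
        cache.get("https://example.com/config.json"),
    )

asyncio.run(main())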
Adding a lock to this code, however, is not straightforward: a simple class-wide lock would prevent a request for a different URL from being started, blocking the program. Additionally, locks are either asyncio-level or threading-level, and if you really are running this with different asyncio loops in concurrent threads (I prefer to think it is a single thread and you just mixed up the terminology), you would need a careful combination of a threading.Lock and one asyncio.Lock per thread, tracked with either threading.local() or a contextvar, and doing that correctly would require a lot of care and testing (a rough sketch follows).
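Just to illustrate the bookkeeping that the multi-loop scenario would need (the names here are illustrative, and this is exactly the kind of complexity worth avoiding):

import asyncio
import threading

_per_thread = threading.local()

def loop_lock() -> asyncio.Lock:
    # each thread runs its own event loop, so each thread gets its
    # own asyncio.Lock to serialize the tasks inside that loop
    if not hasattr(_per_thread, "lock"):
        _per_thread.lock = asyncio.Lock()
    return _per_thread.lock

# a separate threading.Lock would still be needed around any state
# shared across the threads themselves (e.g. the cache dict)
cross_thread_lock = threading.Lock()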
Instead, we can just add another signal indicating that a URL is currently being fetched, which prevents double fetching without a global lock around the network call.
Also, as a side note, it is better to use time.monotonic() than time.time() for interval checks like the one used here: time.monotonic() always returns an increasing count of seconds, measured from an arbitrary fixed point, while time.time() reports the wall clock, which the operating system may adjust backwards or forwards (NTP corrections, manual clock changes), corrupting the computed intervals.
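For instance, an expiry check written with time.monotonic() keeps working even if the system clock is changed while the program runs:

import time

deadline = time.monotonic() + DEFAULT_TIMEOUT  # entry valid for 20 seconds
# ... later ...
if time.monotonic() > deadline:
    pass  # expired, regardless of any wall-clock adjustments in between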
I've modified your code below to use a lock plus a per-URL event to prevent the same entry from being fetched in parallel, along with the other changes mentioned:
import asyncio
import logging
import aiohttp
import time

# Constants
DEFAULT_TIMEOUT = 20
MAX_ERRORS = 3
HTTP_READ_TIMEOUT = 1

class HTTPRequestCache:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._cache = {}
            cls._instance._time_out = DEFAULT_TIMEOUT
            cls._instance._http_read_timeout = HTTP_READ_TIMEOUT
            # per-URL events signalling "a fetch for this URL is in flight"
            cls._instance._fetching_now = {}
            cls._instance.lock = asyncio.Lock()
            cls._instance._loop = None
        return cls._instance

    async def _fetch_update(self, url):
        async with self.lock:
            if event := self._fetching_now.get(url):
                # another task is already fetching this URL: wait for it
                await event.wait()
                if url in self._cache:
                    return
                # the URL was being fetched, but that fetch failed,
                # so it is retried now
            self._fetching_now[url] = asyncio.Event()
        try:
            async with aiohttp.ClientSession() as session:
                logging.info(f"Fetching {url}")
                async with session.get(url, timeout=self._http_read_timeout) as resp:
                    resp.raise_for_status()
                    resp_data = await resp.json()
                    cached_at = time.monotonic()
                    self._cache[url] = {
                        "cached_at": cached_at,
                        "config": resp_data,
                        "errors": 0
                    }
                    logging.info(f"Updated cache for {url}")
        # a timed-out request raises asyncio.TimeoutError, which is not
        # an aiohttp.ClientError, so it has to be caught explicitly
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            logging.error(f"Error occurred while updating cache for {url}: {e}")
        finally:
            # wake any waiting tasks, whether the fetch succeeded or failed
            self._fetching_now[url].set()
            del self._fetching_now[url]

    async def get(self, url):
        if url not in self._cache or self._cache[url]["cached_at"] < time.monotonic() - self._time_out:
            await self._fetch_update(url)
        return self._cache.get(url, {}).get("config")
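For completeness, this is roughly how the cache could be consumed from a Tornado handler; the handler class, route, and URL below are hypothetical, and I'm assuming the endpoint returns a JSON object:

import asyncio
import tornado.web

class ConfigHandler(tornado.web.RequestHandler):
    async def get(self):
        # all requests share the singleton; concurrent cache misses for the
        # same URL now result in a single network fetch
        config = await HTTPRequestCache().get("https://example.com/config.json")
        self.write(config or {})

async def main():
    app = tornado.web.Application([(r"/config", ConfigHandler)])
    app.listen(8888)
    await asyncio.Event().wait()  # run forever

if __name__ == "__main__":
    asyncio.run(main())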