I have a service that loads and dumps data from a Python dict into a file using Python 2.7's cPickle. This service can be called simultaneously by many users.
What approach would allow cPickle to read and dump data into a single file in a multithreading context, so that the data does not get desynchronized (one process loading while another is dumping)?
I was thinking of using filelock, but I haven't been successful yet.
With my code below, I always get a cPickle.load(cache_file) IOError: [Errno 13] Permission denied error in init_cache() or update_cache().
''' example of a dict dumped by pickle
{
    "version": "1499180895",
    "queries": {
        "001::id,name,age": "aBase64EncodedString==",
        "002::id,name,sex": "anotherBase64EncodedString=="
    }
}
'''
import base64
import json
import traceback
import zlib

import cPickle as pickle
import filelock
from os import path

self.cache_file_path = "\\\\serverDisk\\cache\\cache.pkl"
self.select_by_values = "001"
self.out_fields = ["id", "name", "age"]
def get_from_cache_fn(self):
    try:
        server_version = self.query_version()
        query_id = "{}::{}".format(self.select_by_values, ",".join(self.out_fields))
        if path.isfile(self.cache_file_path):
            cache_dict = self.load_cache(server_version, query_id)
            if cache_dict["version"] == server_version:
                if query_id in cache_dict["queries"]:
                    return cache_dict["queries"][query_id]
                else:
                    return self.update_cache(cache_dict, query_id)["queries"][query_id]
            else:
                return self.init_cache(server_version, query_id)["queries"][query_id]
        else:
            return self.init_cache(server_version, query_id)["queries"][query_id]
    except Exception:
        self.add_service_error(ERRORS["get_from_cache"][0], traceback.format_exc())
def load_cache(self, server_version, query_id):
    with open(self.cache_file_path, "rb") as cache_file:
        try:
            cache_dict = pickle.load(cache_file)
            return cache_dict
        except StandardError:
            return self.init_cache(server_version, query_id)
def init_cache(self, server_version, query_id):
    cache_dict = {
        "version": server_version,
        "queries": {
            query_id: base64.b64encode(zlib.compress(json.dumps(self.query_features())))
        }
    }
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
            return cache_dict
    except filelock.Timeout:
        self.add_service_error("init_cache timeout", traceback.format_exc())
def update_cache(self, cache_dict, query_id):
    cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
            return cache_dict
    except filelock.Timeout:
        self.add_service_error("update_cache timeout", traceback.format_exc())
I have found the solution to my problem.
It appears that you have to give the lock a different file name than the file you are opening:
lock = filelock.FileLock("{}.lock".format(self.cache_file_path))
instead of lock = filelock.FileLock(self.cache_file_path). For instance:
def update_cache(self, cache_dict, query_id):
    cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
    lock = filelock.FileLock("{}.lock".format(self.cache_file_path))
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
            return cache_dict
    except filelock.Timeout:
        self.add_service_error("update_cache timeout", traceback.format_exc())