Tags: python, multithreading, pickle, filelock

Load and dump to file in multithreading using Pickle and filelock - IOError: [Errno 13]


I have a service that loads and dumps data from a Python dict to a file using Python 2.7's cPickle. This service can be called simultaneously by many users.

What approach would allow cPickle to read from and write to a single file in a multithreaded context while avoiding desynchronization of the data (one process loading while another is dumping)?

I was thinking of using filelock, but I haven't been successful yet.

With my code below, I always get a "cPickle.load(cache_file) IOError: [Errno 13] Permission denied" error in init_cache() or update_cache().

''' example of a dict dumped by pickle

{
    "version": "1499180895",
    "queries": {
        "001::id,name,age": "aBase64EncodedString==",
        "002::id,name,sex": "anotherBase64EncodedString=="
    }
}
'''
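
For reference, those base64 strings are produced with base64.b64encode(zlib.compress(json.dumps(...))) in the code below. A minimal round-trip sketch (decode_features is a hypothetical helper name, not part of the original code):

import base64
import json
import zlib

def encode_features(obj):
    # serialize -> compress -> base64, mirroring init_cache() and update_cache()
    return base64.b64encode(zlib.compress(json.dumps(obj)))

def decode_features(encoded):
    # inverse: base64 -> decompress -> deserialize
    return json.loads(zlib.decompress(base64.b64decode(encoded)))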


import base64
import json
import traceback
import zlib

import cPickle as pickle
import filelock
from os import path

# instance attributes, shown here outside their class for brevity
self.cache_file_path = "\\\\serverDisk\\cache\\cache.pkl"
self.select_by_values = "001"
self.out_fields = ["id", "name", "age"]

def get_from_cache_fn(self):
    try:
        server_version = self.query_version()
        query_id = "{}::{}".format(self.select_by_values, ",".join(self.out_fields))
        if path.isfile(self.cache_file_path):
            cache_dict = self.load_cache(server_version, query_id)
            if cache_dict["version"] == server_version:
                if query_id in cache_dict["queries"]:
                    return cache_dict["queries"][query_id]
                else:
                    return self.update_cache(cache_dict, query_id)["queries"][query_id]
            else:
                return self.init_cache(server_version, query_id)["queries"][query_id]
        else:
            return self.init_cache(server_version, query_id)["queries"][query_id]
    except Exception:
        self.add_service_error(ERRORS["get_from_cache"][0], traceback.format_exc())


def load_cache(self, server_version, query_id):
    with open(self.cache_file_path, "rb") as cache_file:
        try:
            cache_dict = pickle.load(cache_file)
            return cache_dict
        except StandardError:
            return self.init_cache(server_version, query_id)


def init_cache(self, server_version, query_id):
    cache_dict = {
        "version" : server_version,
        "queries" : {
            query_id : base64.b64encode(zlib.compress(json.dumps(self.query_features())))
        }
    }
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
                return cache_dict
    except filelock.Timeout:
        self.add_service_error("init_cache timeout", traceback.format_exc())


def update_cache(self, cache_dict, query_id):
    cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
    lock = filelock.FileLock(self.cache_file_path)
    try:
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "wb") as cache_file:
                pickle.dump(cache_dict, cache_file)
                return cache_dict
    except filelock.Timeout:
        self.add_service_error("update_cache timeout", traceback.format_exc())

Solution

  • I have found the solution to my problem.

    It appears that you have to give the lock a different name from the file you are opening, presumably because filelock keeps the lock file itself open and locked, so the subsequent open(self.cache_file_path, "wb") on that same file fails with IOError: [Errno 13] Permission denied.

    Use lock = filelock.FileLock("{}.lock".format(self.cache_file_path)) instead of lock = filelock.FileLock(self.cache_file_path).

    For instance:

    def update_cache(self, cache_dict, query_id):
        cache_dict["queries"][query_id] = base64.b64encode(zlib.compress(json.dumps(self.query_features())))
        lock = filelock.FileLock("{}.lock".format(self.cache_file_path))
        try:
            with lock.acquire(timeout=10):
                with open(self.cache_file_path, "wb") as cache_file:
                    pickle.dump(cache_dict, cache_file)
                    return cache_dict
        except filelock.Timeout:
            self.add_service_error("update_cache timeout", traceback.format_exc())
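
    Note that load_cache() in the question still reads the pickle file without taking the lock, so a reader can observe a half-written file. Below is a minimal sketch of the same pattern applied to the read path (load_cache_locked is a hypothetical name, not from the original code; filelock.Timeout propagates to the caller here):

    import cPickle as pickle
    import filelock

    def load_cache_locked(self):
        # reuse the same separate .lock file so readers and writers exclude each other
        lock = filelock.FileLock("{}.lock".format(self.cache_file_path))
        with lock.acquire(timeout=10):
            with open(self.cache_file_path, "rb") as cache_file:
                return pickle.load(cache_file)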