python multiprocessing gunicorn fasttext dill

ERROR:root:can't pickle fasttext_pybind.fasttext objects


I am using gunicorn with multiple workers for my machine learning project. The problem is that when I send a training request, only the worker that handles that request ends up with the latest model once training finishes. To keep inference fast, each model is loaded into memory once, right after training, so the worker that ran the training loads the new model while the other workers keep serving the previously loaded one. The model file (binary format) is loaded into a global dictionary in which the key is the model name and the value is the loaded model. The problem would not occur if I loaded the model from disk on every prediction, but I cannot do that because it would make prediction slower.

I read up on global variables and found that, in a multi-processing environment, every worker (process) gets its own copy of the global variables. Apart from the binary model file, I also have some other global variables (dictionaries) that need to be kept in sync across all processes. How can I handle this situation?
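
To make the per-process copy concrete, here is a minimal sketch. It assumes a POSIX system and uses the "fork" start method, which is how gunicorn creates its workers; the names MODELS, _retrain and demo_globals_are_per_process are illustrative and not part of my project.

```python
import multiprocessing as mp

# Module-level global: after a fork, each process works on its own copy.
MODELS = {"fasttext": "v1"}


def _retrain(queue):
    # Runs in the child process and changes only the child's copy.
    MODELS["fasttext"] = "v2"
    queue.put(MODELS["fasttext"])


def demo_globals_are_per_process():
    ctx = mp.get_context("fork")  # assumption: POSIX, as with gunicorn workers
    queue = ctx.Queue()
    child = ctx.Process(target=_retrain, args=(queue,))
    child.start()
    seen_in_child = queue.get()
    child.join()
    # The parent's dictionary is untouched by the child's update.
    return seen_in_child, MODELS["fasttext"]
```

The child reports the new value while the parent still holds the old one, which is the same situation as one gunicorn worker retraining while the others keep the stale model.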

TL;DR: I need a way to store a variable that is shared across all the processes (workers). Is there any way to do this, for example with multiprocessing.Manager or dill?

Update 1: My project uses several machine learning algorithms, each with its own model file. The models are loaded into a dictionary in which the key is the model name and the value is the corresponding model object, and I need to share all of them (in other words, I need to share the dictionary). But some of the models, such as FastText, cannot be pickled. So when I use a proxy variable (in my case a dictionary that holds the models) created with multiprocessing.Manager, assigning such a loaded model to the dictionary fails with: can't pickle fasttext_pybind.fasttext objects. More information on multiprocessing.Manager can be found here: Proxy Objects

Here is a summary of what I have done:

import multiprocessing
import fasttext

mgr = multiprocessing.Manager()
model_dict = mgr.dict()
model_file = fasttext.load_model("path/to/model/file/which/is/in/.bin/format")
model_dict["fasttext"] = model_file # This line throws this error

Error:

can't pickle fasttext_pybind.fasttext objects
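
The assignment fails because a Manager proxy has to pickle every value in order to send it to the manager process. The sketch below reproduces that constraint without FastText: NativeModel is a hypothetical stand-in for an extension-backed object that refuses to be pickled, and the path and version in shareable_entry are made-up placeholders.

```python
import pickle


class NativeModel:
    """Hypothetical stand-in for an extension-backed object that refuses to pickle."""

    def __reduce__(self):
        raise TypeError("can't pickle NativeModel objects")


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, which a Manager proxy needs."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


# Plain metadata can go through a Manager dict; the native object cannot.
shareable_entry = {"path": "path/to/model.bin", "version": 2}
```

Here is_picklable(shareable_entry) is true and is_picklable(NativeModel()) is false, so a shared dictionary could at most carry information about the model (such as where it is stored), not the loaded model itself.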

I printed the model_file that I am trying to assign; it is:

<fasttext.FastText._FastText object at 0x7f86e2b682e8>

Update 2: According to this answer I modified my code a little bit:

import fasttext
from multiprocessing.managers import SyncManager

def Manager():
    m = SyncManager()
    m.start()
    return m

# The loaded model prints as "<fasttext.FastText._FastText object at 0x7f86e2b682e8>",
# so "fasttext.FastText._FastText" is used as its class.
SyncManager.register("fast", fasttext.FastText._FastText)
# This Manager replaces the old one.
mgr = Manager()
ft = mgr.fast() # This line raises the error.

This gives me an EOFError.
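
For comparison, this is a minimal sketch of how registering a class with a manager works for an ordinary Python class. EchoModel, ModelManager and predict_via_proxy are hypothetical names, the "fork" context assumes a POSIX system, and I have not verified that a FastText object behaves the same way inside the manager process.

```python
import multiprocessing as mp
from multiprocessing.managers import BaseManager


class EchoModel:
    """Hypothetical stand-in for a model; the instance lives in the manager process."""

    def __init__(self, label):
        self.label = label

    def predict(self, text):
        return f"{self.label}:{text}"


class ModelManager(BaseManager):
    pass


# Registration has to happen before the manager process is started.
ModelManager.register("EchoModel", EchoModel)


def predict_via_proxy(text):
    manager = ModelManager(ctx=mp.get_context("fork"))  # assumption: POSIX
    manager.start()
    try:
        model = manager.EchoModel("demo")  # constructed inside the manager process
        # Only the argument and the return value are pickled, not the model.
        return model.predict(text)
    finally:
        manager.shutdown()
```

With this pattern the object itself never crosses a process boundary, but every prediction becomes an inter-process call, which works against the goal of keeping inference fast.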

Update 3: I tried using dill with both multiprocessing and multiprocess. A summary of the changes follows:

import multiprocessing
import multiprocess
import dill

# Any one of the following two lines
mgr = multiprocessing.Manager() # Or,
mgr = multiprocess.Manager()

model_dict = mgr.dict()
... ... ...
... ... ...

model_file = dill.dumps(model_file) # This line throws the error
model_dict["fasttext"] = model_file
... ... ...
... ... ...
# During loading
model_file = dill.loads(model_dict["fasttext"])

But I still get the error: can't pickle fasttext_pybind.fasttext objects.

Update 4: This time I used another library, jsonpickle. Serialization and de-serialization appear to work (no errors are reported while running), but surprisingly, whenever I make a prediction after de-serialization, the process hits a segmentation fault. More details and the steps to reproduce it can be found here: Segmentation fault (core dumped)

Update 5: I tried cloudpickle and srsly, but could not get the program to work.


Solution

  • For the sake of completeness, here is the solution that worked for me. Every approach I tried for serializing FastText failed. Finally, as @MedetTleukabiluly suggested in a comment, I used Redis pub/sub (redis-pubsub) to tell the other workers to load the model from disk. This does not share the model in a common memory space; it only sends a message informing the other workers that a new training run has just finished and that they should reload the model from disk. The general solution follows:

    # redis_pubsub.py
    
    import logging
    import os
    import fasttext
    import socket
    import threading
    import time
    
    # GLOBAL_NAMESPACE keeps this pub/sub mechanism separate from any other
    # service that might publish to the same channel.
    GLOBAL_NAMESPACE = "SERVICE_0"
    
    def get_ip():
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # doesn't even have to be reachable
            s.connect(('10.255.255.255', 1))
            IP = s.getsockname()[0]
        except Exception:
            IP = '127.0.0.1'
        finally:
            s.close()
        return IP
    
    
    class RedisPubSub:
        def __init__(self):
            self.redis_client = get_redis_client()  # TODO: sample helper that returns your Redis client (you have to implement it)
            # The unique ID identifies which worker on which server published a message,
            # so that a worker can ignore the messages it sent itself.
            self.unique_id = "IP_" + get_ip() + "__" + str(GLOBAL_NAMESPACE) + "__" + "PID_" + str(os.getpid())
    
    
        def listen_to_channel_and_update_models(self, channel):
            try:
                pubsub = self.redis_client.pubsub()
                pubsub.subscribe(channel)
            except Exception as exception:
                logging.error(f"REDIS_ERROR: Model Update Listening: {exception}")
                return  # without a subscription there is nothing to listen to
    
            while True:
                try:
                    message = pubsub.get_message()
    
                    # Subscribe confirmations carry an integer in "data";
                    # only real published messages are of interest here.
                    if message and message["type"] == "message":
                        message = message["data"].decode("utf-8")
                        splitted_msg = message.split("__SEPERATOR__")
    
    
                        # Not only making sure the message is coming from another worker
                        # but also we have to make sure the message sender and receiver (i.e, both of the workers) are under the same namespace
                        if (splitted_msg[0] != self.unique_id) and (splitted_msg[0].split('__')[1] == GLOBAL_NAMESPACE):
                            algo_name = splitted_msg[1]
                            model_path = splitted_msg[2]
    
                            # Fasttext
                            if "fasttext" in algo_name:
                                try:
                                    #TODO: YOU WILL GET THE LOADED NEW FILE IN model_file. USE IT TO UPDATE THE OLD ONE.
                                    model_file = fasttext.load_model(model_path + '.bin')
                                except Exception as exception:
                                    logging.error(exception)
                                else:
                                    logging.info(f"{algo_name} model is updated for process with unique_id: {self.unique_id} by process with unique_id: {splitted_msg[0]}")
    
    
                    time.sleep(1) # sleeping for 1 second to avoid hammering the CPU too much
    
                except Exception as exception:
                    time.sleep(1)
                    logging.error(f"PUBSUB_ERROR: Model or component update: {exception}")
    
    
        def publish_to_channel(self, channel, algo_name, model_path):
            def _publish_to_channel():
                try:
                    message = self.unique_id + '__SEPERATOR__' + str(algo_name) + '__SEPERATOR__' + str(model_path)
                    time.sleep(3)
                    self.redis_client.publish(channel, message)
                except Exception as exception:
                    logging.error(f"PUBSUB_ERROR: Model or component publishing: {exception}")
    
            # The delay before publishing would otherwise block independent work, so publish from a separate thread.
            thread = threading.Thread(target = _publish_to_channel)
            thread.start()
    

    You also have to start the listener:

    import threading
    
    from redis_pubsub import RedisPubSub
    pubsub = RedisPubSub()
    
    
    # start the listener:
    thread = threading.Thread(target = pubsub.listen_to_channel_and_update_models, args = ("sync-ml-models", ))
    thread.start()
    

    In the fasttext training module, once training finishes, publish this message so that the other workers get a chance to reload the model from disk:

    # fasttext_api.py
    
    from redis_pubsub import RedisPubSub
    pubsub = RedisPubSub()
    
    pubsub.publish_to_channel(channel="sync-ml-models",  # a sample name for the channel
                              algo_name="fasttext",
                              model_path="path/to/fasttext/model")
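
The TODO in the listener leaves open how the freshly loaded model replaces the old one. Below is a minimal sketch of one way to do it, assuming a per-process registry guarded by a lock; ModelRegistry, handle_message and the loader argument are hypothetical names, while the separator literal and the namespace check mirror the code above.

```python
import threading

SEPARATOR = "__SEPERATOR__"  # same literal the publisher above uses
NAMESPACE = "SERVICE_0"      # same value as GLOBAL_NAMESPACE above


class ModelRegistry:
    """Per-process model store; the lock lets the listener thread swap models safely."""

    def __init__(self):
        self._lock = threading.Lock()
        self._models = {}

    def set(self, name, model):
        with self._lock:
            self._models[name] = model

    def get(self, name):
        with self._lock:
            return self._models.get(name)


def handle_message(raw, own_id, registry, loader):
    """Reload a model if the message came from another worker in our namespace.

    Returns True when a model was swapped in, False when the message was ignored.
    """
    parts = raw.split(SEPARATOR)
    if len(parts) != 3:
        return False
    sender_id, algo_name, model_path = parts
    id_fields = sender_id.split("__")
    if sender_id == own_id or len(id_fields) < 2 or id_fields[1] != NAMESPACE:
        return False
    registry.set(algo_name, loader(model_path))
    return True
```

In the listener, loader could be something like lambda path: fasttext.load_model(path + ".bin"), and the prediction code would read the current model with registry.get("fasttext") on each request.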