I am using gunicorn with multiple workers for my machine learning project. The problem is that when I send a training request, only the worker that handles the request ends up with the latest model once training is done. To make inference faster, I load the model into memory only once, right after each training run. As a result, the worker that ran the training loads the latest model, while the other workers keep the previously loaded one. Right now the model file (binary format) is loaded once after each training into a global dictionary variable, where the key is the model name and the value is the loaded model. Obviously, this problem would not occur if I loaded the model from disk for every prediction, but I cannot do that because it would make prediction slower.
I looked further into global variables and found that, in a multi-processing environment, each worker (process) creates its own copy of the global variables. Apart from the binary model file, I also have some other global variables (of dictionary type) that need to be synced across all processes. So, how should I handle this situation?
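To illustrate the per-process copies described above, here is a minimal sketch. It assumes a fork-based start method (gunicorn uses a pre-fork worker model); `model_registry`, `_train_in_worker`, and `demo` are hypothetical names used only for this illustration, and the string values stand in for real model objects.

```python
import multiprocessing

# Module-level "global" state, standing in for the dictionary of loaded models.
model_registry = {"fasttext": "model-v1"}


def _train_in_worker(queue):
    # Runs in a child process: this assignment only changes the child's copy.
    model_registry["fasttext"] = "model-v2"
    queue.put(model_registry["fasttext"])


def demo():
    # "fork" mirrors how pre-fork servers create workers (Unix only).
    ctx = multiprocessing.get_context("fork")
    queue = ctx.Queue()
    worker = ctx.Process(target=_train_in_worker, args=(queue,))
    worker.start()
    seen_in_worker = queue.get()
    worker.join()
    # The parent's dictionary still holds the old value.
    return seen_in_worker, model_registry["fasttext"]


if __name__ == "__main__":
    print(demo())
```

The child sees its own update, while the parent still sees the old entry, which is the same effect as one gunicorn worker holding the new model and the others holding the old one.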
TL;DR: I need some approach that lets me store variables that are common across all the processes (workers). Is there any way to do this, for example with multiprocessing.Manager, dill, etc.?
Update 1: I have multiple machine learning algorithms in my project and each has its own model file. These are loaded into memory in a dictionary where the key is the model name and the value is the corresponding model object. I need to share all of them (in other words, I need to share the dictionary). But some of the models, such as FastText, are not pickle-serializable. So, when I try to use a proxy variable (in my case a dictionary that holds the models) with multiprocessing.Manager, I get an error for those non-pickle-serializable objects when assigning the loaded model to this dictionary, for example: can't pickle fasttext_pybind.fasttext objects. More information on multiprocessing.Manager can be found here: Proxy Objects
The following is a summary of what I have done:
import multiprocessing
import fasttext
mgr = multiprocessing.Manager()
model_dict = mgr.dict()
model_file = fasttext.load_model("path/to/model/file/which/is/in/.bin/format")
model_dict["fasttext"] = model_file # This line throws this error
Error:
can't pickle fasttext_pybind.fasttext objects
I printed the model_file that I am trying to assign; it is:
<fasttext.FastText._FastText object at 0x7f86e2b682e8>
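The error arises because a Manager proxy has to pickle every value it sends to the manager process. A minimal sketch of a pre-check follows; `can_cross_process_boundary` is a hypothetical helper name, and a `threading.Lock` is used as a stand-in for an unpicklable object such as the FastText model.

```python
import threading
from multiprocessing.reduction import ForkingPickler


def can_cross_process_boundary(obj):
    """Return True if `obj` can be pickled the way multiprocessing would pickle it."""
    try:
        # ForkingPickler is the pickler multiprocessing uses for inter-process transfer.
        ForkingPickler.dumps(obj)
    except Exception:
        return False
    return True


if __name__ == "__main__":
    print(can_cross_process_boundary({"name": "fasttext"}))  # plain data
    print(can_cross_process_boundary(threading.Lock()))      # stand-in for an unpicklable model
```

An object that fails this check cannot be stored in a mgr.dict(), whichever serializer wraps it afterwards.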
Update 2: According to this answer I modified my code a little bit:
import fasttext
from multiprocessing.managers import SyncManager
def Manager():
    m = SyncManager()
    m.start()
    return m
# As the model file has a type of "<fasttext.FastText._FastText object at 0x7f86e2b682e8>" so, using "fasttext.FastText._FastText" as the class of it
SyncManager.register("fast", fasttext.FastText._FastText)
# Now this is the Manager as a replacement of the old one.
mgr = Manager()
ft = mgr.fast() # This line gives error.
This gives me an EOFError.
Update 3: I tried using dill with both multiprocessing and multiprocess. A summary of the changes follows:
import multiprocessing
import multiprocess
import dill
# Any one of the following two lines
mgr = multiprocessing.Manager() # Or,
mgr = multiprocess.Manager()
model_dict = mgr.dict()
... ... ...
... ... ...
model_file = dill.dumps(model_file) # This line throws the error
model_dict["fasttext"] = model_file
... ... ...
... ... ...
# During loading
model_file = dill.loads(model_dict["fasttext"])
But I still get the error: can't pickle fasttext_pybind.fasttext objects.
Update 4: This time I am using another library called jsonpickle. Serialization and de-serialization appear to work properly (no issue is reported while running). Surprisingly, however, whenever I make a prediction after de-serialization, it hits a segmentation fault. More details and the steps to reproduce it can be found here: Segmentation fault (core dumped)
Update 5: I tried cloudpickle and srsly, but could not get the program to work.
For the sake of completeness, I am providing the solution that worked for me. All the approaches I tried for serializing FastText were in vain. Finally, as @MedetTleukabiluly mentioned in the comments, I managed to notify the other workers to load the model from disk using redis-pubsub. Obviously, this does not actually share the model in the same memory space; it only sends a message to the other workers informing them that they should load the model from disk (because a new training run just finished). The following is the general solution:
# redis_pubsub.py
import logging
import os
import fasttext
import socket
import threading
import time
"""GLOBAL_NAMESPACE keeps this service's pubsub traffic separate,
in case another service is also publishing on the same channel.
"""
GLOBAL_NAMESPACE = "SERVICE_0"
def get_ip():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # doesn't even have to be reachable
        s.connect(('10.255.255.255', 1))
        IP = s.getsockname()[0]
    except Exception:
        IP = '127.0.0.1'
    finally:
        s.close()
    return IP
class RedisPubSub:
    def __init__(self):
        self.redis_client = get_redis_client()  # TODO: A SAMPLE METHOD WHICH CAN RETURN YOUR REDIS CLIENT (you have to implement)
        # The unique ID identifies which worker on which server is the publisher,
        # so that a worker can ignore a message it sent itself.
        self.unique_id = "IP_" + get_ip() + "__" + str(GLOBAL_NAMESPACE) + "__" + "PID_" + str(os.getpid())

    def listen_to_channel_and_update_models(self, channel):
        try:
            pubsub = self.redis_client.pubsub()
            pubsub.subscribe(channel)
        except Exception as exception:
            logging.error(f"REDIS_ERROR: Model Update Listening: {exception}")
            return  # without a subscription there is nothing to listen to

        while True:
            try:
                message = pubsub.get_message()
                # A successful operation gives 1 and an unsuccessful one gives 0;
                # we are not interested in receiving these flags
                if message and message["data"] != 1 and message["data"] != 0:
                    message = message["data"].decode("utf-8")
                    message = str(message)
                    splitted_msg = message.split("__SEPERATOR__")
                    # Make sure the message comes from another worker, and that
                    # the sender and the receiver (i.e., both workers) are in the same namespace
                    if (splitted_msg[0] != self.unique_id) and (splitted_msg[0].split('__')[1] == GLOBAL_NAMESPACE):
                        algo_name = splitted_msg[1]
                        model_path = splitted_msg[2]
                        # Fasttext
                        if "fasttext" in algo_name:
                            try:
                                # TODO: YOU WILL GET THE LOADED NEW FILE IN model_file. USE IT TO UPDATE THE OLD ONE.
                                model_file = fasttext.load_model(model_path + '.bin')
                            except Exception as exception:
                                logging.error(exception)
                            else:
                                logging.info(f"{algo_name} model is updated for process with unique_id: {self.unique_id} by process with unique_id: {splitted_msg[0]}")
                time.sleep(1)  # sleeping for 1 second to avoid hammering the CPU too much
            except Exception as exception:
                time.sleep(1)
                logging.error(f"PUBSUB_ERROR: Model or component update: {exception}")

    def publish_to_channel(self, channel, algo_name, model_path):
        def _publish_to_channel():
            try:
                message = self.unique_id + '__SEPERATOR__' + str(algo_name) + '__SEPERATOR__' + str(model_path)
                time.sleep(3)
                self.redis_client.publish(channel, message)
            except Exception as exception:
                logging.error(f"PUBSUB_ERROR: Model or component publishing: {exception}")

        # The delay before publishing would otherwise block independent work that follows, so publish from another thread.
        thread = threading.Thread(target=_publish_to_channel)
        thread.start()
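The message format used above (sender ID, algorithm name, and model path joined by a separator, with the namespace embedded in the sender ID) can be sketched as pure functions, which makes the filtering rules easier to check without a Redis server. The function names `make_unique_id`, `build_message`, and `parse_message` are hypothetical and not part of the solution above; the separator spelling is kept identical so both sides stay compatible.

```python
SEPARATOR = "__SEPERATOR__"  # same spelling as in redis_pubsub.py


def make_unique_id(ip, namespace, pid):
    # Mirrors the unique_id layout: IP_<ip>__<namespace>__PID_<pid>
    return "IP_" + ip + "__" + namespace + "__" + "PID_" + str(pid)


def build_message(unique_id, algo_name, model_path):
    return SEPARATOR.join([unique_id, str(algo_name), str(model_path)])


def parse_message(raw, own_id, namespace):
    """Return (algo_name, model_path), or None if the message should be ignored."""
    parts = raw.split(SEPARATOR)
    if len(parts) != 3:
        return None  # malformed message
    sender_id, algo_name, model_path = parts
    if sender_id == own_id:
        return None  # sent by this very worker
    id_fields = sender_id.split("__")
    if len(id_fields) < 2 or id_fields[1] != namespace:
        return None  # sent by a different service
    return algo_name, model_path


if __name__ == "__main__":
    me = make_unique_id("10.0.0.1", "SERVICE_0", 100)
    peer = make_unique_id("10.0.0.2", "SERVICE_0", 200)
    print(parse_message(build_message(peer, "fasttext", "path/to/model"), me, "SERVICE_0"))
```

Note that this layout assumes the namespace itself contains no double underscore; otherwise the sender ID would split into the wrong fields.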
Also you have to start the listener:
import threading
from redis_pubsub import RedisPubSub
pubsub = RedisPubSub()
# start the listener:
thread = threading.Thread(target=pubsub.listen_to_channel_and_update_models, args=("sync-ml-models",))
thread.start()
From the fasttext training module, when you finish training, publish this message to the other workers so that they get a chance to re-load the model from disk:
# fasttext_api.py
from redis_pubsub import RedisPubSub
pubsub = RedisPubSub()
pubsub.publish_to_channel(channel="sync-ml-models",  # a sample name for the channel
                          algo_name="fasttext",
                          model_path="path/to/fasttext/model")
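The TODO in the listener leaves open how each worker replaces the model it already holds in memory. One possible sketch, assuming the listener thread and the request handlers live in the same process, is a small lock-protected registry; `ModelRegistry` and its methods are hypothetical names, and the loader is passed in so that the example runs without fasttext installed.

```python
import threading


class ModelRegistry:
    """Per-process holder for loaded models (hypothetical helper)."""

    def __init__(self):
        self._models = {}
        self._lock = threading.Lock()

    def reload(self, name, loader, path):
        # Load outside the lock so predictions are not blocked during slow disk I/O.
        new_model = loader(path)
        with self._lock:
            self._models[name] = new_model

    def get(self, name):
        with self._lock:
            return self._models[name]


if __name__ == "__main__":
    registry = ModelRegistry()
    # A fake loader stands in for fasttext.load_model here.
    registry.reload("fasttext", lambda path: "loaded:" + path, "path/to/model.bin")
    print(registry.get("fasttext"))
```

In the listener, the TODO could then become something like registry.reload(algo_name, fasttext.load_model, model_path + '.bin'), while prediction code calls registry.get(algo_name). Each worker still keeps its own copy; only the reload signal is shared.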