My previous question was closed as a duplicate, which pointed me toward multiprocessing managers. I'm trying to use `multiprocessing` to create a service that holds my pandas DataFrame and hands it to Flask requests. This is my code so far:
df_manager.py
from multiprocessing.managers import BaseManager
import pandas as pd


def init_dataframe():
    """Build the small example DataFrame served by this script."""
    return pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})


def get_df():
    """Return the module-level DataFrame ``df``; invoked by the manager server."""
    return df


df = init_dataframe()
# '' binds to all interfaces on port 37844; the client in data_handler.py uses
# the same address and authkey (b'password').
manager = BaseManager(('', 37844), b'password')
# No ``exposed`` argument is given here, so when a client calls get_df the
# server runs public_methods() on the returned DataFrame. That helper calls
# getattr() on every attribute, and pandas' _constructor_expanddim raises
# NotImplementedError (see the RemoteError traceback).
# NOTE(review): register() is a classmethod, so this registers on BaseManager
# itself rather than on this instance.
manager.register('get_df', get_df)
server = manager.get_server()
# Runs the server in this process; the call does not return.
server.serve_forever()
data_handler.py
from multiprocessing.managers import BaseManager
import pandas as pd  # not referenced in this script

def get_df():  # connect to the df_manager.py server and request the shared DataFrame
    manager = BaseManager(('', 37844), b'password')  # must match the server's address/authkey
    manager.register('get_df')  # client side registers the typeid only, no callable
    manager.connect()
    return manager.get_df()  # the RemoteError in the traceback is raised here (line 8)

def data():  # fetch the DataFrame and convert it to a dict
    df = get_df()  # line 11 in the traceback; would be a manager proxy, not a DataFrame
    return df.to_dict()  # not reached in this run because get_df() raises first

if __name__ == '__main__':
    data()  # line 15 in the traceback
Unfortunately, this raises the following exception when `manager.get_df()` is called in `data_handler.py`:
Traceback (most recent call last):
File "src/data_handler.py", line 15, in <module>
data()
File "src/data_handler.py", line 11, in data
df = get_df()
File "src/data_handler.py", line 8, in get_df
return manager.get_df()
File "/usr/lib/python3.7/multiprocessing/managers.py", line 724, in temp
token, exp = self._create(typeid, *args, **kwds)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 609, in _create
id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 82, in dispatch
raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError:
---------------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/lib/python3.7/multiprocessing/managers.py", line 201, in handle_request
result = func(c, *args, **kwds)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 391, in create
exposed = public_methods(obj)
File "/usr/lib/python3.7/multiprocessing/managers.py", line 122, in public_methods
return [name for name in all_methods(obj) if name[0] != '_']
File "/usr/lib/python3.7/multiprocessing/managers.py", line 113, in all_methods
func = getattr(obj, name)
File "/home/admin/dev/pandas-multiprocessing/venv/lib/python3.7/site-packages/pandas/core/frame.py", line 392, in _constructor_expanddim
raise NotImplementedError("Not supported for DataFrames!")
NotImplementedError: Not supported for DataFrames!
---------------------------------------------------------------------------
Any help in the right direction would be much appreciated!
EDIT: This seems to be specific to DataFrames: returning `df.to_json()` instead of `df` from `get_df()` in `df_manager.py` works fine. Still investigating...
EDIT 2: I have removed the Flask dependency from the code, since it seems to have nothing to do with the problem.
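This issue is fixed by passing an explicit `exposed` argument to the `register()` call on the **server** side (`df_manager.py`).

Without `exposed`, the server builds the proxy's method list by calling `getattr()` on every attribute of the object returned by the registered callable. For a DataFrame, pandas' `_constructor_expanddim` raises `NotImplementedError` during that sweep, as the traceback above shows.

Note that `exposed` must be a *sequence of method names*. A bare string such as `'get_df'` is iterated character by character. That avoids the error, but it exposes none of the DataFrame's methods through the proxy. You have two options:
- list the methods clients actually need (e.g. `exposed=['to_dict']`), or
- call `_getvalue()` on the proxy to receive a pickled copy of the whole DataFrame.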
df_manager.py
from multiprocessing.managers import BaseManager

import pandas as pd


class DataFrameManager(BaseManager):
    """Manager class for the DataFrame server.

    ``register()`` is a classmethod, so registering on a dedicated subclass
    keeps the ``get_df`` typeid off ``BaseManager`` itself.
    """


def init_dataframe():
    """Build the example DataFrame shared by this server."""
    return pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})


def get_df():
    """Return the shared DataFrame; called in the server process on request."""
    return df


df = init_dataframe()

# ``exposed`` must be a *sequence of method names*. Any explicit value stops
# the server from calling public_methods() on the DataFrame. That helper's
# getattr() sweep over every attribute is what raised NotImplementedError
# (pandas' _constructor_expanddim). A plain string such as 'get_df' would be
# iterated into 'g', 'e', 't', ..., so the proxy would expose no DataFrame
# methods at all. List the methods clients may call remotely; clients can
# always fetch a full copy with ``proxy._getvalue()``.
DataFrameManager.register(
    'get_df',
    callable=get_df,
    exposed=('to_dict', 'to_json'),
)

if __name__ == '__main__':
    # '' listens on all interfaces; clients must present the same authkey.
    manager = DataFrameManager(('', 37844), b'password')
    server = manager.get_server()
    server.serve_forever()
data_handler.py
from multiprocessing.managers import BaseManager

import pandas as pd  # pandas must be installed here to unpickle the DataFrame copy


class DataFrameManager(BaseManager):
    """Client-side manager class; keeps the typeid off ``BaseManager`` itself."""


# The client only needs the typeid; the callable lives in the server process.
DataFrameManager.register('get_df')


def get_df(address=('', 37844), authkey=b'password'):
    """Return a local copy of the DataFrame served by ``df_manager.py``.

    Args:
        address: ``(host, port)`` of the running manager server.
        authkey: Authentication key; must match the server's.

    Returns:
        A ``pandas.DataFrame`` copied from the server process.

    Raises:
        OSError: (e.g. ``ConnectionRefusedError``) if no server is listening.
        multiprocessing.AuthenticationError: if *authkey* does not match.
    """
    manager = DataFrameManager(address, authkey)
    manager.connect()
    # manager.get_df() returns an AutoProxy that refers to the server-side
    # DataFrame and only forwards the methods listed in the server's
    # ``exposed``. _getvalue() asks the server to pickle the referent and send
    # it back, so the caller gets a real DataFrame with every method available.
    return manager.get_df()._getvalue()


def data():
    """Return the shared DataFrame as a local pandas object."""
    return get_df()


if __name__ == '__main__':
    print(data())