I have the following code. It reads a pre-existing file containing an ML model, and I am trying to run it on Databricks in multiple ways.
import numpy as np
import joblib

class WeightedEnsembleRegressor:
    """
    Holds models, their weights, scaler and feature order for prediction & persistence.
    """
    def __init__(self, trained_models, model_weights, scaler, feature_order):
        self.trained_models = trained_models
        self.model_weights = model_weights
        self.scaler = scaler
        self.feature_order = feature_order

    def save(self, path):
        joblib.dump(self, path)

    @staticmethod
    def load(path):
        return joblib.load(path)
def Process(df):
    import traceback
    import sys
    import pickle
    import joblib
    import xgboost
    import catboost        # referenced in the except block, so it must be imported
    import pandas as pd    # pd is used in the return statement
    #print(sys.version)
    data = {}
    data['msg'] = [""]
    try:
        ensemble = WeightedEnsembleRegressor.load('final_model_v3_1.joblib')
        data['msg'] = [f"success{sys.version} {joblib.__version__} {pickle.compatible_formats} "]
    except Exception:
        data['msg'] = [f"fail{sys.version} {joblib.__version__} {pickle.compatible_formats} "
                       f"{traceback.format_exc()}{xgboost.__version__}{catboost.__version__}"]
    return pd.DataFrame.from_dict(data, orient='index').transpose()
When I run it like this:
display(Process(df))
it runs fine, with the following message:
success3.11.10 (main, Sep 7 2024, 18:35:41) [GCC 11.4.0] 1.2.0 ['1.0', '1.1', '1.2', '1.3', '2.0', '3.0', '4.0', '5.0']
However, when I run it like this:
display(pDf.groupBy('num').applyInPandas(Process,'msg string'))
it fails with this message:
fail3.11.10 (main, Sep 7 2024, 18:35:41) [GCC 11.4.0] 1.2.0 ['1.0', '1.1', '1.2', '1.3', '2.0', '3.0', '4.0', '5.0'] Traceback (most recent call last):
File "/home/spark-bf403da4-5cb5-46b6-b647-9a/.ipykernel/41375/command-8846548913142126-766074700", line 13, in Process
File "/home/spark-bf403da4-5cb5-46b6-b647-9a/.ipykernel/41375/command-8846548913142125-2478242552", line 31, in load
File "/databricks/python/lib/python3.11/site-packages/joblib/numpy_pickle.py", line 658, in load
obj = _unpickle(fobj, filename, mmap_mode)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.11/site-packages/joblib/numpy_pickle.py", line 577, in _unpickle
obj = unpickler.load()
^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/pickle.py", line 1213, in load
dispatch[key[0]](self)
File "/usr/lib/python3.11/pickle.py", line 1538, in load_stack_global
self.append(self.find_class(module, name))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/pickle.py", line 1582, in find_class
return _getattribute(sys.modules[module], name)[0]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/pickle.py", line 331, in _getattribute
raise AttributeError("Can't get attribute {!r} on {!r}"
AttributeError: Can't get attribute 'WeightedEnsembleRegressor' on <module '_engine_pyspark.databricks.workerwrap' from '/databricks/spark/python/_engine_pyspark.zip/_engine_pyspark/databricks/workerwrap.py'>
3.0.21.2.8
Any insight into how to fix this would be appreciated.
The solution was to register the model as an MLflow model. The AttributeError occurs because applyInPandas runs Process on the Spark workers, and the worker process cannot resolve the notebook-defined WeightedEnsembleRegressor class when joblib unpickles the file. Logging through MLflow avoids this, since mlflow.pyfunc.log_model serializes the wrapper with cloudpickle, which captures notebook-defined classes by value. First, define a pyfunc wrapper:
class WeightedEnsembleRegressorPyfunc(mlflow.pyfunc.PythonModel):
...
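The wrapper body was elided above; as a rough sketch, assuming trained_models is a list and predictions are combined as a weighted average (the predict logic below is a guess at the original implementation, not a copy of it):

import mlflow
import numpy as np

class WeightedEnsembleRegressorPyfunc(mlflow.pyfunc.PythonModel):
    def __init__(self, ensemble):
        # Hold the joblib-loaded WeightedEnsembleRegressor
        self.ensemble = ensemble

    def predict(self, context, model_input):
        # Hypothetical: reorder features, scale, then weight-average the
        # per-model predictions
        X = self.ensemble.scaler.transform(model_input[self.ensemble.feature_order])
        preds = np.column_stack([m.predict(X) for m in self.ensemble.trained_models])
        return preds @ np.asarray(self.ensemble.model_weights)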
Then load the ensemble, wrap it, and log it:

ensemble = WeightedEnsembleRegressor.load('final_model_v3_1.joblib')
python_model = WeightedEnsembleRegressorPyfunc(ensemble)

mlflow.set_registry_uri("databricks")
with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(
        artifact_path="ensemble_model",
        python_model=python_model
    )
    run_id = run.info.run_id
Now the logged model can be loaded inside the function and used with applyInPandas.
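For example, here is a sketch of a worker-side function that loads the model by its run URI; the output schema, column types, and prediction column name are hypothetical and must match your data:

import mlflow
import pandas as pd

model_uri = f"runs:/{run_id}/ensemble_model"

def Predict(df: pd.DataFrame) -> pd.DataFrame:
    # This works on workers: MLflow materializes the model from the
    # artifact store, so no notebook-defined class has to be resolved
    # by pickle on the worker.
    model = mlflow.pyfunc.load_model(model_uri)
    df['prediction'] = model.predict(df)
    return df[['num', 'prediction']]

display(pDf.groupBy('num').applyInPandas(Predict, 'num long, prediction double'))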