Tags: python, pyspark, databricks, pickle, user-defined-functions

How do I load a joblib file on Spark?


I have the following code. It reads a pre-existing joblib file containing an ML model. I am trying to run it on Databricks across multiple groups of data.

import numpy as np
import joblib
class WeightedEnsembleRegressor:
    """
    Holds models, their weights, scaler and feature order for persistence.

    The object is persisted as a whole with joblib (pickle under the hood).
    Pickle stores instances by reference to ``<module>.<class name>``, so the
    class must be resolvable under that same qualified name in whichever
    process later loads the file.
    """

    def __init__(self, trained_models, model_weights, scaler, feature_order):
        """
        Store the ensemble components exactly as given (no copies, no checks).

        Args:
            trained_models: The fitted models that make up the ensemble.
            model_weights: The weights associated with ``trained_models``.
            scaler: The feature scaler that accompanies the models.
            feature_order: The order of the feature columns.
        """
        self.trained_models = trained_models
        self.model_weights = model_weights
        self.scaler = scaler
        self.feature_order = feature_order

    def save(self, path):
        """Serialize this whole object to ``path`` using ``joblib.dump``."""
        joblib.dump(self, path)

    @staticmethod
    def load(path):
        """
        Deserialize and return whatever object was stored at ``path``.

        Args:
            path: Location of a file previously written by ``save``.

        Returns:
            The object produced by ``joblib.load``; its type is not verified.
        """
        # SECURITY: joblib.load unpickles the file, which can execute arbitrary
        # code. Only load files that come from a trusted source.
        return joblib.load(path)

def Process(df):
    """
    Try to load the persisted ensemble and report the outcome as a one-row frame.

    The function can be called directly or passed to
    ``groupBy(...).applyInPandas(Process, 'msg string')``.

    Args:
        df: The pandas DataFrame handed to the function. It is not inspected;
            it exists only to satisfy the calling convention.

    Returns:
        A pandas DataFrame with a single ``msg`` column and one row. The
        message starts with ``"success"`` or ``"fail"`` and is followed by
        the Python, joblib and pickle-format versions; on failure the
        traceback and the xgboost/catboost versions are appended.
    """
    # All dependencies are imported locally so the function does not rely on
    # names that happen to exist at notebook/module scope in the caller.
    import pickle
    import sys
    import traceback

    import joblib
    import pandas as pd
    import xgboost

    data = {'msg': [""]}
    versions = f"{sys.version} {joblib.__version__} {pickle.compatible_formats} "
    try:
        # Pickle resolves the class as an attribute of the module named in the
        # file. In a Spark Python worker that module (``__main__``) is the
        # worker wrapper, which does not define the class, so unpickling fails
        # with "Can't get attribute 'WeightedEnsembleRegressor'". Registering
        # the class on its declaring module (only when missing) lets pickle
        # find it; where the class is already visible this is a no-op.
        cls = WeightedEnsembleRegressor
        owner = sys.modules.get(cls.__module__)
        if owner is not None and not hasattr(owner, cls.__name__):
            setattr(owner, cls.__name__, cls)

        WeightedEnsembleRegressor.load('final_model_v3_1.joblib')
        data['msg'] = ["success" + versions]
    except Exception:
        # Capture the traceback first, while the original exception is active.
        details = traceback.format_exc()
        # catboost is only needed for this diagnostic; a missing package must
        # not replace the real error with a secondary NameError/ImportError.
        try:
            import catboost
            catboost_version = catboost.__version__
        except ImportError:
            catboost_version = "n/a"
        data['msg'] = [
            "fail" + versions + details
            + f"{xgboost.__version__}" + f"{catboost_version}"
        ]

    return pd.DataFrame.from_dict(data, orient='index').transpose()

When I run it directly, like this:

display(Process(df))

it runs fine and returns the following message:

success3.11.10 (main, Sep  7 2024, 18:35:41) [GCC 11.4.0] 1.2.0 ['1.0', '1.1', '1.2', '1.3', '2.0', '3.0', '4.0', '5.0'] 

However, when I run it through `applyInPandas`, like this:

display(pDf.groupBy('num').applyInPandas(Process,'msg string'))

it fails with the following message:

fail3.11.10 (main, Sep  7 2024, 18:35:41) [GCC 11.4.0] 1.2.0 ['1.0', '1.1', '1.2', '1.3', '2.0', '3.0', '4.0', '5.0'] Traceback (most recent call last):
  File "/home/spark-bf403da4-5cb5-46b6-b647-9a/.ipykernel/41375/command-8846548913142126-766074700", line 13, in Process
  File "/home/spark-bf403da4-5cb5-46b6-b647-9a/.ipykernel/41375/command-8846548913142125-2478242552", line 31, in load
  File "/databricks/python/lib/python3.11/site-packages/joblib/numpy_pickle.py", line 658, in load
    obj = _unpickle(fobj, filename, mmap_mode)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/joblib/numpy_pickle.py", line 577, in _unpickle
    obj = unpickler.load()
          ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/pickle.py", line 1213, in load
    dispatch[key[0]](self)
  File "/usr/lib/python3.11/pickle.py", line 1538, in load_stack_global
    self.append(self.find_class(module, name))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/pickle.py", line 1582, in find_class
    return _getattribute(sys.modules[module], name)[0]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/pickle.py", line 331, in _getattribute
    raise AttributeError("Can't get attribute {!r} on {!r}"
AttributeError: Can't get attribute 'WeightedEnsembleRegressor' on <module '_engine_pyspark.databricks.workerwrap' from '/databricks/spark/python/_engine_pyspark.zip/_engine_pyspark/databricks/workerwrap.py'>
3.0.21.2.8

Any insight into how to fix this would be appreciated.


Solution

  • The solution was to register the ensemble as an MLflow model, by defining a wrapper class that subclasses `mlflow.pyfunc.PythonModel`:

    
    # Pyfunc wrapper class for the ensemble; its body is elided ("...") in
    # this answer, so its methods are not shown here.
    class WeightedEnsembleRegressorPyfunc(mlflow.pyfunc.PythonModel):
    ...
    

    Then

    # Load the persisted ensemble from the joblib file.
    # NOTE(review): `log_model`'s `python_model` argument is documented to take
    # an mlflow.pyfunc.PythonModel instance; presumably the loaded ensemble is
    # meant to be wrapped in WeightedEnsembleRegressorPyfunc first — confirm.
    python_model  = WeightedEnsembleRegressor.load('final_model_v3_1.joblib')
    # Use the Databricks-hosted model registry.
    mlflow.set_registry_uri("databricks")
    
    # Log the model as a pyfunc artifact named "ensemble_model" inside a run.
    with mlflow.start_run() as run:
        mlflow.pyfunc.log_model(
            artifact_path="ensemble_model",
            python_model=python_model
        )
        # Keep the run id; presumably used later to locate the logged model.
        run_id = run.info.run_id
    

    Now you can use the model inside a function (for example, by loading the logged model with `mlflow.pyfunc.load_model`) and run that function with `applyInPandas`.