Tags: save, databricks, pipeline, mlflow, als

How to log an ALS model within an MLflow run?


I am currently working on a Databricks cluster, trying to log an ALS model within an MLflow run. Across multiple different approaches I either get a TypeError "cannot pickle '_thread.RLock' object" that stops my run, or an OSError "No such file or directory: '/tmp/tmpxiznhskj/sparkml'" that does not stop my run but leaves me unable to load the model back into my code.

Here is the prep code to play around with a little bit:

import mlflow
import logging
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark import SparkContext, SparkConf

data = [{"User": 1, "Item": 1, "Rating": 1},
        {"User": 2, "Item": 2, "Rating": 3},
        {"User": 3, "Item": 3, "Rating": 1},
        {"User": 4, "Item": 2, "Rating": 4},
        {"User": 1, "Item": 2, "Rating": 3},
        {"User": 2, "Item": 3, "Rating": 2},
        {"User": 2, "Item": 4, "Rating": 1},
        {"User": 4, "Item": 1, "Rating": 5}
        ]

conf = SparkConf().setAppName("ALS-mlflow-test")
sc = SparkContext.getOrCreate(conf)
rdd = sc.parallelize(data)
df_rating = rdd.toDF()

(df_train, df_test) = df_rating.randomSplit([0.8, 0.2])

logging.getLogger("mlflow").setLevel(logging.DEBUG)
  1. Using this as a base, I tried different approaches, starting with the mlflow.sklearn.log_model method:
with mlflow.start_run() as run:    
    model_als = ALS(maxIter=5, regParam=0.01, userCol="User", itemCol="Item", ratingCol="Rating", implicitPrefs=False,
          coldStartStrategy="drop")
    
    model_als.fit(df_train)
    mlflow.sklearn.log_model(model_als, artifact_path="test")

which results in the following error (mlflow.sklearn is intended for scikit-learn estimators, so it attempts to pickle the Spark ALS object, which fails):

_SklearnCustomModelPicklingError: Pickling custom sklearn model ALS failed when saving model: cannot pickle '_thread.RLock' object

  2. Next I tried using a wrapper model (mlflow.pyfunc.PythonModel) for my ALS model (the wrapper does not actually do anything yet):
class MyModel(mlflow.pyfunc.PythonModel):
    def __init__(self, model):
        self.model = model
    
    def predict(self, context, model_input):
        return self.my_custom_function(model_input)

    def my_custom_function(self, model_input):
        return 0

with mlflow.start_run():
    model_als = ALS(maxIter=5, regParam=0.01, userCol="User", itemCol="Item", ratingCol="Rating", implicitPrefs=False,
          coldStartStrategy="drop")
    my_model = MyModel(model_als)
    model_info = mlflow.pyfunc.log_model(artifact_path="model", python_model=my_model)

resulting in a more general, but essentially the same, error as in step 1:

TypeError: cannot pickle '_thread.RLock' object

  3. I then tried putting my model within a pipeline (again, nothing fancy, just plain and simple for a test):
from pyspark.ml import Pipeline

with mlflow.start_run() as run:    
    model_als = ALS(maxIter=5, regParam=0.01, userCol="User", itemCol="Item", ratingCol="Rating", implicitPrefs=False,
          coldStartStrategy="drop")
    
    pipeline = Pipeline(stages=[model_als])
    pipeline_model = pipeline.fit(df_train)
    mlflow.spark.log_model(pipeline_model, artifact_path="test-pipeline")

This time the code executed, but a look at the debug log shows that something still went wrong:

stderr: Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023/01/05 08:54:22 INFO mlflow.spark: File '/tmp/tmpxiznhskj/sparkml' not found on DFS. Will attempt to upload the file.
Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/mlflow/utils/_capture_modules.py", line 162, in <module>
    main()
  File "/databricks/python/lib/python3.9/site-packages/mlflow/utils/_capture_modules.py", line 137, in main
    mlflow.pyfunc.load_model(model_path)
  File "/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py", line 484, in load_model
    model_impl = importlib.import_module(conf[MAIN])._load_pyfunc(data_path)
  File "/databricks/python/lib/python3.9/site-packages/mlflow/utils/_capture_modules.py", line 134, in _load_pyfunc_patch
    return original(*args, **kwargs)
  File "/databricks/python/lib/python3.9/site-packages/mlflow/spark.py", line 832, in _load_pyfunc
    return _PyFuncModelWrapper(spark, _load_model(model_uri=path))
  File "/databricks/python/lib/python3.9/site-packages/mlflow/spark.py", line 727, in _load_model
    model_uri = _HadoopFileSystem.maybe_copy_from_uri(model_uri, dfs_tmpdir)
  File "/databricks/python/lib/python3.9/site-packages/mlflow/spark.py", line 404, in maybe_copy_from_uri
    return cls.maybe_copy_from_local_file(_download_artifact_from_uri(src_uri), dst_path)
  File "/databricks/python/lib/python3.9/site-packages/mlflow/tracking/artifact_utils.py", line 100, in _download_artifact_from_uri
    return get_artifact_repository(artifact_uri=root_uri).download_artifacts(
  File "/databricks/python/lib/python3.9/site-packages/mlflow/store/artifact/local_artifact_repo.py", line 79, in download_artifacts
    raise IOError("No such file or directory: '{}'".format(local_artifact_path))
OSError: No such file or directory: '/tmp/tmpxiznhskj/sparkml'
  4. Nevertheless, since the code ran through, I tried loading the model:
from pyspark.ml import PipelineModel

logged_model = 'runs:/xyz123/test'

# Load model
loaded_model = PipelineModel.load(logged_model)

results in the error below (presumably because PipelineModel.load expects a filesystem path and does not understand MLflow's runs:/ URI scheme):

org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "runs"

  5. I then tried the auto-generated code from Databricks:
import mlflow
logged_model = 'runs:/xyz123/test'

# Load model
loaded_model = mlflow.spark.load_model(logged_model)

# Perform inference via model.transform()
loaded_model.transform(data)

results in the following error:

AttributeError: 'list' object has no attribute '_jdf'
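
I suspect this is because data is a plain Python list, not a Spark DataFrame; presumably passing df_rating from the prep code would avoid this particular error:

# hedged fix: transform() expects a Spark DataFrame, not a Python list;
# df_rating was created from the raw data in the prep code above
loaded_model.transform(df_rating).show()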

  6. The last piece of code would be the following (but I was not able to get to it yet, since I cannot load the model):
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")

df_pred = loaded_model.transform(df_test)
rmse = evaluator.evaluate(df_pred)

df_pred.display()

print("Root-mean-square error explicit = " + str(rmse))

# the pipeline wraps the ALSModel; access it via the pipeline's stages
user_recs = loaded_model.stages[0].recommendForAllUsers(2)
user_recs.display()

In conclusion, what I am trying to achieve is simply to log the provided ALS model within my MLflow run. I have run out of ideas about what could be wrong or what else I could try.

Thanks in advance!


Solution

  • As of MLflow 2.1.1, pyspark.ml.recommendation.ALS is not on the allowlist. Using a pipeline, as you tested in #3, is the appropriate way to log unsupported spark.ml models.

    It looks like you might have hit an environment issue when logging your model, as I was able to get the following implementation working both on Databricks and locally.

    Log within run

    import mlflow
    from mlflow.models.signature import infer_signature
    from pyspark.ml import Pipeline
    from pyspark.ml.recommendation import ALS
    
    # get a bigger test split from your data
    (df_train, df_test) = df_rating.randomSplit([0.6, 0.4])
    
    with mlflow.start_run() as run:
        # initialize als model
        als = ALS(
            maxIter=5,
            regParam=0.01,
            userCol="User",
            itemCol="Item",
            ratingCol="Rating",
            implicitPrefs=False,
            coldStartStrategy="drop",
        )
    
        # build and fit pipeline
        pipeline = Pipeline(stages=[als])
        pipeline_model = pipeline.fit(df_train)
    
        # test predict and infer signature
        predictions = pipeline_model.transform(df_test)
        signature = infer_signature(df_train, predictions)
    
        # log model
        mlflow.spark.log_model(
            pipeline_model, artifact_path="spark-model", signature=signature
        )
    
    

    Reload for inference

    On Databricks you need to first move the model from the experiment into the model registry. This can be done via the UI or with the following commands.

    # construct the model_uri from generated run_id and the set artifact_path
    run_id = "2f9a5424b1f44435a9413a3e2762b8a9"
    artifact_path = "spark-model"
    model_uri = f"runs:/{run_id}/{artifact_path}"
    
    # name to register the model under
    model_name = "als-model"
    
    # move the model into the registry
    model_details = mlflow.register_model(model_uri=model_uri, name=model_name)
    
    # load model
    version = 1
    model_uri = f"models:/{model_name}/{version}"
    loaded_model = mlflow.spark.load_model(model_uri)
    
    # test predict
    loaded_model.transform(df_test).show()
    
    +----+------+----+----------+
    |Item|Rating|User|prediction|
    +----+------+----+----------+
    |   2|     3|   2| 1.0075595|
    +----+------+----+----------+
    

    Log directly to registry

    Alternatively, you can also log directly to the model registry.

    # log model
    model_name = "als-model"
    mlflow.spark.log_model(
        pipeline_model, 
        artifact_path="spark-model",
        registered_model_name=model_name
    )
    
    # load model
    version = 1
    model_uri = f"models:/{model_name}/{version}"
    loaded_model = mlflow.spark.load_model(model_uri)
    
    # test predict
    loaded_model.transform(df_test).show()
    

    Logging as custom pyfunc

    I also tried wrapping the ALS model and the pipeline in a custom pyfunc, and in both cases I received the exact same pickling error. I believe there is something unserializable in the ALS model that prevents this; most likely its py4j handles into the JVM, which hold thread locks that cannot be pickled.
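
    A possible workaround (a sketch only; the ALSWrapper class and artifact paths are hypothetical names, not from the question) is to avoid pickling the Spark model altogether: log the fitted pipeline natively with mlflow.spark.log_model first, reference that artifact from the pyfunc via the artifacts argument, and load it lazily in load_context:

    import mlflow
    import mlflow.spark
    
    class ALSWrapper(mlflow.pyfunc.PythonModel):
        # load the Spark model from the packaged artifact instead of
        # holding a direct (unpicklable) reference to it
        def load_context(self, context):
            self.model = mlflow.spark.load_model(context.artifacts["spark_model"])
    
        def predict(self, context, model_input):
            # note: the standard pyfunc contract passes a pandas DataFrame;
            # this sketch assumes it is handed a Spark DataFrame directly
            return self.model.transform(model_input)
    
    with mlflow.start_run() as run:
        # log the fitted pipeline (pipeline_model from the code above) natively
        mlflow.spark.log_model(pipeline_model, artifact_path="spark-model")
        spark_model_uri = f"runs:/{run.info.run_id}/spark-model"
    
        # the pyfunc stores only a reference to the logged artifact,
        # so the Spark model itself is never pickled
        mlflow.pyfunc.log_model(
            artifact_path="als-pyfunc",
            python_model=ALSWrapper(),
            artifacts={"spark_model": spark_model_uri},
        )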