I am currently working on a Databricks cluster, trying to log an ALS model within an MLflow run. With the different approaches I have tried so far, I either get a TypeError ("cannot pickle '_thread.RLock' object") that stops my run, or an OSError ("No such file or directory: '/tmp/tmpxiznhskj/sparkml'") that does not stop the run but leaves me unable to load the model back into my code.
Here is the prep code if you want to play around with it a little:
import mlflow
import logging
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark import SparkContext, SparkConf
data = [{"User": 1, "Item": 1, "Rating": 1},
        {"User": 2, "Item": 2, "Rating": 3},
        {"User": 3, "Item": 3, "Rating": 1},
        {"User": 4, "Item": 2, "Rating": 4},
        {"User": 1, "Item": 2, "Rating": 3},
        {"User": 2, "Item": 3, "Rating": 2},
        {"User": 2, "Item": 4, "Rating": 1},
        {"User": 4, "Item": 1, "Rating": 5}
        ]
conf = SparkConf().setAppName("ALS-mlflow-test")
sc = SparkContext.getOrCreate(conf)
rdd = sc.parallelize(data)
df_rating = rdd.toDF()
(df_train, df_test) = df_rating.randomSplit([0.8, 0.2])
logging.getLogger("mlflow").setLevel(logging.DEBUG)
with mlflow.start_run() as run:
    model_als = ALS(maxIter=5, regParam=0.01, userCol="User", itemCol="Item", ratingCol="Rating",
                    implicitPrefs=False, coldStartStrategy="drop")
    model_als.fit(df_train)
    mlflow.sklearn.log_model(model_als, artifact_path="test")
which results in the following error:
_SklearnCustomModelPicklingError: Pickling custom sklearn model ALS failed when saving model: cannot pickle '_thread.RLock' object
As a second attempt I wrapped the ALS model in a custom pyfunc model:
class MyModel(mlflow.pyfunc.PythonModel):
    def __init__(self, model):
        self.model = model

    def predict(self, context, model_input):
        return self.my_custom_function(model_input)

    def my_custom_function(self, model_input):
        return 0

with mlflow.start_run():
    model_als = ALS(maxIter=5, regParam=0.01, userCol="User", itemCol="Item", ratingCol="Rating",
                    implicitPrefs=False, coldStartStrategy="drop")
    my_model = MyModel(model_als)
    model_info = mlflow.pyfunc.log_model(artifact_path="model", python_model=my_model)
resulting in a more general but basically the same error as in step 1:
TypeError: cannot pickle '_thread.RLock' object
As a third approach I wrapped the ALS model in a Spark ML Pipeline and logged that with the spark flavor:
from pyspark.ml import Pipeline

with mlflow.start_run() as run:
    model_als = ALS(maxIter=5, regParam=0.01, userCol="User", itemCol="Item", ratingCol="Rating",
                    implicitPrefs=False, coldStartStrategy="drop")
    pipeline = Pipeline(stages=[model_als])
    pipeline_model = pipeline.fit(df_train)
    mlflow.spark.log_model(pipeline_model, artifact_path="test-pipeline")
This time the code executed, but looking at the debug log, something still went wrong:
stderr: Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023/01/05 08:54:22 INFO mlflow.spark: File '/tmp/tmpxiznhskj/sparkml' not found on DFS. Will attempt to upload the file.
Traceback (most recent call last):
File "/databricks/python/lib/python3.9/site-packages/mlflow/utils/_capture_modules.py", line 162, in <module>
main()
File "/databricks/python/lib/python3.9/site-packages/mlflow/utils/_capture_modules.py", line 137, in main
mlflow.pyfunc.load_model(model_path)
File "/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py", line 484, in load_model
model_impl = importlib.import_module(conf[MAIN])._load_pyfunc(data_path)
File "/databricks/python/lib/python3.9/site-packages/mlflow/utils/_capture_modules.py", line 134, in _load_pyfunc_patch
return original(*args, **kwargs)
File "/databricks/python/lib/python3.9/site-packages/mlflow/spark.py", line 832, in _load_pyfunc
return _PyFuncModelWrapper(spark, _load_model(model_uri=path))
File "/databricks/python/lib/python3.9/site-packages/mlflow/spark.py", line 727, in _load_model
model_uri = _HadoopFileSystem.maybe_copy_from_uri(model_uri, dfs_tmpdir)
File "/databricks/python/lib/python3.9/site-packages/mlflow/spark.py", line 404, in maybe_copy_from_uri
return cls.maybe_copy_from_local_file(_download_artifact_from_uri(src_uri), dst_path)
File "/databricks/python/lib/python3.9/site-packages/mlflow/tracking/artifact_utils.py", line 100, in _download_artifact_from_uri
return get_artifact_repository(artifact_uri=root_uri).download_artifacts(
File "/databricks/python/lib/python3.9/site-packages/mlflow/store/artifact/local_artifact_repo.py", line 79, in download_artifacts
raise IOError("No such file or directory: '{}'".format(local_artifact_path))
OSError: No such file or directory: '/tmp/tmpxiznhskj/sparkml'
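One parameter that might be worth experimenting with here is dfs_tmpdir, which mlflow.spark.log_model and mlflow.spark.load_model accept to control the temporary DFS directory used for the sparkml files. This is only a hedged sketch; the DBFS path below is just an example:
# hypothetical variant: write the Spark model through an explicit DFS temp directory
# instead of relying on the default temporary location
mlflow.spark.log_model(
    pipeline_model,
    artifact_path="test-pipeline",
    dfs_tmpdir="dbfs:/tmp/mlflow-spark-tmp",  # example path, adjust to your workspace
)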
I then tried to load the logged model back into my code, first directly via PipelineModel:
from pyspark.ml import PipelineModel
logged_model = 'runs:/xyz123/test'
# Load model
loaded_model = PipelineModel.load(logged_model)
which results in the following error, since PipelineModel.load goes through the Hadoop filesystem API and does not understand MLflow's runs:/ URI scheme:
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "runs"
Loading via mlflow.spark instead:
import mlflow
logged_model = 'runs:/xyz123/test'
# Load model
loaded_model = mlflow.spark.load_model(logged_model)
# Perform inference via model.transform()
loaded_model.transform(data)
results in the following error:
AttributeError: 'list' object has no attribute '_jdf'
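The AttributeError itself just means that a plain Python list was passed where transform() expects a Spark DataFrame (data above is still the raw list of dicts). A minimal fix sketch, assuming the Databricks-provided spark session:
# convert the raw list to a Spark DataFrame (or reuse df_rating / df_test) before transform()
df_data = spark.createDataFrame(data)
loaded_model.transform(df_data).show()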
For reference, this is how I intend to use the model once it loads correctly:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")
df_pred = loaded_model.transform(df_test)
rmse = evaluator.evaluate(df_pred)
df_pred.display()
print("Root-mean-square error explicit = " + str(rmse))
user_recs = loaded_model.recommendForAllUsers(2)
user_recs.display()
In conclusion, what I am trying to achieve is simply logging the provided ALS model within my MLflow run. I have run out of ideas as to what could be wrong or what else I could try.
Thanks in advance!
As of MLflow 2.1.1, pyspark.ml.recommendation.ALS is not on the allowlist. Using a pipeline, as you tested in #3, is the appropriate way to log unsupported spark.ml models.
It looks like you might have hit an environment issue when logging your model, as I was able to get the following implementation working both on Databricks and locally.
import mlflow
from mlflow.models.signature import infer_signature
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS

# get a bigger test split from your data
(df_train, df_test) = df_rating.randomSplit([0.6, 0.4])

with mlflow.start_run() as run:
    # initialize als model
    als = ALS(
        maxIter=5,
        regParam=0.01,
        userCol="User",
        itemCol="Item",
        ratingCol="Rating",
        implicitPrefs=False,
        coldStartStrategy="drop",
    )

    # build and fit pipeline
    pipeline = Pipeline(stages=[als])
    pipeline_model = pipeline.fit(df_train)

    # test predict and infer signature
    predictions = pipeline_model.transform(df_test)
    signature = infer_signature(df_train, predictions)

    # log model
    mlflow.spark.log_model(
        pipeline_model, artifact_path="spark-model", signature=signature
    )

mlflow.end_run()
Reload for inference
On Databricks you need to first move the model from the experiment into the model registry. This can be done via the UI or with the following commands.
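If you are still in the same notebook session, the run id does not have to be copied by hand; it can also be read from the run object returned by mlflow.start_run above, for example:
# read the run id back from the finished run instead of pasting it from the UI
run_id = run.info.run_id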
# construct the model_uri from the generated run_id and the set artifact_path
run_id = "2f9a5424b1f44435a9413a3e2762b8a9"
artifact_path = "spark-model"
model_uri = f"runs:/{run_id}/{artifact_path}"

# move the model into the registry
model_name = "als-model"
model_details = mlflow.register_model(model_uri=model_uri, name=model_name)
# load model
version = 1
model_uri = f"models:/{model_name}/{version}"
loaded_model = mlflow.spark.load_model(model_uri)
# test predict
loaded_model.transform(df_test).show()
+----+------+----+----------+
|Item|Rating|User|prediction|
+----+------+----+----------+
| 2| 3| 2| 1.0075595|
+----+------+----+----------+
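If you prefer the generic pyfunc interface, the same model URI can also be loaded through mlflow.pyfunc (mlflow.spark.log_model saves a pyfunc flavor alongside the spark flavor); a minimal sketch:
# load through the pyfunc interface; predict() accepts a pandas DataFrame and
# returns the ALS predictions
pyfunc_model = mlflow.pyfunc.load_model(model_uri)
print(pyfunc_model.predict(df_test.toPandas()))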
Alternatively, you can also log directly to the model registry.
# log model
model_name = "als-model"
mlflow.spark.log_model(
    pipeline_model,
    artifact_path="spark-model",
    registered_model_name=model_name,
)
# load model
version = 1
model_uri = f"models:/{model_name}/{version}"
loaded_model = mlflow.spark.load_model(model_uri)
# test predict
loaded_model.transform(df_test).show()
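Note that the loaded object is a PipelineModel, so ALS-specific helpers such as recommendForAllUsers (used in the question) are not available on it directly; they live on the fitted ALS stage inside the pipeline. A sketch, assuming ALS is the last (here: only) stage:
# pull the fitted ALSModel out of the pipeline to use recommendation helpers
als_model = loaded_model.stages[-1]
als_model.recommendForAllUsers(2).show()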
I also tried wrapping the ALS model and the pipeline in a custom pyfunc, and in both cases I received exactly the same error. I believe there is something unserializable in the ALS model that prevents this...
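That matches what the pickling errors above suggest: the ALS estimator holds a live reference to the JVM via py4j, which (cloud)pickle cannot serialize, and the pyfunc flavor relies on pickling the wrapped Python object. A quick way to observe this directly (a sketch; the exact exception text may differ by version):
# try to pickle the bare ALS estimator to observe the serialization failure
import pickle
from pyspark.ml.recommendation import ALS

try:
    pickle.dumps(ALS(userCol="User", itemCol="Item", ratingCol="Rating"))
except Exception as exc:  # typically a TypeError about an unpicklable JVM handle
    print(f"pickling ALS failed: {exc}")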