I would like to publish a pyspark.ml.PipelineModel object in a Model Integration code repository. I tried to follow the Supervised Learning Tutorial in the documentation, but that tutorial uses an sklearn model and works with pandas dataframes. I would like to update the approach to take Spark dataframes as input and produce a PipelineModel object that can be used elsewhere in Foundry as the output.
The pyspark model logic itself works -- I've run it successfully in Code Workbooks -- so I suspect the issue I'm having is with the adapter.py Model Adapter file.
I created a toy dataset in a Code Workbook transform, built it, and referenced it within the model repository.
def toy_data():
    import pandas as pd

    pandas_df = pd.DataFrame({
        "x1": ["a", "a", "a", "a", "a", "a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "b", "b", "b", "b", "b"],
        "x2": ["z", "v", "z", "v", "z", "v", "z", "v", "z", "v", "z", "v", "z", "v", "z", "v", "z", "v", "z", "v"],
        "y": [1, 3, 2, 3, 2, 1, 2, 3, 4, 3, 11, 17, 13, 12, 17, 22, 21, 7, 14, 10]
    })
    spark_df = spark.createDataFrame(pandas_df)
    return spark_df
Here is the logic in the model_training.py file.
from transforms.api import transform, Input
from palantir_models.transforms import ModelOutput
from main.model_adapters.adapter import ExampleModelAdapter


@transform(
    training_data_input=Input("INPUT_PATH"),
    model_output=ModelOutput("MODEL_OUTPUT_PATH"),
)
def compute(training_data_input, model_output):
    '''
    This function has only Foundry-specific functionality.
    It extracts the training data, calls train_model to train a model, and saves the trained model to Foundry.
    '''
    training_df = training_data_input.dataframe()  # Load a Spark dataframe from the TransformsInput
    model = train_model(training_df)  # Train the model

    # Wrap the trained model in a ModelAdapter
    foundry_model = ExampleModelAdapter(model)  # Edit ExampleModelAdapter for your model

    # Publish and write the trained model to Foundry
    model_output.publish(
        model_adapter=foundry_model
    )


def train_model(training_df):
    from pyspark.ml.feature import StringIndexer, VectorAssembler
    from pyspark.ml.pipeline import Pipeline
    from pyspark.ml.regression import DecisionTreeRegressor
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

    train_set = training_df
    cat_vars = ['x1', 'x2']

    # Pipeline components
    indexer = StringIndexer(inputCols=cat_vars,
                            outputCols=[var + "_indexed" for var in cat_vars],
                            handleInvalid='keep')
    assembler = VectorAssembler(inputCols=[var + "_indexed" for var in cat_vars], outputCol='features')
    decision_tree = DecisionTreeRegressor(featuresCol='features', labelCol='y', seed=123)

    # Regression evaluator for model performance
    regression_evaluator = RegressionEvaluator(labelCol='y')

    # Set pipeline
    pipeline = Pipeline(stages=[
        indexer,
        assembler,
        decision_tree])

    # Define hyperparameter grid
    params = (
        ParamGridBuilder()
        .addGrid(param=decision_tree.maxDepth, values=[1, 2])
        .addGrid(param=decision_tree.minInstancesPerNode, values=[1, 2, 3])
        .build()
    )

    # Define cross validator and extract best model
    cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=regression_evaluator, numFolds=2, seed=477)
    cv_trees = cv.fit(train_set)
    best_model = cv_trees.bestModel
    return best_model
Here's the logic for adapter.py. Note that I do intend the model input to be a Spark dataframe, and the model output to be a Spark dataframe that has been transformed by the PipelineModel object.
import palantir_models as pm
from palantir_models_serializers import DillSerializer


class ExampleModelAdapter(pm.ModelAdapter):

    @pm.auto_serialize(
        # TODO: Fill in the model constructor and define parameter serialization.
        model=DillSerializer()
    )
    def __init__(self, model):
        self.model = model

    @classmethod
    def api(cls):
        # TODO: Edit this method to define the model API.
        inputs = {
            "df_in": pm.Spark()
        }
        outputs = {
            "df_out": pm.Spark()
        }
        return inputs, outputs

    def predict(self, df_in):
        # Input signature should match the inputs defined in api()
        # Return type should match the output type defined in api()
        # TODO: Apply custom inference logic
        df_in = self.model.transform(df_in)
        return df_in
Here is the resulting error when I try to build the model.
[module version: 1.1008.0]
TypeError: cannot pickle 'socket' object
Traceback (most recent call last):
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/site-packages/transforms/_build.py", line 393, in run
self._transform.compute(ctx=transform_context, **parameters)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/site-packages/transforms/api/_transform.py", line 304, in compute
self(**kwargs)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/site-packages/transforms/api/_transform.py", line 217, in __call__
return self._compute_func(*args, **kwargs)
File "/app/work-dir/__user_code_environment__/__SYMLINKS__/site-packages/main/model_training/model_training.py", line 32, in compute
model_output.publish(
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/palantir_models/models/_writable_model.py", line 111, in publish
model_adapter.save(state_writer)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/palantir_models/models/_model_adapter.py", line 136, in save
serializers[arg_name].serialize(SubdirectoryModelStateWriter(arg_name, state_writer), arg_value)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/palantir_models_serializers/serializers/dill.py", line 19, in serialize
self.dill.dump(obj, dill_file, recurse=True)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 252, in dump
Pickler(file, protocol, **_kwds).dump(obj)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 420, in dump
StockPickler.dump(self, obj)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 487, in dump
self.save(obj)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
self.save_reduce(obj=obj, *rv)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 717, in save_reduce
save(state)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
f(self, obj) # Call unbound method with explicit self
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 1217, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 971, in save_dict
self._batch_setitems(obj.items())
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 997, in _batch_setitems
save(v)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
f(self, obj) # Call unbound method with explicit self
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 931, in save_list
self._batch_appends(obj)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 955, in _batch_appends
save(x)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
self.save_reduce(obj=obj, *rv)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 717, in save_reduce
save(state)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
f(self, obj) # Call unbound method with explicit self
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 1217, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 971, in save_dict
self._batch_setitems(obj.items())
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 997, in _batch_setitems
save(v)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
self.save_reduce(obj=obj, *rv)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 717, in save_reduce
save(state)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
f(self, obj) # Call unbound method with explicit self
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 1217, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 971, in save_dict
self._batch_setitems(obj.items())
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 997, in _batch_setitems
save(v)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
self.save_reduce(obj=obj, *rv)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 717, in save_reduce
save(state)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
f(self, obj) # Call unbound method with explicit self
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 1217, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 971, in save_dict
self._batch_setitems(obj.items())
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 997, in _batch_setitems
save(v)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
self.save_reduce(obj=obj, *rv)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 710, in save_reduce
self._batch_appends(listitems)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 955, in _batch_appends
save(x)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
self.save_reduce(obj=obj, *rv)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 717, in save_reduce
save(state)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
f(self, obj) # Call unbound method with explicit self
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 1217, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 971, in save_dict
self._batch_setitems(obj.items())
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 997, in _batch_setitems
save(v)
File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
StockPickler.save(self, obj, save_persistent_id)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 578, in save
rv = reduce(self.proto)
File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/socket.py", line 272, in __getstate__
raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
TypeError: cannot pickle 'socket' object
Library versions in the repository environment (all CONDA layout):

python 3.12.2 (30.8MB)
transforms 1.1040.0
transforms-expectations 0.363.0 (23.9KB)
transforms-verbs 0.363.0 (28.2KB)
palantir_models 0.1040.0 (37.1KB)
palantir_models_serializers 0.1040.0 (4.1KB)
dill 0.3.8 (86.1KB)
setuptools 69.1.1
As of March 5, 2024, this is not currently possible with models in Foundry. SparkML models need to be serialized to a shared Hadoop path, which model adapters do not have access to.
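That is also why the build fails with the dill error above: DillSerializer tries to generically pickle the fitted PipelineModel, but a fitted SparkML model holds references to the live SparkSession and its JVM (py4j) connection, which is where "cannot pickle 'socket' object" comes from. A minimal sketch of the failure mode, assuming best_model is the fitted PipelineModel returned by train_model in the question:

import dill

# A fitted SparkML model keeps a handle to the running SparkSession / JVM gateway,
# so generic pickling fails regardless of which stages are in the pipeline.
try:
    dill.dumps(best_model, recurse=True)
except TypeError as err:
    print(err)  # e.g. "cannot pickle 'socket' object"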
You can, however, serialize the model files to a dataset, as in the example below.
from transforms.api import transform, Input, Output


@transform(
    training_data_input=Input("INPUT_PATH"),
    model_output=Output("/path/to/spark/model/dataset"),
)
def compute(training_data_input, model_output):
    training_df = training_data_input.dataframe()
    model = train_model(training_df)

    path = model_output.filesystem().hadoop_path + '/spark_model'
    model.write().overwrite().save(path)
The saved model can then be loaded and used for inference like this:
from transforms.api import transform, Input, Output
from pyspark.ml import PipelineModel


@transform(
    inference_input_dataset=Input("INPUT_PATH"),
    model_input=Input("/path/to/spark/model/dataset"),
    inference_output=Output("OUTPUT")
)
def compute(inference_input_dataset, model_input, inference_output):
    inference_input_df = inference_input_dataset.dataframe()

    path = model_input.filesystem().hadoop_path + '/spark_model'
    model = PipelineModel.load(path)  # This needs to match your model class

    inferences_df = model.transform(inference_input_df)
    inference_output.write_dataframe(inferences_df)
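In this example the training transform saves cv_trees.bestModel, which is a PipelineModel, so PipelineModel.load is the matching class. If you instead chose to save the full cross-validation result, you would load it with the matching tuning class; a minimal sketch, assuming the same path layout as above:

from pyspark.ml.tuning import CrossValidatorModel

# Only applies if you saved the CrossValidatorModel itself rather than bestModel.
cv_model = CrossValidatorModel.load(path)
best_pipeline = cv_model.bestModel  # the underlying fitted PipelineModel
inferences_df = best_pipeline.transform(inference_input_df)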
Palantir is currently working on a default model serializer for this so that Spark models can be used directly with models, modeling objectives, and deployments. I will update this Stack Overflow post when that is released.