palantir-foundry

How to publish a Spark ML pyspark.ml.PipelineModel object in code repositories?

I would like to publish a pyspark.ml.PipelineModel object in a Model Integration code repository. I tried to follow the Supervised Learning Tutorial in the documentation, but that tutorial covers an sklearn model and works with pandas DataFrames. I would like to adapt the approach to take Spark DataFrames as input and produce a PipelineModel object that can be used elsewhere in Foundry as the output.

The PySpark model logic itself works -- I've run it successfully in Code Workbooks -- so I suspect the problem is in the adapter.py Model Adapter file.

Data

I created a toy dataset in a Code Workbook transform, built it, and referenced it within the model repository.

def toy_data():

    import pandas as pd

    pandas_df = pd.DataFrame({
        "x1": ["a", "a", "a", "a", "a", "a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "b", "b", "b", "b", "b"],
        "x2": ["z", "v", "z", "v", "z", "v", "z", "v", "z", "v", "z", "v", "z", "v", "z", "v", "z", "v", "z", "v"],
        "y": [1, 3, 2, 3, 2, 1, 2, 3, 4, 3, 11, 17, 13, 12, 17, 22, 21, 7, 14, 10]
    })

    # `spark` is a global SparkSession provided by Code Workbooks
    spark_df = spark.createDataFrame(pandas_df)

    return spark_df
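
For reference, the resulting Spark schema is two string columns and a long column (pandas object columns map to string, int64 to long):

spark_df.printSchema()
# root
#  |-- x1: string (nullable = true)
#  |-- x2: string (nullable = true)
#  |-- y: long (nullable = true)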

Code logic

Here is the logic in the model_training.py file.

from transforms.api import transform, Input
from palantir_models.transforms import ModelOutput
from main.model_adapters.adapter import ExampleModelAdapter


@transform(
    training_data_input=Input("INPUT_PATH"),
    model_output=ModelOutput("MODEL_OUTPUT_PATH"),
)
def compute(training_data_input, model_output):
    '''
        This function contains only Foundry-specific functionality.
        It extracts the training data, calls train_model to train a model, and saves the trained model to Foundry.
    '''
    training_df = training_data_input.dataframe()     # Load a Spark dataframe from the TransformsInput

    model = train_model(training_df)               # Train the model

    # Wrap the trained model in a ModelAdapter
    foundry_model = ExampleModelAdapter(model)     # Edit ExampleModelAdapter for your model

    # Publish and write the trained model to Foundry
    model_output.publish(
        model_adapter=foundry_model
    )


def train_model(training_df):

    from pyspark.ml.feature import StringIndexer, VectorAssembler
    from pyspark.ml.pipeline import Pipeline
    from pyspark.ml.regression import DecisionTreeRegressor
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

    train_set = training_df
    cat_vars = ['x1', 'x2']

    # Pipeline components
    indexer = StringIndexer(inputCols=cat_vars, 
                            outputCols=[var + "_indexed" for var in cat_vars],
                            handleInvalid='keep')
    assembler = VectorAssembler(inputCols=[var + "_indexed" for var in cat_vars], outputCol='features')

    decision_tree = DecisionTreeRegressor(featuresCol='features', labelCol='y', seed=123)

    # regression evaluator for model performance
    regression_evaluator = RegressionEvaluator(labelCol='y')

    # Set pipeline
    pipeline = Pipeline(stages=[
        indexer,
        assembler,
        decision_tree])

    # Define hyperparameter grid
    params = (
        ParamGridBuilder()
        .addGrid(param=decision_tree.maxDepth, values=[1, 2])
        .addGrid(param=decision_tree.minInstancesPerNode, values=[1, 2, 3])
        .build()
    )

    # Define cross validator and extract best model
    cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params, evaluator=regression_evaluator, numFolds=2, seed=477)
    cv_trees = cv.fit(train_set)
    best_model = cv_trees.bestModel

    return best_model
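
As a quick sanity check (a hypothetical snippet run in a Code Workbook, where toy_data above is available), the hyperparameters chosen by the cross-validator can be read off the last stage of the best PipelineModel:

best_model = train_model(toy_data())

# The final stage of the winning pipeline is the fitted DecisionTreeRegressionModel
tree_model = best_model.stages[-1]
print(tree_model.getMaxDepth())             # one of [1, 2] from the grid
print(tree_model.getMinInstancesPerNode())  # one of [1, 2, 3] from the grid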

Model Adapter

Here's the logic for adapter.py. Note that I intend the model input to be a Spark DataFrame, and the model output to be a Spark DataFrame that has been transformed by the PipelineModel object.

import palantir_models as pm
from palantir_models_serializers import DillSerializer


class ExampleModelAdapter(pm.ModelAdapter):

    @pm.auto_serialize(
        model=DillSerializer()
    )
    def __init__(self, model):
        self.model = model

    @classmethod
    def api(cls):
        inputs = {
            "df_in": pm.Spark()
        }
        outputs = {
            "df_out": pm.Spark()
        }
        return inputs, outputs

    def predict(self, df_in):
        # The input signature should match the inputs defined in api(),
        # and the return type should match the output defined in api().
        df_out = self.model.transform(df_in)  # apply the fitted PipelineModel
        return df_out
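
To rule out the transform logic, the adapter can be exercised directly before publishing (a hypothetical check; model and training_df as in model_training.py):

adapter = ExampleModelAdapter(model)
scored_df = adapter.predict(training_df)  # plain method call; no serialization involved
scored_df.select("x1", "x2", "y", "prediction").show(5)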

Error output

Here is the resulting error when I try to build the model.

[module version: 1.1008.0]

TypeError: cannot pickle 'socket' object

Traceback (most recent call last):
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/site-packages/transforms/_build.py", line 393, in run
    self._transform.compute(ctx=transform_context, **parameters)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/site-packages/transforms/api/_transform.py", line 304, in compute
    self(**kwargs)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/site-packages/transforms/api/_transform.py", line 217, in __call__
    return self._compute_func(*args, **kwargs)
  File "/app/work-dir/__user_code_environment__/__SYMLINKS__/site-packages/main/model_training/model_training.py", line 32, in compute
    model_output.publish(
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/palantir_models/models/_writable_model.py", line 111, in publish
    model_adapter.save(state_writer)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/palantir_models/models/_model_adapter.py", line 136, in save
    serializers[arg_name].serialize(SubdirectoryModelStateWriter(arg_name, state_writer), arg_value)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/palantir_models_serializers/serializers/dill.py", line 19, in serialize
    self.dill.dump(obj, dill_file, recurse=True)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 252, in dump
    Pickler(file, protocol, **_kwds).dump(obj)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 420, in dump
    StockPickler.dump(self, obj)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 487, in dump
    self.save(obj)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 717, in save_reduce
    save(state)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 1217, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 971, in save_dict
    self._batch_setitems(obj.items())
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 997, in _batch_setitems
    save(v)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 931, in save_list
    self._batch_appends(obj)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 955, in _batch_appends
    save(x)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 717, in save_reduce
    save(state)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 1217, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 971, in save_dict
    self._batch_setitems(obj.items())
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 997, in _batch_setitems
    save(v)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 717, in save_reduce
    save(state)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 1217, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 971, in save_dict
    self._batch_setitems(obj.items())
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 997, in _batch_setitems
    save(v)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 717, in save_reduce
    save(state)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 1217, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 971, in save_dict
    self._batch_setitems(obj.items())
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 997, in _batch_setitems
    save(v)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 710, in save_reduce
    self._batch_appends(listitems)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 955, in _batch_appends
    save(x)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 603, in save
    self.save_reduce(obj=obj, *rv)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 717, in save_reduce
    save(state)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 560, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 1217, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 971, in save_dict
    self._batch_setitems(obj.items())
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 997, in _batch_setitems
    save(v)
  File "/app/work-dir/__environment__/__SYMLINKS__/site-packages/dill/_dill.py", line 414, in save
    StockPickler.save(self, obj, save_persistent_id)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/pickle.py", line 578, in save
    rv = reduce(self.proto)
  File "/app/work-dir/__python_runtime_environment__/__SYMLINKS__/python/socket.py", line 272, in __getstate__
    raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
TypeError: cannot pickle 'socket' object
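
The traceback suggests the failure is in pickling the Spark model itself rather than in the adapter logic: a fitted Spark ML model wraps a py4j JavaObject that references the JVM gateway and its sockets. A minimal sketch that reproduces the failure mode outside Foundry (assuming any local PySpark session):

import dill
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0, 2.0), (2.0, 5.0)], ["x", "y"])

# Fit a tiny pipeline so we have a PipelineModel to pickle
model = Pipeline(stages=[
    VectorAssembler(inputCols=["x"], outputCol="features"),
    DecisionTreeRegressor(featuresCol="features", labelCol="y"),
]).fit(df)

try:
    dill.dumps(model, recurse=True)  # mirrors what DillSerializer does
except TypeError as e:
    print(e)  # cannot pickle 'socket' object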

Module versions

Library name                  Latest version  Layout  Size
python                        3.12.2          CONDA   30.8MB
transforms                    1.1040.0        CONDA
transforms-expectations       0.363.0         CONDA   23.9KB
transforms-verbs              0.363.0         CONDA   28.2KB
palantir_models               0.1040.0        CONDA   37.1KB
palantir_models_serializers   0.1040.0        CONDA   4.1KB
dill                          0.3.8           CONDA   86.1KB
setuptools                    69.1.1          CONDA
Solution

  • As of March 5, 2024, this is not possible with models in Foundry. Spark ML models serialize themselves to a shared Hadoop path, which model adapters do not have access to.

    You can, however, serialize the model files to a dataset, as in the example below.

    from transforms.api import transform, Input, Output
    
    
    @transform(
        training_data_input=Input("INPUT_PATH"),
        model_output=Output("/path/to/spark/model/dataset"),
    )
    def compute(training_data_input, model_output):
        training_df = training_data_input.dataframe()
        model = train_model(training_df)
        path = model_output.filesystem().hadoop_path + '/spark_model'
        model.write().overwrite().save(path)
    

    The saved model can then be loaded and used for inference like this:

    from transforms.api import transform, Input, Output
    from pyspark.ml import PipelineModel
    
    
    @transform(
        inference_input_dataset=Input("INPUT_PATH"),
        model_input=Input("/path/to/spark/model/dataset"),
        inference_output=Output("OUTPUT")
    )
    def compute(inference_input_dataset, model_input, inference_output):
        inference_input_df = inference_input_dataset.dataframe()
    
        path = model_input.filesystem().hadoop_path + '/spark_model'
        model = PipelineModel.load(path)  # This needs to match your model class
    
        inferences_df = model.transform(inference_input_df)
        inference_output.write_dataframe(inferences_df)
    

    Palantir is currently working on a default model serializer for this, so that Spark models can be used directly with models, modeling objectives, and deployments. I will update this Stack Overflow post when that is released.