Tags: python, pytest, kubeflow, kubeflow-pipelines, kfp

How to test kfp components with pytest


I'm trying to locally test a Kubeflow component from kfp.v2.dsl (which works fine in a pipeline) using pytest, but I'm struggling with the input/output arguments in combination with fixtures.

Here is a code example to illustrate the issue:

First, I created a fixture to mock a dataset. This fixture is also a kubeflow component.

# ./fixtures/

    import pandas as pd
    import pytest
    from kfp.v2.dsl import component, Dataset, Output

    @pytest.fixture
    @component()
    def sample_df(dataset: Output[Dataset]):
        df = pd.DataFrame(
            {
                'name': ['Ana', 'Maria', 'Josh'],
                'age': [15, 19, 22],
            }
        )
        dataset.path += '.csv'
        df.to_csv(dataset.path, index=False)

Let's suppose the component doubles the ages.

# ./src/

    import pandas as pd
    from kfp.v2.dsl import component, Dataset, Input, Output

    @component()
    def double_ages(df_input: Input[Dataset], df_output: Output[Dataset]):
        df = pd.read_csv(df_input.path)

        double_df = df.copy()
        double_df['age'] = double_df['age'] * 2

        df_output.path += '.csv'
        double_df.to_csv(df_output.path, index=False)

Then, the test:

# ./tests/

    import pandas as pd
    import pytest

    from src.double_ages_component import double_ages

    @pytest.mark.usefixtures("sample_df")
    def test_double_ages(sample_df):
        expected_df = pd.DataFrame(
            {
                'name': ['Ana', 'Maria', 'Josh'],
                'age': [30, 38, 44],
            }
        )

        df_component = double_ages(sample_df)    # This is where I call the component; sample_df is an Input[Dataset]
        df_output = df_component.outputs['df_output']
        df = pd.read_csv(df_output.path)

        assert df['age'].tolist() == expected_df['age'].tolist()

But that's where the problem occurs: the Output[Dataset] that should be passed to the component as an output never is, so the component cannot work with it properly, and I get the following error on the assert df['age'].tolist() == expected_df['age'].tolist() line:

    AttributeError: 'TaskOutputArgument' object has no attribute 'path'

Apparently, the object is of type TaskOutputArgument instead of Dataset.

Does anyone know how to fix this, or how to properly use pytest with KFP components? I've searched a lot on the internet but couldn't find a clue about it.


Solution

  • After spending my afternoon on this, I finally figured out a way to pytest a Python-based KFP component. As I found no other lead on this subject, I hope this can help:

    Access the function to test

    The trick is not to test the KFP component created by the @component decorator directly. Instead, you can access the inner decorated Python function through the component's python_func attribute.
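
    For instance, with the double_ages component from the question (a minimal illustration):

    # @component wraps the original function in a component object;
    # .python_func exposes the plain function so a test can call it directly
    double_ages_fn = double_ages.python_func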

    Mock artifacts

    Regarding the Input and Output artifacts, as you get around KFP to access and call the tested function, you have to create them manually and pass them to the function:

    input_artifact = Dataset(uri='input_df_previously_saved.csv')
    output_artifact = Dataset(uri='target_output_path.csv')
    

    I had to come up with a workaround for how the Artifact.path property works (which also applies to all KFP Artifact subclasses: Dataset, Model, ...). If you look at the KFP source code, you'll find that path relies on the _get_path() method, which returns None if the uri attribute does not start with one of the defined cloud prefixes: "gs://", "s3://" or "minio://". As we're manually building artifacts with local paths, a tested component that reads the path property of an artifact would get a None value.
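
    To make the issue concrete, here is a quick check (a sketch, assuming the kfp.v2 behavior described above):

    from kfp.v2.dsl import Dataset

    cloud_ds = Dataset(uri='gs://bucket/df.csv')
    local_ds = Dataset(uri='local_df.csv')

    print(cloud_ds.path)  # a usable local mount path derived from the gs:// uri
    print(local_ds.path)  # None: the uri has no recognized cloud prefix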

    So I made a simple helper that builds a subclass of an Artifact (or of Dataset, or of any other Artifact child class). The built subclass is simply altered to return the uri value instead of None in this specific case of a non-cloud uri.

    Your example

    Putting this all together for your test and your fixture, we can get the following code to work:

    Nothing changes here, except that the pandas import moves inside the component function (lightweight KFP components need their imports at function scope, since the function runs in its own container):

    from kfp.v2.dsl import component, Input, Dataset, Output
    
    @component
    def double_ages(df_input: Input[Dataset], df_output: Output[Dataset]):
        import pandas as pd
    
        df = pd.read_csv(df_input.path)
    
        double_df = df.copy()
        double_df['age'] = double_df['age'] * 2
    
        df_output.path += '.csv'
        double_df.to_csv(df_output.path, index=False)
    
    This helper lives in tests/utils.py (it is imported from there below):

    import typing

    def make_test_artifact(artifact_type: typing.Type):
        """Build a subclass of `artifact_type` whose `path` falls back to its `uri`."""
        class TestArtifact(artifact_type):
            def _get_path(self):
                # return the parent's path if resolvable, else the raw (local) uri
                return super()._get_path() or self.uri

        return TestArtifact
    

    I am still not sure it is the most proper workaround. You could also manually create a subclass for each Artifact type that you use (Dataset in your example), or directly mock the kfp.v2.dsl.Artifact class using pytest-mock; a sketch of the latter follows.
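
    For illustration, here is a minimal sketch of the pytest-mock alternative (assuming the pytest-mock plugin is installed; the patched _get_path mirrors the fallback used above):

    from kfp.v2.dsl import Artifact, Dataset

    def test_path_of_local_artifact(mocker):
        # patch Artifact._get_path so local uris are returned as-is;
        # Dataset inherits the patched method for the duration of the test
        mocker.patch.object(Artifact, '_get_path', new=lambda self: self.uri)

        ds = Dataset(uri='local_df.csv')
        assert ds.path == 'local_df.csv'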

    I separated the sample dataframe creator component from the fixture. Hence we have a standard KFP component definition plus a fixture that builds its output artifact and calls its Python function:

    from kfp.v2.dsl import component, Dataset, Output
    import pytest
    
    from tests.utils import make_test_artifact
    
    @component
    def sample_df_component(dataset: Output[Dataset]):
        import pandas as pd
    
        df = pd.DataFrame({
            'name': ['Ana', 'Maria', 'Josh'],
            'age': [15, 19, 22],
        })
        dataset.path += '.csv'
        df.to_csv(dataset.path, index=False)
    
    @pytest.fixture
    def sample_df():
        # define output artifact
        output_path = 'local_sample_df.csv'  # any writable local path; better, use pytest's tmp_path fixture (see the variant below)
        sample_df_artifact = make_test_artifact(Dataset)(uri=output_path)
    
        # call component python_func by passing the artifact yourself
        sample_df_component.python_func(dataset=sample_df_artifact)
        # the artifact object is now altered with the new path that you define in sample_df_component (".csv" extension added)
    
        return sample_df_artifact
    

    The fixture returns an artifact object referencing a selected local path where the sample dataframe has been saved to.
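
    If you'd rather not write into the working directory, a variant of the same fixture using pytest's built-in tmp_path fixture could look like this (a sketch under the same assumptions):

    @pytest.fixture
    def sample_df(tmp_path):
        # tmp_path is a pathlib.Path to a per-test temporary directory
        output_path = str(tmp_path / 'sample_df')
        sample_df_artifact = make_test_artifact(Dataset)(uri=output_path)

        sample_df_component.python_func(dataset=sample_df_artifact)

        return sample_df_artifact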

    Once again, the idea is to build the I/O artifact(s) and to call the component's python_func:

    from kfp.v2.dsl import Dataset
    import pandas as pd
    
    from src.double_ages_component import double_ages
    from tests.utils import make_test_artifact
    
    def test_double_ages(sample_df):
        expected_df = pd.DataFrame({
            'name': ['Ana', 'Maria', 'Josh'],
            'age': [30, 38, 44],
        })
    
        # input artifact is passed in parameter via sample_df fixture
        # create output artifact
        output_path = 'local_test_output_df.csv'
        output_df_artifact = make_test_artifact(Dataset)(uri=output_path)
    
        # call component python_func
        double_ages.python_func(df_input=sample_df, df_output=output_df_artifact)
    
        # read output data
        df = pd.read_csv(output_df_artifact.path)
    
        # write your tests
        assert df['age'].tolist() == expected_df['age'].tolist()
    

    Result

    > pytest
    ================ test session starts ================
    platform linux -- Python 3.8.13, pytest-7.1.3, pluggy-1.0.0
    rootdir: /home/USER/code/kfp_tests
    collected 1 item
    
    tests/test_component.py .                      [100%]
    
    ================ 1 passed in 0.28s ================