I'm trying to locally test a Kubeflow component from kfp.v2.dsl (which works in a pipeline) using pytest, but I'm struggling with the input/output arguments together with fixtures.
Here is a code example to illustrate the issue:
First, I created a fixture to mock a dataset. This fixture is also a kubeflow component.
# ./fixtures/
@pytest.fixture
@component()
def sample_df(dataset: Output[Dataset]):
    df = pd.DataFrame(
        {
            'name': ['Ana', 'Maria', 'Josh'],
            'age': [15, 19, 22],
        }
    )
    dataset.path += '.csv'
    df.to_csv(dataset.path, index=False)
    return
Let's suppose the component doubles the ages.
# ./src/
@component()
def double_ages(df_input: Input[Dataset], df_output: Output[Dataset]):
    df = pd.read_csv(df_input.path)
    double_df = df.copy()
    double_df['age'] = double_df['age'] * 2
    df_output.path += '.csv'
    double_df.to_csv(df_output.path, index=False)
Then, the test:
# ./tests/
@pytest.mark.usefixtures("sample_df")
def test_double_ages(sample_df):
    expected_df = pd.DataFrame(
        {
            'name': ['Ana', 'Maria', 'Josh'],
            'age': [30, 38, 44],
        }
    )
    df_component = double_ages(sample_df)  # This is where I call the component, sample_df is an Input[Dataset]
    df_output = df_component.outputs['df_output']
    df = pd.read_csv(df_output.path)
    assert df['age'].tolist() == expected_df['age'].tolist()
But that's where the problem occurs. The Output[Dataset] that should be passed as an output is not, so the component cannot work with it properly, and I get the following error on assert df['age'].tolist() == expected_df['age'].tolist():
AttributeError: 'TaskOutputArgument' object has no attribute 'path'
Apparently, the object is of type TaskOutputArgument instead of Dataset.
Does anyone know how to fix this? Or how to properly use pytest with kfp components? I've searched a lot on the internet but couldn't find a clue about it.
After spending my afternoon on this, I finally figured out a way to pytest a python-based KFP component. As I found no other lead on this subject, I hope this can help:
The trick is not to directly test the KFP component created by the @component decorator. However, you can access the inner decorated Python function through the component attribute python_func.
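To illustrate the idea, here is a toy stand-in, not KFP's actual implementation: a decorator that wraps a function in an object and keeps the original function in a `python_func` attribute. The names `ToyComponent`, `toy_component` and `add_one` are hypothetical; only the attribute name `python_func` mirrors the one mentioned above.

```python
class ToyComponent:
    """Hypothetical stand-in for the object a component decorator returns."""

    def __init__(self, func):
        # Keep a reference to the undecorated function.
        self.python_func = func

    def __call__(self, *args, **kwargs):
        # A pipeline SDK would build a task description here
        # instead of running the function body.
        return {"task_for": self.python_func.__name__}


def toy_component(func):
    return ToyComponent(func)


@toy_component
def add_one(x: int) -> int:
    return x + 1


task = add_one(1)                # goes through the wrapper: nothing is computed
result = add_one.python_func(1)  # calls the plain Python function directly
```

Calling the wrapper gives back a task-like object, which is roughly why the test in the question ends up with something that has no `path`, while calling `python_func` runs the real code.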
Regarding the Input and Output artifacts: since you bypass KFP to access and call the tested function, you have to create them manually and pass them to the function:
input_artifact = Dataset(uri='input_df_previously_saved.csv')
output_artifact = Dataset(uri='target_output_path.csv')
I had to come up with a workaround for how the Artifact.path property works (this also applies to all KFP Artifact subclasses: Dataset, Model, ...). If you look in the KFP source code, you'll find that it uses the _get_path() method, which returns None if the uri attribute does not start with one of the defined cloud prefixes: "gs://", "s3://" or "minio://". As we're manually building artifacts with local paths, a tested component that reads the path property of an artifact would get None.
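The behavior described above can be reproduced with a simplified stand-in class. `FakeArtifact` is hypothetical and only imitates the logic as described here, and the local mount points in `_PREFIXES` are assumptions chosen for illustration, not values taken from the KFP source.

```python
from typing import Optional

# Assumed mapping from cloud URI prefixes to local mount points (illustrative only).
_PREFIXES = {"gs://": "/gcs/", "s3://": "/s3/", "minio://": "/minio/"}


class FakeArtifact:
    """Hypothetical, simplified imitation of the artifact path logic."""

    def __init__(self, uri: str = ""):
        self.uri = uri

    @property
    def path(self) -> Optional[str]:
        return self._get_path()

    def _get_path(self) -> Optional[str]:
        for prefix, mount in _PREFIXES.items():
            if self.uri.startswith(prefix):
                return mount + self.uri[len(prefix):]
        return None  # local paths match no prefix and fall through to None


cloud = FakeArtifact(uri="gs://bucket/data.csv")
local = FakeArtifact(uri="data.csv")
```

With a local `uri`, `local.path` is `None`, so a component body doing `artifact.path += '.csv'` would fail on `None + str`.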
So I wrote a simple function that builds a subclass of Artifact (or of Dataset, or any other Artifact child class). The built subclass is simply altered to return the uri value instead of None in this specific case of a non-cloud uri.
Putting this all together for your test and your fixture, we can get the following code to work:
src/double_ages_component.py: your component to test
Nothing changes here. I just added the pandas import:
from kfp.v2.dsl import component, Input, Dataset, Output


@component
def double_ages(df_input: Input[Dataset], df_output: Output[Dataset]):
    import pandas as pd

    df = pd.read_csv(df_input.path)
    double_df = df.copy()
    double_df['age'] = double_df['age'] * 2
    df_output.path += '.csv'
    double_df.to_csv(df_output.path, index=False)
tests/utils.py: the Artifact subclass builder
import typing


def make_test_artifact(artifact_type: typing.Type):
    class TestArtifact(artifact_type):
        def _get_path(self):
            # fall back to the raw uri when the parent returns None (local path)
            return super()._get_path() or self.uri

    return TestArtifact
I am still not sure this is the most proper workaround. You could also manually create a subclass for each Artifact that you use (Dataset in your example). Or you could directly mock the kfp.v2.dsl.Artifact class using pytest-mock.
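As a rough sketch of the mocking alternative, the example below patches `_get_path` with the standard library's `unittest.mock` (swapped in here for pytest-mock so the snippet has no third-party dependency). `FakeArtifact` is a hypothetical stand-in class; with the real library the patch target would presumably be the artifact class itself, which is an assumption not verified here.

```python
from unittest.mock import patch


class FakeArtifact:
    """Hypothetical stand-in: path is None for local URIs, as described above."""

    def __init__(self, uri: str = ""):
        self.uri = uri

    @property
    def path(self):
        return self._get_path()

    @path.setter
    def path(self, value):
        self.uri = value

    def _get_path(self):
        return None


artifact = FakeArtifact(uri="out")

# Temporarily replace _get_path so that path falls back to the raw uri.
with patch.object(FakeArtifact, "_get_path", lambda self: self.uri):
    artifact.path += ".csv"  # getter returns the uri, setter stores the new value
    patched_path = artifact.path

unpatched_path = artifact.path  # the original behavior is restored after the block
```

The patch only lasts for the duration of the `with` block, so this keeps the workaround scoped to the test that needs it.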
tests/conftest.py: your fixture
I separated the sample dataframe creator component from the fixture. Hence we have a standard KFP component definition plus a fixture that builds its output artifact and calls its Python function:
from kfp.v2.dsl import component, Dataset, Output
import pytest
from tests.utils import make_test_artifact


@component
def sample_df_component(dataset: Output[Dataset]):
    import pandas as pd

    df = pd.DataFrame({
        'name': ['Ana', 'Maria', 'Josh'],
        'age': [15, 19, 22],
    })
    dataset.path += '.csv'
    df.to_csv(dataset.path, index=False)


@pytest.fixture
def sample_df():
    # define output artifact
    output_path = 'local_sample_df.csv'  # any writable local path. I'd recommend to use pytest `tmp_path` fixture.
    sample_df_artifact = make_test_artifact(Dataset)(uri=output_path)
    # call component python_func by passing the artifact yourself
    sample_df_component.python_func(dataset=sample_df_artifact)
    # the artifact object is now altered with the new path that you define in sample_df_component (".csv" extension added)
    return sample_df_artifact
The fixture returns an artifact object referencing a selected local path where the sample dataframe has been saved to.
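To keep test files out of the working directory, the artifact can point into a temporary directory. The sketch below uses only the standard library so it runs on its own: `LocalArtifact` and `write_sample` are hypothetical stand-ins for the test artifact and the sample component body, and the `csv` module replaces pandas.

```python
import csv
import tempfile
from pathlib import Path


class LocalArtifact:
    """Hypothetical stand-in whose path simply mirrors its uri."""

    def __init__(self, uri: str):
        self.uri = uri

    @property
    def path(self) -> str:
        return self.uri

    @path.setter
    def path(self, value: str) -> None:
        self.uri = value


def write_sample(dataset: LocalArtifact) -> None:
    # Same shape as the sample component body: extend the path, then write a CSV.
    dataset.path += ".csv"
    with open(dataset.path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["name", "age"])
        writer.writerows([["Ana", 15], ["Maria", 19], ["Josh", 22]])


with tempfile.TemporaryDirectory() as tmp_dir:
    artifact = LocalArtifact(uri=str(Path(tmp_dir) / "sample_df"))
    write_sample(artifact)
    written_name = Path(artifact.path).name
    with open(artifact.path, newline="") as f:
        rows = list(csv.reader(f))
```

In a pytest fixture, the built-in `tmp_path` fixture plays the role of `tmp_dir`: the fixture can take `tmp_path` as an argument and build the uri from `str(tmp_path / "sample_df")`.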
tests/test_component.py: your actual component test
Once again, the idea is to build the I/O artifact(s) and to call the component's python_func:
from kfp.v2.dsl import Dataset
import pandas as pd
from src.double_ages_component import double_ages
from tests.utils import make_test_artifact


def test_double_ages(sample_df):
    expected_df = pd.DataFrame({
        'name': ['Ana', 'Maria', 'Josh'],
        'age': [30, 38, 44],
    })
    # input artifact is passed in parameter via sample_df fixture
    # create output artifact
    output_path = 'local_test_output_df.csv'
    output_df_artifact = make_test_artifact(Dataset)(uri=output_path)
    # call component python_func
    double_ages.python_func(df_input=sample_df, df_output=output_df_artifact)
    # read output data
    df = pd.read_csv(output_df_artifact.path)
    # write your tests
    assert df['age'].tolist() == expected_df['age'].tolist()
> pytest
================ test session starts ================
platform linux -- Python 3.8.13, pytest-7.1.3, pluggy-1.0.0
rootdir: /home/USER/code/kfp_tests
collected 1 item
tests/test_component.py . [100%]
================ 1 passed in 0.28s ================