I'm using local Kubeflow Pipelines to build a continuous machine learning test project. I have one pipeline that preprocesses the data using TFX, and it saves its outputs automatically to MinIO. Outside of this pipeline, I want to train the model using TFX's Trainer, but I need the artifacts generated in the preprocessing pipeline. Is there an implemented way to import these outputs? I've looked through the documentation and some issues, but can't find an answer. And because I'm trying to do this continuously, I can't rely on doing it manually.
Example of my preprocessing pipeline:
@kfp.dsl.pipeline(
    name='TFX',
    description='TFX pipeline'
)
def tfx_pipeline():
    # DL with wget, can use gcs instead as well
    fetch = kfp.dsl.ContainerOp(
        name='download',
        image='busybox',
        command=['sh', '-c'],
        arguments=[
            'sleep 1;'
            'mkdir -p /tmp/data;'
            'wget <gcp link> -O /tmp/data/results.csv'],
        file_outputs={'downloaded': '/tmp/data'})

    records_example = tfx_csv_gen(input_base=fetch.output)
    stats = tfx_statistic_gen(input_data=records_example.output)
    schema_op = tfx_schema_gen(stats.output)
    tfx_example_validator(stats=stats.outputs['output'],
                          schema=schema_op.outputs['output'])

    #tag::tft[]
    transformed_output = tfx_transform(
        input_data=records_example.output,
        schema=schema_op.outputs['output'],
        module_file=module_file)  # Path to your TFT code on GCS/S3
    #end::tft[]
and then executing with
kfp.compiler.Compiler().compile(tfx_pipeline, 'tfx_pipeline.zip')

client = kfp.Client()
client.list_experiments()
#exp = client.create_experiment(name='mdupdate')
my_experiment = client.create_experiment(name='tfx_pipeline')
my_run = client.run_pipeline(my_experiment.id, 'tfx', 'tfx_pipeline.zip')
I'm working in a .ipynb in Visual Studio Code.
You can get that information like this: https://github.com/kubeflow/pipelines/issues/4327#issuecomment-687255001
- component_name: this can be found in the YAML definition of the compiled pipeline, under templates.name (look for the component that contains the output you want). A small script for listing these is sketched after this list.
- artifact_name: this can also be found in the YAML definition, under the outputs attribute of that same component.
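If you'd rather not open the YAML by hand, here is a minimal sketch for listing the candidates. It assumes the archive produced by the compile step above ('tfx_pipeline.zip'), the standard KFP v1 Argo workflow layout inside it, and that PyYAML is installed:

import zipfile
import yaml

# KFP v1 packages the Argo workflow as a single YAML file inside the archive
with zipfile.ZipFile('tfx_pipeline.zip') as z:
    manifest = yaml.safe_load(z.read(z.namelist()[0]))

# Print each template (component) name together with its output artifact names
for template in manifest['spec']['templates']:
    artifacts = template.get('outputs', {}).get('artifacts', [])
    if artifacts:
        print(template['name'], '->', [a['name'] for a in artifacts])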
Once you have these two parameters, you can use the functions described at the URL above:
#!/usr/bin/env python3

import json
import tarfile
from base64 import b64decode
from io import BytesIO

import kfp


def get_node_id(*, run_id: str, component_name: str, client: kfp.Client):
    run = client.runs.get_run(run_id)
    workflow = json.loads(run.pipeline_runtime.workflow_manifest)
    nodes = workflow["status"]["nodes"]
    for node_id, node_info in nodes.items():
        if node_info["displayName"] == component_name:
            return node_id
    else:
        raise RuntimeError(f"Unable to find node_id for Component '{component_name}'")


def get_artifact(*, run_id: str, node_id: str, artifact_name: str, client: kfp.Client):
    artifact = client.runs.read_artifact(run_id, node_id, artifact_name)
    # Artifacts are returned as base64-encoded .tar.gz strings
    data = b64decode(artifact.data)
    io_buffer = BytesIO()
    io_buffer.write(data)
    io_buffer.seek(0)
    data = None
    with tarfile.open(fileobj=io_buffer) as tar:
        member_names = tar.getnames()
        if len(member_names) == 1:
            data = tar.extractfile(member_names[0]).read().decode('utf-8')
        else:
            # Is it possible for KFP artifacts to have multiple members?
            data = {}
            for member_name in member_names:
                data[member_name] = tar.extractfile(member_name).read().decode('utf-8')
    return data


if __name__ == "__main__":
    run_id = "e498b0da-036e-4e81-84e9-6e9c6e64960b"
    component_name = "my-component"
    # For an output variable named "output_data"
    artifact_name = "my-component-output_data"

    client = kfp.Client()
    node_id = get_node_id(run_id=run_id, component_name=component_name, client=client)
    artifact = get_artifact(
        run_id=run_id, node_id=node_id, artifact_name=artifact_name, client=client,
    )
    # Do something with the artifact ...
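With the pipeline from the question, the run_id to pass in would be the id of the run returned by client.run_pipeline (my_run.id).

Since the preprocessing outputs are persisted to MinIO anyway, another option is to read the objects straight from the artifact bucket. The sketch below is not specific to your setup: the endpoint, credentials and bucket name are the defaults of a stock Kubeflow Pipelines install and are assumptions to adjust for your deployment, and the prefix variable is hypothetical, standing in for whatever object path you recover from the artifact above:

from minio import Minio

# Assumed defaults of a stock KFP MinIO deployment -- change for your cluster
minio_client = Minio(
    "minio-service.kubeflow:9000",
    access_key="minio",
    secret_key="minio123",
    secure=False,
)

bucket = "mlpipeline"   # default KFP artifact bucket (assumption)
prefix = artifact       # hypothetical: object prefix recovered from the artifact above

# Download every object under the prefix so the Trainer step can read it locally
for obj in minio_client.list_objects(bucket, prefix=prefix, recursive=True):
    local_path = obj.object_name.split("/")[-1]
    minio_client.fget_object(bucket, obj.object_name, local_path)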