python, google-cloud-platform, google-cloud-storage, google-cloud-vertex-ai, kubeflow-pipelines

Reading Data in Vertex AI Pipelines


This is my first time using Google's Vertex AI Pipelines. I checked this codelab, as well as this post and this post, on top of some links derived from the official documentation. I decided to put all that knowledge to work in a toy example: a pipeline consisting of 2 components, "get-data" (which reads a .csv file stored in Cloud Storage) and "report-data" (which basically returns the shape of the .csv data read in the previous component). Furthermore, I was careful to include some suggestions provided in this forum. The code I currently have goes as follows:


from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform

# Components section   

@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    import pandas as pd
    from google.cloud import storage
    
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    
    # path = "gs://my-bucket/program_grouping_data.zip"
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')


@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
):
    import pandas as pd
    df = pd.read_csv(inputd.path)
    return df.shape


# Pipeline section

PIPELINE_ROOT = "gs://my-bucket/test_vertex/pipeline_root/"

@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="my-pipeline",
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)

    dimensions = report_data(
        dataset_task.output
    )

# Compilation section

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)

# Running and submitting job

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={"url": "test_vertex/pipeline_root/program_grouping_data.zip", "bucket": "my-bucket"},
    enable_caching=True,
)

run1.submit()

I was happy to see that the pipeline compiled with no errors, and I managed to submit the job. However, my happiness was short-lived: when I went to Vertex AI Pipelines, I stumbled upon an error that goes like this:

The DAG failed because some tasks failed. The failed tasks are: [get-data].; Job (project_id = my-project, job_id = 4290278978419163136) is failed due to the above error.; Failed to handle the job: {project_number = xxxxxxxx, job_id = 4290278978419163136}

I did not find any related info on the web, nor could I find any logs or anything similar, and I feel a bit overwhelmed that the solution to this (seemingly) easy example is still eluding me.

Quite obviously, I don't know what I am doing wrong or where. Any suggestions?


Solution

  • With some suggestions provided in the comments, I think I managed to make my demo pipeline work. I will first include the updated code:

    from kfp.v2 import compiler
    from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
    from datetime import datetime
    from google.cloud import aiplatform
    from typing import NamedTuple
    
    
    # Importing 'COMPONENTS' of the 'PIPELINE'
    
    @component(
        packages_to_install=[
            "google-cloud-storage",
            "pandas",
        ],
        base_image="python:3.9",
        output_component_file="get_data.yaml"
    )
    def get_data(
        bucket: str,
        url: str,
        dataset: Output[Dataset],
    ):
        """Reads a csv file, from some location in Cloud Storage"""
        import ast
        import pandas as pd
        from google.cloud import storage
        
        # 'Pulling' demo .csv data from a know location in GCS
        storage_client = storage.Client("my-project")
        bucket = storage_client.get_bucket(bucket)
        blob = bucket.blob(url)
        blob.download_to_filename('localdf.csv')
        
        # Reading the pulled demo .csv data
        df = pd.read_csv('localdf.csv', compression='zip')
        df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
        df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')
    
    
    @component(
        packages_to_install=["pandas"],
        base_image="python:3.9",
        output_component_file="report_data.yaml"
    )
    def report_data(
        inputd: Input[Dataset],
    ) -> NamedTuple("output", [("rows", int), ("columns", int)]):
        """From a passed csv file existing in Cloud Storage, returns its dimensions"""
        import pandas as pd
        
        df = pd.read_csv(inputd.path+".csv")
        
        return df.shape
    
    
    # Building the 'PIPELINE'
    
    # Pipeline root in Cloud Storage; can be overridden when submitting the pipeline
    PIPELINE_ROOT = "gs://my-bucket/test_vertex/pipeline_root/"
    
    @pipeline(
        pipeline_root=PIPELINE_ROOT,
        name="readcsv-pipeline",  # Your own naming for the pipeline.
    )
    def my_pipeline(
        url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
        bucket: str = "my-bucket"
    ):
        dataset_task = get_data(bucket, url)
    
        dimensions = report_data(
            dataset_task.output
        )
        
    
    # Compiling the 'PIPELINE'    
    
    compiler.Compiler().compile(
        pipeline_func=my_pipeline, package_path="pipeline_job.json"
    )
    
    
    # Running the 'PIPELINE'
    
    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    
    run1 = aiplatform.PipelineJob(
        display_name="my-pipeline",
        template_path="pipeline_job.json",
        job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
        parameter_values={
            "url": "test_vertex/pipeline_root/program_grouping_data.zip",
            "bucket": "my-bucket"
        },
        enable_caching=True,
    )
    
    # Submitting the 'PIPELINE'
    
    run1.submit()
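    
    One side note on that last line: if I understand the google-cloud-aiplatform client correctly (an assumption on my part, not something taken from the docs linked above), replacing run1.submit() with run1.run() makes the call block until the pipeline finishes and raise locally if it fails, which makes a broken component easier to spot:
    
    # A possible alternative to run1.submit(); run() waits for the pipeline
    # to finish and surfaces a failure in the calling process.
    run1.run(sync=True)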
    

    Now, I will add some complementary comments, which, in sum, managed to solve my problem. The main changes with respect to my original code are:

      • get_data now includes import ast, which the original component was missing even though it called ast.literal_eval (most likely the reason the get-data task failed).
      • report_data reads inputd.path + ".csv", matching the ".csv" suffix that get_data appends when writing the dataset.
      • report_data declares a NamedTuple return type, so the dataset dimensions are exposed as named outputs ("rows" and "columns"); see the sketch after the screenshot for how a downstream component could consume them.
      • The per-task logs in the Vertex AI Pipelines console show the underlying error of a failed component:

    Vertex AI Pipelines logs (screenshot)
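    
    As mentioned above, since report_data now declares named outputs, a downstream component could consume the dimensions individually via .outputs. A minimal sketch, where print_dims is a hypothetical component I did not actually include in my pipeline:
    
    @component(base_image="python:3.9")
    def print_dims(rows: int, columns: int):
        # Simply log the dimensions produced by report_data
        print(f"The dataset has {rows} rows and {columns} columns.")
    
    # Inside my_pipeline, after the report_data step:
    #     print_dims(dimensions.outputs["rows"], dimensions.outputs["columns"])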

    Of course, this is a very tiny example, and real projects can easily grow much larger; however, as some sort of "Hello, Vertex AI Pipelines" it works well.
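    
    As a final side note, the local download step in get_data could probably be skipped altogether: if gcsfs is added to packages_to_install, pandas can read gs:// paths directly. This is a hedged sketch under that assumption (get_data_direct is not part of the pipeline above), relying on the pipeline's service account for Cloud Storage access:
    
    @component(
        packages_to_install=["pandas", "gcsfs"],
        base_image="python:3.9",
    )
    def get_data_direct(
        bucket: str,
        url: str,
        dataset: Output[Dataset],
    ):
        import ast
        import pandas as pd
        
        # pandas delegates gs:// paths to gcsfs when it is installed
        df = pd.read_csv(f"gs://{bucket}/{url}", compression="zip")
        df["new_skills"] = df["new_skills"].apply(ast.literal_eval)
        df.to_csv(dataset.path + ".csv", index=False, encoding="utf-8-sig")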

    Thank you.