google-cloud-vertex-ai, kubeflow-pipelines, vertex-ai-pipeline

How to modify a CustomJob for a specific component in Vertex AI


I'm having trouble changing one component in my pipeline. In more detail: the component, a reusable component loaded from a .yaml file, should run on a different network (VPC) than the one Vertex AI assigns by default, and I don't know how to achieve this.

This is more or less my approach at this moment:

import kfp

# Load the reusable component from its .yaml definition
component_item = kfp.components.load_component(filename)

This returns a factory function with a strongly typed signature. Once called with the required arguments, the factory constructs a pipeline task instance (ContainerOp) (source). Since my component's .yaml defines the inputs and outputs, I can now call the factory function to turn it into an actual ContainerOp that can be used further on in my Vertex AI pipeline:

component_item_op = component_item(
        some_argument1=1,
        some_argument2=2,
        some_argument3=3,
        test=True,
    )

Take note that component_item_op is of class <class 'kfp.dsl._container_op.ContainerOp'>.

So far so good, but after a quick look at the documentation of Kubeflow and the google-cloud-aiplatform SDK, I don't see a way to modify the "job" so that I can assign it a different network. I do know that I can set the network if I have access to the CustomJob object.
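
To illustrate what I mean: outside of a pipeline, the plain google-cloud-aiplatform SDK lets you pass the network directly to CustomJob.run(). All names below (project, region, bucket, image, VPC path) are placeholders, not my real setup:

from google.cloud import aiplatform

# Placeholder worker pool spec, just to illustrate the shape of the object.
worker_pool_specs = [{
    "machine_spec": {"machine_type": "n1-standard-4"},
    "replica_count": 1,
    "container_spec": {"image_uri": "some_image_path"},
}]

aiplatform.init(
    project="your_project",
    location="us-central1",
    staging_bucket="gs://your-bucket",
)

job = aiplatform.CustomJob(
    display_name="some-custom-job",
    worker_pool_specs=worker_pool_specs,
)

# The VPC peering network is a run-time argument on the CustomJob object.
job.run(network="projects/PROJECT_NUMBER/global/networks/YOUR_VPC")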

I tried different approaches, but none of them have worked so far. In one example I tried using create_custom_training_job_op_from_component from the google-cloud-pipeline-components package. However, passing my component_item_op doesn't work and fails with the error:

test = create_custom_training_job_op_from_component(
    component_item(
        some_argument1=1,
        some_argument2=2,
        some_argument3=3,
        test=True,
    )
)
training_job_op_from_component
    if component_spec.component_spec.inputs:
AttributeError: inputs

I would have expected it to work out of the box, as according to the documentation one can pass an object of type ContainerOp and leave all the rest as default.

From the error it looks like it was not able to get the inputs. So yes, I'm a bit puzzled here. Maybe someone can give me some pointers, or help me out.

.yaml

name: some name
description: some desc

inputs:
- {name: some_argument1, type: Integer, description: "some desc"}
- {name: some_argument2, type: Integer, description: "some desc"}
- {name: some_argument3, type: Integer, description: "some desc"}
- {name: test, type: Boolean, description: 'runs a test when true'}

outputs:
- {name: some_out, type: Dataset, description: 'some desc'}

implementation:
  container:
      args:
      - --executor_input
      - executorInput: null
      - --function_to_execute
      - some_func
      command:
      - python3
      - -m
      - kfp.v2.components.executor_main
      - --component_module_path
      - some_file.py
      image: some_image_path
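
For readability, the function referenced by the .yaml (some_func in some_file.py) would look roughly like the outline below. This is only an assumed sketch matching the declared inputs and outputs; the real implementation is not shown here:

# some_file.py -- assumed outline only; the real module is not part of this question.
from kfp.v2.dsl import Dataset, Output

def some_func(
    some_argument1: int,
    some_argument2: int,
    some_argument3: int,
    test: bool,
    some_out: Output[Dataset],
):
    # Placeholder body: write whatever the component produces to the output artifact.
    with open(some_out.path, "w") as f:
        f.write(f"{some_argument1},{some_argument2},{some_argument3},{test}\n")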

Installed versions

kfp                              1.8.19
kfp-pipeline-spec                0.1.16
kfp-server-api                   1.8.5

Solution

  • Well, it turns out I was doing the right thing, albeit in the wrong order. Here's how I fixed it:

    tmp_op = create_custom_training_job_op_from_component(component_item, network="your_vpc_network_path")
    component_op = tmp_op(project="your_project", some_argument1=1, some_argument2=2, some_argument3=3)
    

    At this point, I have a ContainerOp object with the proper worker_pool_specs. A sketch of how this fits into a full pipeline follows below.
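
For completeness, this is roughly how the wrapped op ends up in a full pipeline. The import path used here is the experimental namespace of google-cloud-pipeline-components (newer releases expose a similar helper under google_cloud_pipeline_components.v1.custom_job), and the project/region/bucket/VPC values are placeholders:

import kfp
from kfp.v2 import compiler, dsl
from google.cloud import aiplatform
# Import path for the experimental namespace of google-cloud-pipeline-components;
# it may differ depending on your installed version.
from google_cloud_pipeline_components.experimental.custom_job.utils import (
    create_custom_training_job_op_from_component,
)

component_item = kfp.components.load_component("component.yaml")

# Wrap the component factory (not an instantiated op) so the resulting
# CustomJob is attached to the peered VPC network.
custom_job_op = create_custom_training_job_op_from_component(
    component_item,
    network="projects/PROJECT_NUMBER/global/networks/YOUR_VPC",
)

@dsl.pipeline(name="some-pipeline")
def my_pipeline():
    custom_job_op(
        project="your_project",
        some_argument1=1,
        some_argument2=2,
        some_argument3=3,
        test=True,
    )

compiler.Compiler().compile(pipeline_func=my_pipeline, package_path="pipeline.json")

aiplatform.PipelineJob(
    display_name="some-pipeline",
    template_path="pipeline.json",
    pipeline_root="gs://your-bucket/pipeline-root",
).run()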