
Kubeflow Multiple images components in pipeline


I want to write a Kubeflow pipeline that has two components, A and B.

The output of A is a list of image paths.

I want to run a Docker image (B) for each image path.

From what I see, the dsl.ContainerOp of B can wait for the output of A, but I don't know how to create multiple instances of B.


Solution

  • Update: This has changed recently and can now be done simply by using ParallelFor over the output. Refer: https://stackoverflow.com/a/59292863/4438213
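Conceptually, ParallelFor fans out one B task per element of A's output, and the number of tasks is decided at run time. The snippet below is not Kubeflow code: it is a plain-Python analogy that uses `concurrent.futures` to mimic the parallel fan-out, with a hypothetical `run_b` standing in for ContainerOp_B.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def run_b(image_path: str) -> str:
    # Hypothetical stand-in for ContainerOp_B processing one image
    return f"processed {image_path}"


def fan_out(image_paths: List[str]) -> List[str]:
    # One task per element, like ParallelFor over A's output;
    # the task count is only known once the list exists (run time).
    # pool.map returns results in input order.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(run_b, image_paths))


print(fan_out(["img/a.png", "img/b.png", "img/c.png"]))
```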

    ----- Below for KF 0.6 and before ----

    This is a known issue with the Kubeflow DSL: using the output of one component (A) and iterating over it, running a new component (B) for each entry in that output. It's hard because the Kubeflow DSL is evaluated at compile time, when it's not possible to know how many elements the output will contain.

    Ref:

    The only form of dynamic (run-time) iteration supported as of KF v0.6 is dsl-recursion. Until the issues above are resolved, I've made it work in two ways.

    If the size of A's result is constant across runs and known in advance, this is straightforward.

    CASE A: The size of the output from the previous step is known

    1. Create a lightweight component to get the image path at a given index:
    # Plain Python function that extracts one path from
    # the string of paths the previous step returns
    def get_path(str_of_paths: str, idx: int) -> str:
        return str_of_paths.split(" ")[idx] # or some other delimiter
    
    2. Wrap the Python function in a Kubeflow lightweight component:
    import kfp.components as comp
    get_img_path_comp = comp.func_to_container_op(get_path, base_image='tensorflow/tensorflow') # or any appropriate base image
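Because a lightweight component starts life as an ordinary Python function, `get_path` can be sanity-checked locally before it is wrapped. A minimal sketch, assuming A emits space-separated paths (the example paths are made up):

```python
def get_path(str_of_paths: str, idx: int) -> str:
    # Same logic as above: split A's output on the delimiter and pick one entry
    return str_of_paths.split(" ")[idx]


paths = "img/cat.png img/dog.png img/bird.png"
for i in range(3):
    print(i, get_path(paths, i))
```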
    

    Then a regular for loop in your pipeline DSL code works:

    image_path_res = ContainerOp_A() # run your container op
    for idx in range(4): # 4 = the known, fixed number of paths
        path = get_img_path_comp(image_path_res.output, idx)
        ContainerOp_B(path.output)
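The loop works because it runs once, at compile time, and unrolls into a fixed number of task pairs. The plain-Python sketch below uses a hypothetical `compile_fixed_loop` (not a KFP function) to record the tasks a `range(4)` loop would add to the graph, showing that the count comes from the literal 4 rather than from A's actual output:

```python
from typing import List, Tuple


def compile_fixed_loop(n: int) -> List[Tuple[str, str]]:
    # Each iteration adds one get_path task and one B task to the graph;
    # the loop bound is a plain int known when the pipeline is compiled
    tasks = []
    for idx in range(n):
        tasks.append(("get_path", f"idx={idx}"))
        tasks.append(("ContainerOp_B", f"path from get_path idx={idx}"))
    return tasks


graph = compile_fixed_loop(4)
print(len(graph))  # 8 tasks: 4 get_path + 4 B, whatever A outputs at run time
```

If A later emits more than 4 paths, the extra ones are never processed; if it emits fewer, a get_path step fails with an IndexError at run time.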
    

    CASE B: When the output of the previous step is not of fixed size

    This is trickier. As of KF v0.6, the only form of dynamic looping Kubeflow allows is dsl-recursion.

    Option 1

    1. Create two lightweight components: sizer_op, which calculates the size of the result, and subtracter_op, which decrements an index (defined below). Reuse the same get_img_path_comp from above.
    def sizer_op(str_of_refs: str) -> int:
        # split on the same delimiter that get_path uses
        return len(str_of_refs.split(" "))
    sizer_op_comp = comp.func_to_container_op(sizer_op, base_image='tensorflow/tensorflow')
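sizer_op and get_path must agree on the delimiter, otherwise the recursion indexes past the end of the list or skips entries. A quick local check, assuming space-separated output as in Case A (example paths are made up):

```python
def get_path(str_of_paths: str, idx: int) -> str:
    return str_of_paths.split(" ")[idx]


def sizer_op(str_of_refs: str) -> int:
    # Must split on the same delimiter as get_path
    return len(str_of_refs.split(" "))


refs = "img/a.png img/b.png img/c.png img/d.png"
n = sizer_op(refs)
# Every index in [0, n) must resolve to a path
print(n, [get_path(refs, i) for i in range(n)])
```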
    

    Then you can run the recursive function:

    import kfp.dsl as dsl

    def subtracter_op(cur_idx: int) -> int:
        return cur_idx - 1
    sub_op_comp = comp.func_to_container_op(subtracter_op, base_image='tensorflow/tensorflow')
    
    @dsl.graph_component
    def recursive_run(list_of_images, cur_ref):
        # cur_ref = number of paths not yet processed
        with dsl.Condition(cur_ref > 0):
            next_ref = sub_op_comp(cur_ref) # index of the next path to process
            path = get_img_path_comp(list_of_images, next_ref.output)
            ContainerOp_B(path.output)
    
            # call recursively with the remaining count
            recursive_run(list_of_images, next_ref.output)
    
    
    image_path_res = ContainerOp_A() # run your container op
    sizer = sizer_op_comp(image_path_res.output)
    recursive_run(image_path_res.output, sizer.output)
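To check the termination logic without a cluster, here is a plain-Python simulation of recursion over a counter. The names are hypothetical and this is not KFP code: each level checks the condition, decrements the counter, fetches the path at the new index, "runs B" by recording the path, and recurses. Starting from the size returned by sizer_op and stopping when the counter reaches 0 visits every index exactly once.

```python
from typing import List


def get_path(str_of_paths: str, idx: int) -> str:
    return str_of_paths.split(" ")[idx]


def recursive_run(list_of_images: str, cur_ref: int, processed: List[str]) -> None:
    # Mirrors a Condition(cur_ref > 0): stop once no paths remain
    if cur_ref > 0:
        next_ref = cur_ref - 1  # what the subtracter step produces
        processed.append(get_path(list_of_images, next_ref))  # stand-in for running B
        recursive_run(list_of_images, next_ref, processed)


images = "img/a.png img/b.png img/c.png"
done: List[str] = []
recursive_run(images, len(images.split(" ")), done)
print(done)  # paths in reverse order: c, b, a
```

A variant that checks `>= 0` and fetches the path before decrementing would start at index n, one past the end of the list, so the order of "subtract, then fetch" matters.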
    

    Option 2

    After running ContainerOp_A, create a Kubeflow component that reads the results of ContainerOp_A, parses them in Python, and then spawns new runs that execute only ContainerOp_B, using the KFP client. You can connect to the Kubeflow Pipelines client with:

    import kfp
    kf_client = kfp.Client(host='http://localhost:9990')
    

    Refer: kf_client
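The parse-and-dispatch logic of that component can be sketched in plain Python. `spawn_runs` and `submit` are hypothetical: `submit` stands for whatever starts one run of a pipeline containing only ContainerOp_B (for example, a wrapper around `kf_client.create_run_from_pipeline_func`), and it is injected here so the sketch runs without a cluster.

```python
from typing import Callable, List


def spawn_runs(str_of_paths: str, submit: Callable[[str], str]) -> List[str]:
    # Parse A's output at run time, so the number of paths need not be known in advance
    paths = [p for p in str_of_paths.split(" ") if p]
    # Start one B-only run per path and collect whatever IDs submit returns
    return [submit(path) for path in paths]


# Fake submitter that just records calls, standing in for the real client call
submitted: List[str] = []


def fake_submit(path: str) -> str:
    submitted.append(path)
    return f"run-{len(submitted)}"


print(spawn_runs("img/a.png img/b.png", fake_submit))  # ['run-1', 'run-2']
```

Injecting `submit` keeps the parsing testable on its own; in the real component it would close over the `kf_client` created above.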