I have a Vertex AI pipeline that looks like this, for example:
run_1_output = run_1().set_memory_limit("1G").set_cpu_limit("1")
with kfp.dsl.ParallelFor(run_1_output.output) as key:
    run_1a_output = run_1a(key=key).set_memory_limit("1G").set_cpu_limit("1")
run_1b_output = run_1b(run_1_output=run_1_output.output).set_memory_limit("1G").set_cpu_limit("1")
The run_1 component is as simple as:
@component()
def run_1() -> List[str]:
    resp = ["1", "2"]
    return resp
As you can see, the output of the first component run_1() is a hardcoded list of strings.
The ParallelFor should iterate over this list and pass each element as the "key" argument to the second component.
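For context, a minimal self-contained version of what I am trying to do looks roughly like the sketch below. This is only a sketch assuming the kfp.v2 (Vertex-compatible) imports and compiler from the 1.x SDK; the pipeline name and output package path are placeholders, not taken from my real code.

from typing import List

import kfp
from kfp.v2 import compiler
from kfp.v2.dsl import component


@component()
def run_1() -> List[str]:
    # Hardcoded list that ParallelFor should fan out over.
    return ["1", "2"]


@component()
def run_1a(key: str) -> str:
    # Receives one element of the list per parallel branch.
    return key


@kfp.dsl.pipeline(name="demo-parallelfor")  # placeholder name
def pipeline():
    run_1_output = run_1().set_memory_limit("1G").set_cpu_limit("1")
    with kfp.dsl.ParallelFor(run_1_output.output) as key:
        run_1a(key=key).set_memory_limit("1G").set_cpu_limit("1")


if __name__ == "__main__":
    # Compile to the JSON format expected by Vertex AI Pipelines.
    compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")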
I have no issues deploying the pipeline on Vertex AI, and the first component runs fine, but I am observing inconsistent behavior that prevents the ParallelFor from running, and I cannot identify the cause.
The error message shown for the for-loop task in the Vertex AI Pipelines interface is: "The task_spec.parameter_iterator input parameter expected a string or list value."
This error is triggered before the component even runs and seems to be a Kubeflow exception for a wrongly typed argument.
Do you have any idea what might cause this issue? The weird thing is that I don't always observe this error when compiling and deploying the same pipeline without any code modification.
One difference I notice between a successful and an unsuccessful execution is that the output of the first component run_1 is not refreshed from its default value undefined_till_runtime, even though the component succeeded, as shown in the images:
EDIT (SOLVED): The problem was with kfp version 1.18.17. Everything worked perfectly fine after switching to version 1.18.16.
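Since the root cause turned out to be the SDK version itself, a quick guard like the one below (just a sketch of how one might assert the pinned version in a build or CI step, not something from my original code) can catch an accidental upgrade early:

import kfp

# Fail fast if the environment drifted off the pinned, known-good SDK version.
# 1.18.17 triggered the parameter_iterator error for me; 1.18.16 works.
assert kfp.__version__ == "1.18.16", f"unexpected kfp version: {kfp.__version__}"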