Let's say I have the following (simplified) DAG: a task returns a series of query parameter values, and I want to spawn one dynamically mapped HttpOperator task instance per value, issuing requests like http://example.com?key=value1, http://example.com?key=value2, etc. However, I'm not able to map the data field of the operator:

    from airflow.decorators import dag, task
    from airflow.providers.http.operators.http import HttpOperator

    @task
    def get_values():
        return ["value1", "value2", "value3"]

    @dag
    def mydag() -> None:
        values = get_values()
        gets = HttpOperator.partial(task_id='gets', method='GET').expand(
            # Attempt 1: expands in the UI, but fails at runtime with
            # "ValueError: too many values to unpack (expected 2)"
            data={'key': values},
            # Attempt 2: DAG error,
            # "TypeError: 'XComArg' object is not iterable"
            # data=[{'key': value} for value in values],
        )

I'm not sure how to set up the parameter so that it actually uses the values object properly.
Worse, in the real DAG, data would normally also include additional query parameters, some of which come from other tasks (with a static mapping).
FWIW, I found a way to expand the query parameter over multiple values.
It relies on Operator.expand_kwargs and XComArg.map:
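
    from airflow.decorators import dag, task
    from airflow.providers.http.operators.http import HttpOperator

    @task
    def get_values():
        return ["value1", "value2", "value3"]

    @dag
    def mydag() -> None:
        values = get_values()
        gets = HttpOperator.partial(task_id='gets', method='GET').expand_kwargs(
            # Each upstream value becomes one kwargs dict for one mapped task instance.
            values.map(lambda value: {'data': {'key': value}})
        )

    mydag()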
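
For the case where data also needs extra query parameters coming from other tasks, one option (a sketch I have not run inside a real DAG) is to build the whole list of kwargs dicts in an intermediate task and pass its result to expand_kwargs. The helper below is plain Python so it can be checked on its own. The names build_request_kwargs, get_extra_params and the "lang" parameter are hypothetical and only serve as illustration:

```python
from typing import Any


def build_request_kwargs(
    values: list[str], extra_params: dict[str, Any]
) -> list[dict[str, Any]]:
    """Return one kwargs dict per mapped HttpOperator instance.

    Each dict carries the varying 'key' value plus the shared extra
    query parameters (assumed to be the same for every request).
    """
    return [{"data": {"key": value, **extra_params}} for value in values]


# Assumed wiring inside the DAG (hypothetical, not executed here):
#   kwargs = task(build_request_kwargs)(get_values(), get_extra_params())
#   HttpOperator.partial(task_id="gets", method="GET").expand_kwargs(kwargs)
example = build_request_kwargs(["value1", "value2"], {"lang": "en"})
print(example)
```

Doing the merge in a task rather than in the lambda passed to map means the extra parameters are already resolved values when the dicts are built, instead of unresolved XComArg references captured in a closure.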