I am currently experimenting with reusable Airflow tasks, basing my testing on the DAGs provided in the Airflow documentation:
from airflow.decorators import dag, task
from pendulum import DateTime

@task
def add_task(x, y):
    print(f"Task args: x={x}, y={y}")
    return x + y

@dag(start_date=DateTime(2022, 1, 1), schedule=None, catchup=False)
def mydag():
    start = add_task.override(task_id="start_0")(1, 2)
    for x, y, task_id_str in zip([1, 3, 5], [2, 4, 6], ["start_1", "start_2", "start_3"], strict=True):
        start >> add_task.override(task_id=task_id_str)(x, y)

mydag()
Unfortunately, with the current setup, tasks 1 to 3 run in parallel.
Is there a way to chain those tasks as downstream dependencies in a Pythonic way, e.g. t1 >> t2 >> t3?
I can't seem to find any examples in the documentation.
Note: I do understand that I could use dynamic task mapping; however, I want to see if I can do it without it.
Resources: https://docs.astronomer.io/learn/managing-dependencies
I am expecting an end result that would look similar to:
t0.set_downstream(t1)
t1.set_downstream(t2)
t2.set_downstream(t3)
You can use the chain() function described in the Astronomer Learn doc you referenced. The trick is to build a list of the "start_*" tasks and then call chain() with the unpacked list.
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain

@task
def add_task(x, y):
    print(f"Task args: x={x}, y={y}")
    return x + y

@dag(start_date=datetime(2022, 1, 1), schedule=None, catchup=False)
def mydag():
    start = add_task.override(task_id="start_0")(1, 2)
    zip_tasks = []
    for x, y, task_id_str in zip([1, 3, 5], [2, 4, 6], ["start_1", "start_2", "start_3"], strict=True):
        zip_tasks.append(add_task.override(task_id=task_id_str)(x, y))
    chain(start, *zip_tasks)

mydag()
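For plain (non-list) arguments, chain() simply wires each task to the next one pairwise, which is why the unpacked list produces start_0 >> start_1 >> start_2 >> start_3. A minimal sketch of that wiring, using stand-in task objects (FakeTask and chain_like are illustrative names, not Airflow API) so it runs without Airflow installed:

```python
class FakeTask:
    """Stand-in for an Airflow task that records its downstream links."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Mirrors Airflow's t1 >> t2 dependency operator.
        self.downstream.append(other.task_id)
        return other


def chain_like(*tasks):
    # Pairwise wiring, as chain() does for a sequence of single tasks.
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream


tasks = [FakeTask(f"start_{i}") for i in range(4)]
chain_like(*tasks)
print([t.downstream for t in tasks])
# [['start_1'], ['start_2'], ['start_3'], []]
```

The same effect can be had inside the DAG loop itself by keeping a reference to the previous task and doing prev >> current on each iteration; chain() just packages that pattern.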