I have a series of Airflow DAGs that re-use some of the same task dependencies. For example:
DAG 1:
T1 >> T2
DAG 2:
T1 >> T2 >> T3
DAG 3:
T1 >> T2 >> T3 >> [T4, T5, T6] >> T7
I would like to store the dependencies from DAG 1 (which, in this model, are used by every other DAG) and re-use them when declaring the dependencies for the other DAGs, like so:
def dag_1_dependencies():
    T1 >> T2
DAG 2:
dag_1_dependencies() >> T3
DAG 3:
dag_1_dependencies() >> T3 >> [T4, T5, T6] >> T7
The problem is that dependencies themselves aren't a value, so I can't return them from a function, and calling dag_1_dependencies() gives me nothing to chain T3 onto. Is there a way to circumvent this?
If tasks t1 and t2 are always the same tasks, you can create these tasks and their dependencies in a reusable function defined outside the DAG. To add tasks/dependencies after t2, you need a reference to that task object, so the function should return it. For example:
def generate_t1_t2() -> BaseOperator:
    """Generates tasks + dependencies and returns the last task so that additional dependencies can be set."""
    t1 = EmptyOperator(task_id="t1")
    t2 = EmptyOperator(task_id="t2")
    t1 >> t2
    return t2


with DAG(dag_id="dag2", start_date=datetime(2025, 1, 1), schedule=None):
    last_task = generate_t1_t2()
    t3 = EmptyOperator(task_id="t3")
    last_task >> t3
The function generate_t1_t2 returns the last operator in the chain (t2), which allows configuring additional dependencies such as last_task >> t3.
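To see why returning t2 works, and why it gets around the "dependencies aren't a value" problem, here is a stdlib-only toy model. The Task class and REGISTRY dict are hypothetical stand-ins, not Airflow classes: they only mimic the fact that a >> b records an edge and evaluates to b. Airflow's operators behave the same way in this respect (t1 >> t2 evaluates to t2, and [t4, t5, t6] >> t7 is handled by the operator on the right), so the helper could equally end with return t1 >> t2.

```python
from __future__ import annotations

# Hypothetical stand-in for the DAG, which collects tasks created inside it.
REGISTRY: dict[str, Task] = {}


class Task:
    """Toy operator (NOT Airflow's class) that only models how `>>` behaves."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: set[str] = set()
        REGISTRY[task_id] = self

    def set_downstream(self, other: Task | list[Task]) -> None:
        # Accept a single task or a list of tasks.
        for task in other if isinstance(other, list) else [other]:
            self.downstream.add(task.task_id)

    def __rshift__(self, other: Task | list[Task]) -> Task | list[Task]:
        # task >> task or task >> [tasks]: record the edge(s), return the right-hand side.
        self.set_downstream(other)
        return other

    def __rrshift__(self, other: list[Task]) -> Task:
        # [tasks] >> task: list has no >>, so Python calls this on the right operand.
        for task in other:
            task.set_downstream(self)
        return self


def generate_t1_t2() -> Task:
    """Toy version of the helper: the chain expression itself evaluates to t2."""
    t1, t2 = Task("t1"), Task("t2")
    return t1 >> t2


# Same shape as dag3 in the question.
last_task = generate_t1_t2()
last_task >> Task("t3") >> [Task("t4"), Task("t5"), Task("t6")] >> Task("t7")

for task_id, task in REGISTRY.items():
    print(task_id, "->", sorted(task.downstream))
```

The printed edges match the dag3 graph: t1 -> t2 -> t3, a fan-out to t4/t5/t6, and a fan-in to t7.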
The three DAGs from your question can therefore be written as:
from datetime import datetime

from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.empty import EmptyOperator


def generate_t1_t2() -> BaseOperator:
    """Generates tasks + dependencies and returns the last task so that additional dependencies can be set."""
    t1 = EmptyOperator(task_id="t1")
    t2 = EmptyOperator(task_id="t2")
    t1 >> t2
    return t2


with DAG(dag_id="dag1", start_date=datetime(2025, 1, 1), schedule=None):
    generate_t1_t2()

with DAG(dag_id="dag2", start_date=datetime(2025, 1, 1), schedule=None):
    last_task = generate_t1_t2()
    last_task >> EmptyOperator(task_id="t3")

with DAG(dag_id="dag3", start_date=datetime(2025, 1, 1), schedule=None):
    last_task = generate_t1_t2()
    (
        last_task
        >> EmptyOperator(task_id="t3")
        >> [EmptyOperator(task_id="t4"), EmptyOperator(task_id="t5"), EmptyOperator(task_id="t6")]
        >> EmptyOperator(task_id="t7")
    )