Can Airflow task dependencies be re-used?


I have a series of Airflow DAGs that reuse some of the same task dependencies. For example:

DAG 1:
T1 >> T2

DAG 2:
T1 >> T2 >> T3

DAG 3:
T1 >> T2 >> T3 >> [T4, T5, T6] >> T7

I would like to store the dependencies from DAG 1 (which, in this model, are used by every other DAG) and reuse them when declaring the dependencies for the other DAGs, like so:

def dag_1_dependencies():
    T1 >> T2

DAG 2:
dag_1_dependencies() >> T3

DAG 3:
dag_1_dependencies() >> T3 >> [T4, T5, T6] >> T7

The problem is that the dependencies themselves aren't a value I can hold on to, so I can't return them from a function. Calling dag_1_dependencies() on its own gives me nothing to chain T3 onto. Is there a way around this?


Solution

  • If t1 and t2 are always the same tasks, you can define a function outside the DAG that creates them and sets the dependency between them, then call that function inside each DAG's with block so the tasks are attached to that DAG. To add further tasks downstream of t2, you need a reference to the t2 object, so the function returns it. For example:

    def generate_t1_t2() -> BaseOperator:
        """Generates tasks + dependencies and returns the last task so that additional dependencies can be set."""
        t1 = EmptyOperator(task_id="t1")
        t2 = EmptyOperator(task_id="t2")
        t1 >> t2
        return t2
    
    with DAG(dag_id="dag2", start_date=datetime(2025, 1, 1), schedule=None):
        last_task = generate_t1_t2()
        t3 = EmptyOperator(task_id="t3")
        last_task >> t3
    

    The function generate_t1_t2 returns the last operator in the chain (t2), which allows configuring additional dependencies such as last_task >> t3.

    All three DAGs from your question can therefore be written as follows (the schedule argument and EmptyOperator assume Airflow 2.4 or later):

    from datetime import datetime
    
    from airflow import DAG
    from airflow.models import BaseOperator
    from airflow.operators.empty import EmptyOperator
    
    
    def generate_t1_t2() -> BaseOperator:
        """Generates tasks + dependencies and returns the last task so that additional dependencies can be set."""
        t1 = EmptyOperator(task_id="t1")
        t2 = EmptyOperator(task_id="t2")
        t1 >> t2
        return t2
    
    
    with DAG(dag_id="dag1", start_date=datetime(2025, 1, 1), schedule=None):
        generate_t1_t2()
    
    
    with DAG(dag_id="dag2", start_date=datetime(2025, 1, 1), schedule=None):
        last_task = generate_t1_t2()
        last_task >> EmptyOperator(task_id="t3")
    
    with DAG(dag_id="dag3", start_date=datetime(2025, 1, 1), schedule=None):
        last_task = generate_t1_t2()
        (
            last_task
            >> EmptyOperator(task_id="t3")
            >> [EmptyOperator(task_id="t4"), EmptyOperator(task_id="t5"), EmptyOperator(task_id="t6")]
            >> EmptyOperator(task_id="t7")
        )
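
As a side note on why returning t2 is enough: as far as I know, Airflow's >> evaluates to its right-hand operand, which is also what makes chains like t1 >> t2 >> t3 work. The sketch below is a toy model in plain Python, not Airflow's actual code. The Task class and its downstream list are hypothetical stand-ins for an operator, so it runs without Airflow installed.

```python
class Task:
    """Toy stand-in for an Airflow operator (hypothetical, NOT Airflow's real code)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # task_ids that should run after this task

    def _add_downstream(self, other):
        # Accept either a single task or a list of tasks.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.append(target.task_id)

    def __rshift__(self, other):
        # task >> other: record the edge(s), then hand back the right-hand side
        # so the next >> in the chain continues from there.
        self._add_downstream(other)
        return other

    def __rrshift__(self, other):
        # [a, b] >> task: a list has no __rshift__, so Python falls back to
        # the right operand's __rrshift__.
        for upstream in other:
            upstream._add_downstream(self)
        return self


def generate_t1_t2():
    """Create t1 and t2, link them, and return (first, last)."""
    t1 = Task("t1")
    t2 = Task("t2")
    # The expression t1 >> t2 evaluates to t2, so it can be returned directly.
    return t1, t1 >> t2


first, last = generate_t1_t2()
t3 = Task("t3")
fan_out = [Task("t4"), Task("t5"), Task("t6")]
t7 = Task("t7")

# Same shape as DAG 3 in the question.
result = last >> t3 >> fan_out >> t7

print(first.downstream, last.downstream, t3.downstream, result.task_id)
```

Under the same assumption, the function from the question could be fixed by writing return T1 >> T2 instead of the bare T1 >> T2 statement, since the expression already yields T2.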