airflow

What is the proper way to set up an ExternalTaskSensor in Airflow when the DAGs have different schedule intervals?


I just joined a new company and I'm trying to learn Airflow as I use it. So far I've got the basics of most things down except External Task Sensors.

I have two DAGs: DAG A, with a schedule interval of "0 6 * * *", and DAG B, with a schedule interval of "0 7 * * *". DAG A waits for DAG B to complete before it continues. However, DAG B sometimes takes 3 hours to complete and at other times 10+ hours.

I created an ExternalTaskSensor as shown below, but it never triggers and times out even when DAG B is complete.

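from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

ExternalTaskSensor(
    task_id="wait_sensor",
    external_dag_id="dag_b",
    external_task_id="end",
    poke_interval=60 * 30,  # poke every 30 minutes
    timeout=60 * 60,  # time out after 1 hour
    retries=10,
    execution_delta=timedelta(hours=2),
    dag=dag,  # the DAG A object, defined elsewhere
)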

How can I properly set up the sensor?


Solution

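  • DAG A runs at 06:00 and DAG B at 07:00, so the execution date of DAG A is one hour before that of DAG B. The sensor subtracts execution_delta from its own execution date, so with a delta of 2 hours the sensor in DAG A looks for a DAG B run with an execution date of 04:00 (as if DAG B were scheduled with 0 4 * * *), which doesn't exist. In this case, your external sensor task fails on timeout. You could also set check_existence=True so the sensor stops waiting immediately when the external DAG or task does not exist at all, instead of going through 10 retries; note that this checks for the DAG and task themselves, not for a run at a particular execution date. Also, the external task sensor log shows which external execution date it is poking for: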

    [2022-12-02, 08:21:36 UTC] {external_task.py:206} INFO - Poking for tasks ['test_task'] in dag test_dag on 2022-12-02T08:25:00+00:00 ...

    Solution:

    From the sensor docs, "For yesterday, use [positive!] datetime.timedelta(days=1)"

    So to resolve this, you need to provide a negative execution_delta of 1 hour, as the execution date of DAG A is exactly 1 hour before that of DAG B (06:00 minus a delta of -1 hour gives 07:00):

    execution_delta=timedelta(hours=-1)
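
To make the date arithmetic concrete, here is a minimal sketch that uses only the standard library and does not call Airflow. It assumes the sensor derives the date it pokes for by subtracting execution_delta from its own execution date, which is what the docs quote above implies. The helper name target_execution_date and the run date 2022-12-02 are made up for illustration.

```python
from datetime import datetime, timedelta, timezone


def target_execution_date(sensor_run: datetime, execution_delta: timedelta) -> datetime:
    """Return the external execution date the sensor would look for.

    Assumption: the delta is subtracted from the sensor's own execution
    date, so a positive delta looks into the past.
    """
    return sensor_run - execution_delta


# Hypothetical execution dates for a single day, for illustration only.
dag_a_run = datetime(2022, 12, 2, 6, 0, tzinfo=timezone.utc)  # DAG A: "0 6 * * *"
dag_b_run = datetime(2022, 12, 2, 7, 0, tzinfo=timezone.utc)  # DAG B: "0 7 * * *"

# Original setting: looks two hours back, where no DAG B run exists.
original = target_execution_date(dag_a_run, timedelta(hours=2))

# Suggested setting: looks one hour ahead, matching DAG B's run.
fixed = target_execution_date(dag_a_run, timedelta(hours=-1))

print(original.isoformat())  # 2022-12-02T04:00:00+00:00
print(fixed.isoformat())     # 2022-12-02T07:00:00+00:00
print(fixed == dag_b_run)    # True
```

The same check works for any pair of schedules: compute the difference between the two execution dates and pass DAG A's date minus DAG B's date as execution_delta.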