I just joined a new company and I'm trying to learn Airflow as I use it. So far I've got the basics of most things down except External Task Sensors.
I have two DAGs: DAG A with a schedule interval of "0 6 * * *" and DAG B with a schedule interval of "0 7 * * *".
DAG A waits for DAG B to complete before it continues. However, DAG B sometimes takes 3 hours to complete and other times 10+ hours.
I created an ExternalTaskSensor as shown below, but it never succeeds and times out even when DAG B is complete.
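from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor  # Airflow 2.x import path

ExternalTaskSensor(
    task_id="wait_sensor",
    external_dag_id="dag_b",
    external_task_id="end",
    poke_interval=60 * 30,  # poke every 30 minutes
    timeout=60 * 60,  # time out after 1 hour
    retries=10,
    execution_delta=timedelta(hours=2),
    dag=dag,
)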
How can I properly set up the sensor?
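DAG A's execution date is one hour before DAG B's, but you set execution_delta to 2 hours. The sensor subtracts execution_delta from DAG A's execution date, so it is looking for a DAG B run with an execution date of 04:00 (0 4 * * *), which doesn't exist. The sensor therefore keeps poking until it times out. Setting check_existence=True makes the sensor fail immediately instead of waiting, but only when dag_b or its end task doesn't exist at all; it does not check for a run at the computed date, so it won't catch this particular mistake.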
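To make the arithmetic concrete, here is a plain-Python sketch of the date the sensor computes, assuming (as the execution_delta docs describe) that it subtracts execution_delta from the sensing DAG's own execution date. The helper `sensor_target_date` and the example dates are hypothetical illustrations, not part of Airflow.

```python
from datetime import datetime, timedelta


def sensor_target_date(logical_date: datetime, execution_delta: timedelta) -> datetime:
    """Hypothetical helper: the external execution date the sensor pokes for."""
    # ExternalTaskSensor looks for the external run at (own execution date - execution_delta).
    return logical_date - execution_delta


# One daily run of DAG A ("0 6 * * *") and the matching run of DAG B ("0 7 * * *").
dag_a_run = datetime(2022, 12, 2, 6, 0)
dag_b_run = datetime(2022, 12, 2, 7, 0)

target = sensor_target_date(dag_a_run, timedelta(hours=2))
print(target)               # 2022-12-02 04:00:00
print(target == dag_b_run)  # False: DAG B never has a 04:00 run
```

Because DAG B only ever has runs at 07:00, no positive delta can reach it from DAG A's 06:00 run.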
You can also check the sensor's log to see which execution date of the external task it is poking for:
[2022-12-02, 08:21:36 UTC] {external_task.py:206} INFO - Poking for tasks ['test_task'] in dag test_dag on 2022-12-02T08:25:00+00:00 ...
Solution:
From the sensor docs, "For yesterday, use [positive!] datetime.timedelta(days=1)"
So to resolve this, you need to provide a negative execution_delta of 1 hour, because DAG A's execution date is exactly 1 hour before DAG B's:
execution_delta=timedelta(hours=-1)
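One way to pick the value is to take DAG A's execution date minus DAG B's execution date for the run you want to wait on: a positive result reproduces the docs' "yesterday" case, and a negative one points forward in time. The sketch below checks both with plain datetime arithmetic; `delta_for` is a hypothetical helper (not an Airflow function) and the dates are illustrative.

```python
from datetime import datetime, timedelta


def delta_for(own_run: datetime, external_run: datetime) -> timedelta:
    """Hypothetical helper: execution_delta such that own_run - delta == external_run."""
    return own_run - external_run


dag_a_run = datetime(2022, 12, 2, 6, 0)  # DAG A, "0 6 * * *"
dag_b_run = datetime(2022, 12, 2, 7, 0)  # DAG B, "0 7 * * *"

delta = delta_for(dag_a_run, dag_b_run)
print(delta)  # -1 day, 23:00:00 (Python's normalized form of timedelta(hours=-1))
assert delta == timedelta(hours=-1)
assert dag_a_run - delta == dag_b_run  # the sensor now pokes for 07:00

# The docs' "yesterday" case: waiting on the previous day's run gives a positive delta.
yesterday_run = dag_a_run - timedelta(days=1)
print(delta_for(dag_a_run, yesterday_run))  # 1 day, 0:00:00
```

Separately, timeout is in seconds, so timeout=60*60 gives the sensor only one hour while DAG B can run for 10+ hours; a timeout that covers DAG B's longest runtime is likely needed too, and for waits that long mode="reschedule" frees the worker slot between pokes.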