Let's say I have a DAG with multiple tasks:

`task1 >> task2 >> [task3, task4]`

I want `task1` to be executed at the beginning, then `task2`. If `task2` ends with success, execute `task3`; else execute `task4`.
I've tried setting the `all_failed` trigger rule on `task4` (roughly as in the snippet below), and if `task2` fails or ends with success, everything works as desired.
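A sketch of that attempt (assuming a `dag` object and the `DummyOperator` tasks implied above):

```python
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

# task4 should run only when its upstream task failed
task4 = DummyOperator(
    task_id="task4",
    trigger_rule=TriggerRule.ALL_FAILED,
    dag=dag,
)
```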
However, if `task1` fails, `task2` and `task3` are marked as `upstream_failed`, BUT `task4` is executed.
According to https://airflow.apache.org/docs/apache-airflow/2.9.3/core-concepts/dags.html#trigger-rules this is "the expected behavior" ("all_failed: All upstream tasks are in a failed or upstream_failed state"), but then it seems to me there is no way to set a trigger rule such as "execute `task4` only if upstream is failed (and only failed)".

How can I achieve that?
The issue arises because `all_failed` counts both the `failed` and `upstream_failed` states, which is why `task4` runs even when `task1` fails. Airflow has no built-in "only directly failed upstream" rule, but you can use a `BranchPythonOperator` to branch on `task2`'s outcome instead. Two details make this work: the branch task needs `trigger_rule="all_done"` so it still runs when `task2` fails, and its callable can return `None` to skip both branches when `task2` never actually ran (`upstream_failed`). Here is a minimal, self-contained sketch; the `dag_id`, schedule, and the way `branch_task` reads `task2`'s state from the `dag_run` are illustrative choices, `DummyOperator` is swapped for its Airflow 2 replacement `EmptyOperator`, and `provide_context` is dropped because Airflow 2 injects the context automatically:
```python
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.state import State

with DAG(
    dag_id="branch_on_task2_outcome",  # illustrative dag_id
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
) as dag:

    def task2_logic():
        ...  # task2's real work goes here

    def branch_task(**context):
        # Runs even when task2 fails (trigger_rule="all_done" below)
        # and chooses the branch from task2's terminal state.
        state = context["dag_run"].get_task_instance("task2").state
        if state == State.SUCCESS:
            return "task3"
        if state == State.FAILED:
            return "task4"
        return None  # e.g. upstream_failed: skip both branches

    task1 = EmptyOperator(task_id="task1")
    task2 = PythonOperator(task_id="task2", python_callable=task2_logic)
    branch = BranchPythonOperator(
        task_id="branching",
        python_callable=branch_task,
        trigger_rule="all_done",  # run no matter how task2 ended
    )
    task3 = EmptyOperator(task_id="task3")
    task4 = EmptyOperator(task_id="task4")

    task1 >> task2 >> branch >> [task3, task4]
```
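With this wiring: if `task2` succeeds, the branch returns `"task3"` and `task4` is skipped; if `task2` fails, `all_done` lets the branch run anyway and it returns `"task4"`; and if `task1` fails, `task2` is `upstream_failed`, so the branch returns `None` and both `task3` and `task4` are skipped. That gives you the "only if upstream actually failed" behavior that no single trigger rule expresses.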