airflow

Trigger a task iff direct upstream failed


Let's say I have a DAG with multiple tasks:

task1 >> task2 >> [task3, task4]

I want task1 to be executed at the beginning.

Then task2.

If task2 ends with success, then execute task3, else execute task4.

I've tried setting the trigger rule all_failed on task4, and if task2 fails or succeeds, everything works as desired.

However, if task1 fails, task2 and task3 are marked as upstream_failed, BUT task4 is still executed.

According to https://airflow.apache.org/docs/apache-airflow/2.9.3/core-concepts/dags.html#trigger-rules this is the expected behavior ("all_failed: All upstream tasks are in a failed or upstream_failed state"), but then it seems there is no way to define a trigger rule like "execute task4 only if its direct upstream actually failed (and not upstream_failed)".

How can I achieve that?


Solution

  • The issue arises because all_failed also counts the upstream_failed state, which means task4 runs even when task1 (not task2) is the task that failed. Airflow has no built-in "only the direct upstream failed" rule, but you can use a BranchPythonOperator to decide dynamically based on task2's actual outcome. Two details matter: the branch task needs trigger_rule="all_done" so it still runs when task2 fails, and its callable must return no task at all when task2 ended up upstream_failed, so that both task3 and task4 are skipped.

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.python import PythonOperator, BranchPythonOperator


    def task2_logic():
        ...  # your actual task2 work goes here


    def branch_task(ti=None, **kwargs):
        # Look at the *direct* upstream's terminal state instead of
        # relying on a trigger rule that also matches upstream_failed.
        task2_state = ti.get_dagrun().get_task_instance("task2").state
        if task2_state == "success":
            return "task3"
        if task2_state == "failed":
            return "task4"
        # task1 failed -> task2 is upstream_failed -> skip both branches
        return None


    with DAG(dag_id="branch_on_failure", schedule=None, catchup=False) as dag:
        task1 = EmptyOperator(task_id="task1")

        task2 = PythonOperator(task_id="task2", python_callable=task2_logic)

        branch = BranchPythonOperator(
            task_id="branching",
            python_callable=branch_task,
            # "all_done" so the branch still runs when task2 fails
            trigger_rule="all_done",
        )

        task3 = EmptyOperator(task_id="task3")
        task4 = EmptyOperator(task_id="task4")

        task1 >> task2 >> branch
        branch >> [task3, task4]
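The branching decision itself reduces to a pure function of task2's terminal state, so it can be sanity-checked without an Airflow environment. A minimal sketch (the function name `choose_branch` is illustrative; the state strings match Airflow's task-instance states):

    ```python
    from typing import Optional


    def choose_branch(task2_state: str) -> Optional[str]:
        """Map task2's terminal state to the task_id that should run next.

        Returning None makes a BranchPythonOperator skip every downstream
        branch, which covers the case where task1 failed and task2 ended
        up in the 'upstream_failed' state.
        """
        if task2_state == "success":
            return "task3"
        if task2_state == "failed":
            return "task4"
        return None


    print(choose_branch("success"))          # task3
    print(choose_branch("failed"))           # task4
    print(choose_branch("upstream_failed"))  # None
    ```

This makes the fix to the original problem explicit: the `upstream_failed` case falls through to `None`, so task4 only runs when task2 itself failed.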