Tags: python, airflow, directed-acyclic-graphs

Run a separate task if the parent task fails in airflow DAG


I have a series of tasks in an airflow DAG as below:

task1:
//do something

task2:
//do something 

task3:
//do something

task4:
//do cleanup

task1 >> task2 >> task3

Ask: I want to run task4 (the cleanup task) if either task1 or task2 fails.


Solution

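  • You can use Trigger Rules. Give task4 the rule `TriggerRule.ONE_FAILED` and make task1 and task2 its upstream tasks, so that task4 runs as soon as either of them fails. Here is an example: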

    import pendulum
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.utils.task_group import TaskGroup
    from airflow.utils.trigger_rule import TriggerRule
    
    # Demo DAG with two task groups that share the same layout:
    #   task1 -> task2 -> task3, plus task4 downstream of task1 and task2.
    # task4 carries the ONE_FAILED trigger rule, so it is meant to act as the
    # cleanup step that only fires when task1 or task2 fails.
    # schedule_interval=None: the DAG has no schedule and is triggered manually.
    with DAG(
        dag_id='Ab_sin',
        start_date=pendulum.datetime(2024, 1, 1),
        schedule_interval=None,
    ) as dag:

        # Group 1: every command is a plain `echo`, so nothing upstream of
        # task4 is expected to fail and task4 should not run.
        with TaskGroup(group_id='ignore_task_4') as ignore_task_4:
            task1 = BashOperator(task_id='task1', bash_command='echo "task1"')
            task2 = BashOperator(task_id='task2', bash_command='echo "task2"')
            task3 = BashOperator(task_id='task3', bash_command='echo "task3"')
            task4 = BashOperator(
                task_id='task4',
                bash_command='echo "task4"',
                trigger_rule=TriggerRule.ONE_FAILED,
            )

            # Main chain.
            task1.set_downstream(task2)
            task2.set_downstream(task3)
            # Cleanup task watches both task1 and task2.
            [task1, task2] >> task4

        # Group 2: task1's bash_command is not a valid shell command, which
        # simulates a failure of the first task; task4 is then expected to run.
        with TaskGroup(group_id='run_task_4') as run_task_4:
            task1 = BashOperator(
                task_id='task1',
                bash_command='lets say first task was failed',
            )
            task2 = BashOperator(task_id='task2', bash_command='echo "task2"')
            task3 = BashOperator(task_id='task3', bash_command='echo "task3"')
            task4 = BashOperator(
                task_id='task4',
                bash_command='echo "task4"',
                trigger_rule=TriggerRule.ONE_FAILED,
            )

            # Main chain.
            task1.set_downstream(task2)
            task2.set_downstream(task3)
            # Cleanup task watches both task1 and task2.
            [task1, task2] >> task4
    

    demo DAG