I have a series of tasks in an Airflow DAG, as shown below:
task1:
//do something
task2:
//do something
task3:
//do something
task4:
//do cleanup
task1 >> task2 >> task3
ASK: I want to run task4 if either task1 or task2 fails.
You can use Airflow trigger rules — set trigger_rule=TriggerRule.ONE_FAILED on task4 so it runs as soon as one of its direct upstream tasks fails. Here is a complete example:
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule
# DAG demonstrating TriggerRule.ONE_FAILED as a cleanup/failure-handler task.
# NOTE(review): schedule_interval is deprecated in favor of `schedule` in
# Airflow >= 2.4 — confirm the target Airflow version before migrating.
dag = DAG(
    dag_id='Ab_sin',
    schedule_interval=None,  # run only when triggered manually
    start_date=pendulum.datetime(2024, 1, 1),
)
# Group 1 (happy path): every task succeeds, so task4 — which only triggers
# when at least one of its direct upstreams has failed — is skipped.
with TaskGroup(group_id='ignore_task_4', dag=dag) as ignore_task_4:
    task1 = BashOperator(dag=dag, task_id='task1', bash_command='echo "task1"')
    task2 = BashOperator(dag=dag, task_id='task2', bash_command='echo "task2"')
    task3 = BashOperator(dag=dag, task_id='task3', bash_command='echo "task3"')
    # ONE_FAILED: fire as soon as at least one direct upstream has failed.
    task4 = BashOperator(
        dag=dag,
        task_id='task4',
        bash_command='echo "task4"',
        trigger_rule=TriggerRule.ONE_FAILED,
    )
    task1 >> task2 >> task3
    # task4 depends on both task1 and task2 (equivalent to set_upstream).
    [task1, task2] >> task4
# Group 2 (failure path): task1 fails on purpose, so task4 (ONE_FAILED)
# runs as the cleanup step, while task2/task3 become upstream_failed.
with TaskGroup(group_id='run_task_4', dag=dag) as run_task_4:
    # Fixed: the original bash_command was the prose 'lets say first task was
    # failed', which only failed by accident ("command not found", exit 127).
    # 'exit 1' makes the simulated failure explicit and deterministic.
    task1 = BashOperator(
        dag=dag,
        task_id='task1',
        bash_command='echo "simulating task1 failure"; exit 1',
    )
    task2 = BashOperator(dag=dag, task_id='task2', bash_command='echo "task2"')
    task3 = BashOperator(dag=dag, task_id='task3', bash_command='echo "task3"')
    # ONE_FAILED: fire as soon as at least one direct upstream has failed.
    task4 = BashOperator(dag=dag, task_id='task4', bash_command='echo "task4"',
                         trigger_rule=TriggerRule.ONE_FAILED)
    task1 >> task2 >> task3
    task4.set_upstream([task1, task2])