
Can I set both DAG level and task level on_failure_callbacks in Airflow?


I would like to set both task-level and DAG-level on_failure_callbacks and have them perform two different actions. If both are configured, would the DAG-level and task-level callbacks both trigger without overriding one another, or would the task-level callback override the DAG-level one?

The documentation is a little confusing on this point.

I would then have a definition as follows:

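import datetime
from airflow import DAG

def failure_callback_task(context): ...  # placeholder: reports a task failure
def failure_callback_dag(context): ...  # placeholder: reports a DAG run failure

with DAG(
    dag_id="test_dag",
    default_args={"on_failure_callback": failure_callback_task},  # task level, applied to every task
    schedule_interval=None,
    start_date=datetime.datetime(2021, 1, 1),
    catchup=False,
    on_failure_callback=failure_callback_dag,  # DAG level
) as dag:
    ...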

failure_callback_dag calls a different function to report a DAG run failure.
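The precedence question can be made concrete with a toy model in plain Python. This is not Airflow's code; it only encodes two documented behaviours as assumptions: default_args are applied to operators (an argument passed directly to an operator wins over the same key in default_args), and they never touch the DAG's own on_failure_callback. The helpers effective_task_callback and callback_slots are hypothetical names used for illustration.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical toy model: it does not import or reproduce Airflow itself.
Callback = Callable[[Dict[str, Any]], None]


def effective_task_callback(
    default_args: Dict[str, Any], operator_kwargs: Dict[str, Any]
) -> Optional[Callback]:
    """An argument passed directly to the operator wins over default_args."""
    merged = {**default_args, **operator_kwargs}
    return merged.get("on_failure_callback")


def callback_slots(
    dag_kwargs: Dict[str, Any],
    default_args: Dict[str, Any],
    operator_kwargs: Dict[str, Any],
) -> Dict[str, Optional[Callback]]:
    """The DAG-level and task-level callbacks live in separate slots."""
    return {
        # Set on the DAG itself; default_args never change it.
        "dag": dag_kwargs.get("on_failure_callback"),
        # Resolved per task from default_args plus the operator's own arguments.
        "task": effective_task_callback(default_args, operator_kwargs),
    }


def failure_callback_task(context: Dict[str, Any]) -> None:
    print("report task failure")


def failure_callback_dag(context: Dict[str, Any]) -> None:
    print("report DAG run failure")


slots = callback_slots(
    dag_kwargs={"on_failure_callback": failure_callback_dag},
    default_args={"on_failure_callback": failure_callback_task},
    operator_kwargs={},
)
print(slots["dag"].__name__, slots["task"].__name__)
# prints: failure_callback_dag failure_callback_task
```

In this model, passing on_failure_callback directly to an operator only replaces the task slot for that operator; the DAG slot keeps failure_callback_dag.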


Solution

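  • Yes, that's possible. The two callbacks are independent: the task-level on_failure_callback (passed to an operator, or to every operator through default_args) runs when that task instance fails, while the DAG-level on_failure_callback (passed to the DAG) runs when the DAG run as a whole is marked as failed. Neither overrides the other. For example: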

    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    
    def handle_dag_failure(context):
        print("TODO handle DAG failure")
        print(f"context = {context}")
    
    
    def handle_task_failure(context):
        print("TODO handle task failure")
        print(f"context = {context}")
    
    
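    # DAG-level callback: runs when the DAG run fails
    with DAG(dag_id="so_79020182", schedule=None, on_failure_callback=handle_dag_failure):
        # Task-level callback: runs when this task fails; "exit 1" forces the failure
        BashOperator(task_id="test", bash_command="exit 1", on_failure_callback=handle_task_failure)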
    

    Note that both callbacks take a single argument, the Airflow context (named context here).

    The output of handle_task_failure is shown in the task logs. The output of handle_dag_failure is harder to find: it is written to Airflow's logs folder under /scheduler/YYYY-MM-DD/yourdagfile.py.log, so if you cannot read that folder directly, you need to set up a log shipping mechanism.
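The callbacks above only print the whole context. A slightly more useful sketch extracts the fields you would typically put in an alert. It assumes the context contains "task_instance" (with dag_id and task_id) for the task callback and "dag_run" (with dag_id and run_id) for the DAG callback. The "exception" and "reason" keys are read with .get() because their presence may vary by Airflow version. The SimpleNamespace stand-ins are only there to try the functions outside Airflow.

```python
from types import SimpleNamespace
from typing import Any, Mapping


def handle_task_failure(context: Mapping[str, Any]) -> str:
    """Task-level callback: describe the task instance that failed."""
    ti = context["task_instance"]
    # "exception" is usually present for task failures; fall back to None if not.
    exc = context.get("exception")
    message = f"Task {ti.dag_id}.{ti.task_id} failed: {exc!r}"
    print(message)  # shows up in the task log
    return message  # Airflow ignores the return value; returned here only for testing


def handle_dag_failure(context: Mapping[str, Any]) -> str:
    """DAG-level callback: describe the DAG run that failed."""
    dag_run = context["dag_run"]
    # Assumption: a "reason" key (e.g. "task_failure") may be present; default if not.
    reason = context.get("reason", "unknown")
    message = f"DAG run {dag_run.dag_id}/{dag_run.run_id} failed (reason: {reason})"
    print(message)  # ends up in the scheduler-side log, not the task log
    return message


# Local check with stand-in objects; inside Airflow the real context is passed in.
fake_ti = SimpleNamespace(dag_id="so_79020182", task_id="test")
fake_run = SimpleNamespace(dag_id="so_79020182", run_id="manual__example")
handle_task_failure({"task_instance": fake_ti, "exception": RuntimeError("exit 1")})
handle_dag_failure({"dag_run": fake_run, "reason": "task_failure"})
```

Inside a DAG file these plug into on_failure_callback=... exactly like the two functions in the answer. If on_failure_callback is also set in default_args, an operator's own on_failure_callback takes precedence for that task, but the DAG-level callback is unaffected either way.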