pythonairflow

Why function on_failure_callback for Airflow dag not running on task fail?


I got Airflow dag with on_failure_callback function. But on_failure_callback function not running on failure and I don't see logs.

There code of my dag.

All @tasks in this code works, there's no need to show special failing function. I work with task instance by adding **kwargs to functions (another options not working in my case).

Why send_message_on_dag_fail not works on then dag had failed?

I add same code to task get_start_end_dates just to be sure that send_message_on_dag_fail function is working.

Airflow 2.8.2, Pyhton 3.11.8.

import pandas as pd
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pytz

import httpx

from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
from airflow.models import Variable

def get_current_period(date: datetime.date = None):
    tz = pytz.timezone('Europe/Moscow')
    if date:
        now = date.date()
    else:
        now = datetime.now(tz).date()
    if now.day <= 15:
        start_date = (now - relativedelta(months=1)).replace(day=16)
        end_date = now.replace(day=1) - relativedelta(days=1)
    else:
        start_date = now.replace(day=1)
        end_date = now.replace(day=15)
    return str(start_date), str(end_date)

def send_msg(bot_token: str, chat_id: str, message: str, type:str = 'message' or 'code'):
    if type == 'message':
        url = f'https://api.telegram.org/bot{bot_token}/sendMessage?chat_id={chat_id}&text={message}'
        client = httpx.Client(base_url='https://')
        return client.post(url)
    elif type == 'code':
        url = f'https://api.telegram.org/bot{bot_token}/sendMessage'
        params = {
            'chat_id': chat_id,
            'text': message,
            'parse_mode': 'Markdown'
        }
        client = httpx.Client(base_url='https://')
        return client.post(url, params=params)

def get_xcom_from_context(context, task_id: str, dict_key:str = False):
    if dict_key:
        xcom = context['ti'].xcom_pull(task_ids=task_id)[dict_key]
    else:
        xcom = context['ti'].xcom_pull(task_ids=task_id)
    return xcom


default_args = {
    'owner': 'user',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'start_date': datetime(2024, 9, 19)
}

host = Variable.get('host')
database_name = Variable.get('database_name')
user_name = Variable.get('user_name')
password_for_db = Variable.get('password_for_db')
server_host_name = Variable.get('server_host_name')
bearer_key = Variable.get('bearer_key')
user_key = Variable.get('user_key')
sales_plans_url = Variable.get('sales_plans_url')
specialization_prices_url = Variable.get('specialization_prices_url')
bot_token = Variable.get('bot_token')
chat_id = Variable.get('chat_id')

def send_message_on_dag_fail(bot_token = bot_token, chat_id = chat_id, **kwargs):
    context = kwargs
    log = context['ti'].log
    log.error('DAG FINISHED WITH ERROR __________________') # this error text easier to find

    task_id = context['ti'].task_id
    dag_id = context['dag'].dag_id
    message = f"Task {task_id} from Dag {dag_id} failed."
    log.error(message)
    send_msg(bot_token, chat_id, message, 'message')

@dag(default_args=default_args, schedule_interval=None, catchup=False, concurrency=4, on_failure_callback=send_message_on_dag_fail)
def dag_get_bonus_and_penaltys_for_staff():
    @task
    def check_time():
        tz = pytz.timezone('Europe/Moscow')
        current_time = datetime.now(tz).time()
        if current_time >= datetime.strptime("00:00", "%H:%M").time() and current_time <= datetime.strptime("01:00", "%H:%M").time():
            return False
        else:
            return True
        
    @task
    def get_start_end_dates(bot_token = bot_token, chat_id = chat_id,**kwargs):
        context = kwargs
        log = context['ti'].log
        check_time = get_xcom_from_context(context, 'check_time')
        log.info('Xcom objects pulled from context')
        if check_time:
            start_date, end_date = get_current_period()
            result = {
                'start_date': start_date
                , 'end_date': end_date
            }

            task_id = context['ti'].task_id
            dag_id = context['dag'].dag_id
            message = f"TASK {task_id} DAG {dag_id}."
            log.error(message)
            send_msg(bot_token, chat_id, message, 'message')

            return result
        else:
            raise AirflowSkipException("Time for database cleaning, skip DAG execution.")
        
    ...

    check_time_task = check_time()
    get_start_end_dates_task = get_start_end_dates()

    check_time_task >> get_start_end_dates_task >> ...

dag_get_bonus_and_penaltys_for_staff = dag_get_bonus_and_penaltys_for_staff()

Solution

  • I tried rewrite send_message_on_dag_fail like only with context as input, now it works.

    I don't know why, cause then I tries work with context in task with context (not **kwargs) I've failed.

    Any ideas why it works this way?

    def send_message_on_dag_fail(context):
        log = context['ti'].log
        log.error('DAG FINISHED WITH ERROR __________________') # this error text easier to find
    
        task_id = context['ti'].task_id
        dag_id = context['dag'].dag_id
        message = f"Task {task_id} from Dag {dag_id} failed."
        log.error(message)
        send_msg(bot_token, chat_id, message, 'message')