I got Airflow dag with on_failure_callback function. But on_failure_callback function not running on failure and I don't see logs.
There code of my dag.
All @tasks in this code works, there's no need to show special failing function. I work with task instance by adding **kwargs to functions (another options not working in my case).
Why send_message_on_dag_fail not works on then dag had failed?
I add same code to task get_start_end_dates just to be sure that send_message_on_dag_fail function is working.
Airflow 2.8.2, Pyhton 3.11.8.
import pandas as pd
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pytz
import httpx
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
from airflow.models import Variable
def get_current_period(date: datetime.date = None):
tz = pytz.timezone('Europe/Moscow')
if date:
now = date.date()
else:
now = datetime.now(tz).date()
if now.day <= 15:
start_date = (now - relativedelta(months=1)).replace(day=16)
end_date = now.replace(day=1) - relativedelta(days=1)
else:
start_date = now.replace(day=1)
end_date = now.replace(day=15)
return str(start_date), str(end_date)
def send_msg(bot_token: str, chat_id: str, message: str, type:str = 'message' or 'code'):
if type == 'message':
url = f'https://api.telegram.org/bot{bot_token}/sendMessage?chat_id={chat_id}&text={message}'
client = httpx.Client(base_url='https://')
return client.post(url)
elif type == 'code':
url = f'https://api.telegram.org/bot{bot_token}/sendMessage'
params = {
'chat_id': chat_id,
'text': message,
'parse_mode': 'Markdown'
}
client = httpx.Client(base_url='https://')
return client.post(url, params=params)
def get_xcom_from_context(context, task_id: str, dict_key:str = False):
if dict_key:
xcom = context['ti'].xcom_pull(task_ids=task_id)[dict_key]
else:
xcom = context['ti'].xcom_pull(task_ids=task_id)
return xcom
default_args = {
'owner': 'user',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'start_date': datetime(2024, 9, 19)
}
host = Variable.get('host')
database_name = Variable.get('database_name')
user_name = Variable.get('user_name')
password_for_db = Variable.get('password_for_db')
server_host_name = Variable.get('server_host_name')
bearer_key = Variable.get('bearer_key')
user_key = Variable.get('user_key')
sales_plans_url = Variable.get('sales_plans_url')
specialization_prices_url = Variable.get('specialization_prices_url')
bot_token = Variable.get('bot_token')
chat_id = Variable.get('chat_id')
def send_message_on_dag_fail(bot_token = bot_token, chat_id = chat_id, **kwargs):
context = kwargs
log = context['ti'].log
log.error('DAG FINISHED WITH ERROR __________________') # this error text easier to find
task_id = context['ti'].task_id
dag_id = context['dag'].dag_id
message = f"Task {task_id} from Dag {dag_id} failed."
log.error(message)
send_msg(bot_token, chat_id, message, 'message')
@dag(default_args=default_args, schedule_interval=None, catchup=False, concurrency=4, on_failure_callback=send_message_on_dag_fail)
def dag_get_bonus_and_penaltys_for_staff():
@task
def check_time():
tz = pytz.timezone('Europe/Moscow')
current_time = datetime.now(tz).time()
if current_time >= datetime.strptime("00:00", "%H:%M").time() and current_time <= datetime.strptime("01:00", "%H:%M").time():
return False
else:
return True
@task
def get_start_end_dates(bot_token = bot_token, chat_id = chat_id,**kwargs):
context = kwargs
log = context['ti'].log
check_time = get_xcom_from_context(context, 'check_time')
log.info('Xcom objects pulled from context')
if check_time:
start_date, end_date = get_current_period()
result = {
'start_date': start_date
, 'end_date': end_date
}
task_id = context['ti'].task_id
dag_id = context['dag'].dag_id
message = f"TASK {task_id} DAG {dag_id}."
log.error(message)
send_msg(bot_token, chat_id, message, 'message')
return result
else:
raise AirflowSkipException("Time for database cleaning, skip DAG execution.")
...
check_time_task = check_time()
get_start_end_dates_task = get_start_end_dates()
check_time_task >> get_start_end_dates_task >> ...
dag_get_bonus_and_penaltys_for_staff = dag_get_bonus_and_penaltys_for_staff()
I tried rewrite send_message_on_dag_fail like only with context as input, now it works.
I don't know why, cause then I tries work with context in task with context (not **kwargs) I've failed.
Any ideas why it works this way?
def send_message_on_dag_fail(context):
log = context['ti'].log
log.error('DAG FINISHED WITH ERROR __________________') # this error text easier to find
task_id = context['ti'].task_id
dag_id = context['dag'].dag_id
message = f"Task {task_id} from Dag {dag_id} failed."
log.error(message)
send_msg(bot_token, chat_id, message, 'message')