SlackNotifier works fine when called directly from the callback however is not doing anything once I call it from within a defined callback function (need to execute it only when DAG is not triggered manually). The function itself works fine (e.g. logging works). Any ideas why pls?
I have tried to use send_slack_notification() with same results. What works is SlackWebhookOperator() however that would require considerable changes (fails with multiple ERRORs right now)
def dag_run_info(**context):
dag_run = context['dag_run']
run_type = dag_run.run_type
return run_type
def on_failure_callback(context):
run_type = context['task_instance'].xcom_pull(task_ids='get_dag_run_info')
if run_type == 'manual':
return SlackNotifier(slack_conn_id=SLACK_CONNECTION_ID, text=SLACK_MESSAGE, channel=SLACK_CHANNEL)
else:
logging.info(f'No Slack notification for failed DAG because run_type:{run_type}')
In the BaseNotifier
implements the call protocol which makes the SlackNotifier
instance invocable like a function.
This explains this example usage
BashOperator(
task_id="mytask",
on_failure_callback=SlackNotifier(
slack_conn_id=SLACK_CONNECTION_ID,
text=SLACK_MESSAGE,
channel=SLACK_CHANNEL
),
bash_command="fail",
)
Since you need to send slack notifications depending on the run_type in your custom callback, you can invoke the notifier directly in your callback passing along the run context.
def on_failure_callback(context):
run_type = context['task_instance'].xcom_pull(task_ids='get_dag_run_info')
if run_type == 'manual':
notifier = SlackNotifier(
slack_conn_id=SLACK_CONNECTION_ID,
text=SLACK_MESSAGE,
channel=SLACK_CHANNEL
)
notifier(context)
else:
logging.info(
'No Slack notification for failed DAG because run_type:%s',
run_type)