airflowslack

Use SlackNotifier in Airflow within a callback function


SlackNotifier works fine when called directly from the callback however is not doing anything once I call it from within a defined callback function (need to execute it only when DAG is not triggered manually). The function itself works fine (e.g. logging works). Any ideas why pls?

I have tried to use send_slack_notification() with same results. What works is SlackWebhookOperator() however that would require considerable changes (fails with multiple ERRORs right now)

def dag_run_info(**context):
    dag_run = context['dag_run']
    run_type = dag_run.run_type
    return run_type


def on_failure_callback(context):

    run_type = context['task_instance'].xcom_pull(task_ids='get_dag_run_info')

    if run_type == 'manual':
        return SlackNotifier(slack_conn_id=SLACK_CONNECTION_ID, text=SLACK_MESSAGE, channel=SLACK_CHANNEL)
    else:
        logging.info(f'No Slack notification for failed DAG because run_type:{run_type}')


Solution

  • In the BaseNotifier implements the call protocol which makes the SlackNotifier instance invocable like a function.

    This explains this example usage

     BashOperator(
            task_id="mytask",
            on_failure_callback=SlackNotifier(
                slack_conn_id=SLACK_CONNECTION_ID, 
                text=SLACK_MESSAGE,
                channel=SLACK_CHANNEL
            ),
            bash_command="fail",
        )
    

    Since you need to send slack notifications depending on the run_type in your custom callback, you can invoke the notifier directly in your callback passing along the run context.

    def on_failure_callback(context):
        run_type = context['task_instance'].xcom_pull(task_ids='get_dag_run_info')
    
        if run_type == 'manual':
            notifier = SlackNotifier(
                slack_conn_id=SLACK_CONNECTION_ID, 
                text=SLACK_MESSAGE,
                channel=SLACK_CHANNEL
            )
            notifier(context)
        else:
            logging.info(
                'No Slack notification for failed DAG because run_type:%s',
                run_type)