python, airflow, sensors

Airflow: how to send an email alert when a sensor is set to soft_fail


In the DAG below, sensor A is set to soft_fail=True because I'd like to skip B and C if A fails. However, I'd still like to get an email alert when A fails. With soft_fail set to True, A is marked as success when the sensor doesn't detect anything, so no email alert is sent. Could someone point out how to achieve this? Many thanks.

A (sensor, soft_fail = True) >> B >> C


Solution

  • An Airflow sensor is marked as skipped (not success) when it fails and soft_fail is True.

    There is no option to send an email on skip, nor a callback for it. However, you can create a new task based on EmailOperator that runs when sensor A is marked as skipped. Unfortunately, there is no trigger rule to run a task when the upstream is skipped, but you can create a new operator that checks the state of A and sends the email based on it.

    from airflow.operators.email import EmailOperator
    from airflow.utils.context import Context
    from airflow.utils.state import TaskInstanceState
    from airflow.utils.trigger_rule import TriggerRule
    
    
    class MyNotifier(EmailOperator):
        """Send the email only if the monitored task ended in the given state."""

        def __init__(self, monitor_task_id: str, notify_on_state: str, *args, **kwargs):
            self.monitor_task_id = monitor_task_id  # task whose state is checked
            self.notify_on_state = notify_on_state  # state that triggers the email
            super().__init__(*args, **kwargs)
    
        def execute(self, context: Context):
            # Look up the monitored task's instance in the current DAG run.
            task_to_check = context["dag_run"].get_task_instance(task_id=self.monitor_task_id)
            if task_to_check.state == self.notify_on_state:
                super().execute(context)
    
    
    notification_task = MyNotifier(
        task_id="sensor_skip_notifier",
        monitor_task_id="A",
        trigger_rule=TriggerRule.ALL_DONE,  # run once A is done, regardless of its state
        notify_on_state=TaskInstanceState.SKIPPED,
        to="<email>",
        subject="<subject>",
        html_content="<content>",  # you can use Jinja to add run info
    )
    
    A >> notification_task
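
If you want to check the state comparison without a running scheduler, the decision made in execute can be exercised with plain-Python stand-ins. This is only a sketch: FakeTaskInstance, FakeDagRun and notify_if_state are hypothetical names, not part of Airflow, and the string "skipped" is assumed to match the value of TaskInstanceState.SKIPPED. The None guard covers the case where the lookup finds no task instance for the given task_id.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance: just an id and a state."""
    task_id: str
    state: Optional[str]


class FakeDagRun:
    """Hypothetical stand-in exposing only get_task_instance(task_id=...)."""

    def __init__(self, task_instances: Iterable[FakeTaskInstance]) -> None:
        self._by_id = {ti.task_id: ti for ti in task_instances}

    def get_task_instance(self, task_id: str) -> Optional[FakeTaskInstance]:
        # Returns None when no instance with that id exists in this run.
        return self._by_id.get(task_id)


def notify_if_state(
    dag_run: FakeDagRun,
    monitor_task_id: str,
    notify_on_state: str,
    send: Callable[[str], None],
) -> bool:
    """Call send() only if the monitored task ended in notify_on_state.

    Returns True when a notification was sent, False otherwise.
    """
    ti = dag_run.get_task_instance(task_id=monitor_task_id)
    if ti is None or ti.state != notify_on_state:
        return False
    send(f"Task {ti.task_id} ended in state {ti.state}")
    return True


if __name__ == "__main__":
    outbox = []  # collects messages instead of sending real email
    run = FakeDagRun([FakeTaskInstance("A", "skipped")])
    notify_if_state(run, "A", "skipped", outbox.append)
    print(outbox)  # ['Task A ended in state skipped']
```

In the real operator, send corresponds to super().execute(context), so the email goes out only on the branch where the function above returns True.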