python-3.x notifications airflow workflow microsoft-teams

How to send Airflow failed-task notifications to Microsoft Teams using Microsoft 365 Workflows


I have an Airflow callback function that sends a failed-task notification to a Teams group. Because of Microsoft's new policy, my old webhook call will stop working in a few months, so I want to adapt the code now. Here is my current code:

from airflow.models import Variable
import logging
import requests

def failed_task_notify_teams(context):
    """Post a "task failed" MessageCard to a Teams incoming webhook.

    The webhook URL is read from the Airflow Variable
    ``teams_webhook_secret``. The outcome of the HTTP call is logged;
    a non-success HTTP status is reported with ``logging.error`` rather
    than raised.

    Args:
        context: Airflow task context. The keys read here are
            ``task_instance_key_str``, ``dag``, ``ds`` and
            ``task_instance``.

    Raises:
        requests.RequestException: If the HTTP request itself cannot be
            completed (connection error, timeout, ...).
    """
    logging.info("Send notification to the Teams Group")

    # Looked up once; both values are used in several places in the card.
    task_key = context['task_instance_key_str']
    log_url = context['task_instance'].log_url

    payload = {
        "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "title": "Airflow Task Error",
        "summary": f"Task {task_key} : Failed",
        "themeColor": "F44336",
        "sections": [
            {
                "activityTitle": f"Task {task_key} : Failed",
                "activitySubtitle": f"DAG: {context['dag'].dag_id}",
                "facts": [
                    {
                        "name": "Date",
                        "value": context['ds']
                    },
                    {
                        "name": "Log URL",
                        "value": log_url
                    }
                ]
            }
        ],
        "potentialAction": [{
            "@type": "OpenUri",
            "name": "See Logs",
            "targets": [{
                "os": "default",
                "uri": log_url
            }]
        }]
    }

    headers = {"content-type": "application/json"}
    # Without a timeout, requests waits indefinitely on a stalled
    # connection, which would block the callback.
    response = requests.post(
        Variable.get('teams_webhook_secret'),
        json=payload,
        headers=headers,
        timeout=10,
    )

    # Only claim success when the webhook actually accepted the message.
    if response.ok:
        logging.info("Teams notification sent to the group!")
    else:
        logging.error(
            "Failed to send notification to Teams Group: %s - %s",
            response.status_code,
            response.text,
        )

I expected the new code to look much the same, but this version does not work with Microsoft 365 Workflows.


Solution

  • After looking a bit further, I was able to find a solution. The code is indeed almost the same; only the payload needs to change, from the legacy MessageCard format to an Adaptive Card wrapped in a message attachment. Here is the working code with the modifications:

    from airflow.models import Variable
    import logging
    import requests
    
    def failed_task_notify_teams(context):
        """Post a "task failed" Adaptive Card to a Teams Workflows webhook.

        The webhook URL is read from the Airflow Variable
        ``teams_azure_webook_secret``. The card is sent as the single
        attachment of a ``message`` payload. The outcome of the HTTP call
        is logged; a non-success HTTP status is reported with
        ``logging.error`` rather than raised.

        Args:
            context: Airflow task context. The keys read here are
                ``task_instance_key_str``, ``dag``, ``ds`` and
                ``task_instance``.

        Raises:
            requests.RequestException: If the HTTP request itself cannot
                be completed (connection error, timeout, ...).
        """
        logging.info("Send notification to the Teams Group")

        # Looked up once; used both in the fact list and the card action.
        log_url = context['task_instance'].log_url

        payload = {
            "type": "message",
            "attachments": [{
                # Adaptive Card attachment MIME type ("vnd", not "vdn").
                "contentType": "application/vnd.microsoft.card.adaptive",
                "contentUrl": None,
                "content": {
                    "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                    "type": "AdaptiveCard",
                    "version": "1.2",
                    "body": [
                        {
                            "type": "TextBlock",
                            "text": f"Task {context['task_instance_key_str']} : Failed",
                            "weight": "bolder",
                            "size": "medium",
                            "color": "attention",
                        },
                        {
                            "type": "TextBlock",
                            "text": f"DAG : {context['dag'].dag_id}",
                            "spacing": "none"
                        },
                        {
                            "type": "FactSet",
                            "facts": [
                                {
                                    "title": "Date :",
                                    "value": context['ds']
                                },
                                {
                                    "title": "Log URL :",
                                    "value": f"[Click here]({log_url})"
                                }
                            ]
                        }
                    ],
                    "actions": [
                        {
                            "type": "Action.OpenUrl",
                            "title": "See logs",
                            "url": log_url
                        }
                    ]
                }
            }]
        }

        headers = {"content-type": "application/json"}
        # Without a timeout, requests waits indefinitely on a stalled
        # connection, which would block the callback.
        response = requests.post(
            Variable.get('teams_azure_webook_secret'),
            json=payload,
            headers=headers,
            timeout=10,
        )

        # Accept any non-error status instead of exactly 200, so that
        # other 2xx replies (e.g. 202 Accepted) are not logged as failures.
        if response.ok:
            logging.info("Notification sent to Teams Group")
        else:
            logging.error(
                "Failed to sent notification to Teams Group: %s - %s",
                response.status_code,
                response.text,
            )