pythonairflowsendgriddirected-acyclic-graphs

Problem sending email with template_id with Airflow and Sendgrid


I am trying to send an email with Sendgrid and Airflow (2.4.2). I am able to send the email, but when I try to pass a template_id, the email is sent without the template design. It looks like I am passing the template_id in the incorrect way. How can I send the email with the template?.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.sendgrid.utils.emailer import send_email

from airflow.operators.email import EmailOperator

from datetime import datetime


default_args = {
    'owner': 'your_name',
    'start_date': datetime(2023, 10, 16, 15, 35, 0),
    'retries': 1,
}

#Task to send an email using SendGrid
def _send_email():

    postgres_hook = PostgresHook(postgres_conn_id="postgres")
    sql_query = "select email from Users where firstname='Raghav';"
    results = postgres_hook.get_records(sql_query)

    for row in results:
        email = row
        email_content = f"Hello  Email: {email}\n"

        send_email(
            to=email,
            subject='Test Email Subject',
            html_content=email_content,
            files=None,  # List of file paths to attach to the email
            cc=None,  # List of email addresses to CC
            bcc=None,  # List of email addresses to BCC
            conn_id='sendgrid_default',
            template_id='d-8cefbe24e73842d7810550fc441aceb0', #didn't work
            kwargs={
                'template_id':'d-8cefbe24e99942d7810550fc441aceb0'
            }
        )

with DAG('user_processing',
        start_date=datetime(2022,1,1),
        schedule_interval='@daily',
        catchup=False) as dag:

    email_user = PythonOperator(
        task_id = 'email_user',
        python_callable= _send_email
    )

    email_user

Solution

  • The solution is using a PythonOperator, sending a request to v3 using personalization.

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime
    import requests
    
    headers = {
            "Authorization": f"Bearer {sendgrid_api_key}",
            "Content-Type": "application/json",
    }
    
    data = {
        "personalizations": [
            {
                "to": [
                    {
                        "email": "recipient1@example.com",
                        "name": "Recipient 1 Name"
                    },
                    {
                        "email": "recipient2@example.com",
                        "name": "Recipient 2 Name"
                    }
                ],
                "subject": "Your Email Subject"
            }
        ],
        "from": {
            "email": "sender@example.com",
            "name": "Sender Name"
        },
        "content": [
            {
                "type": "text/plain",
                "value": "Hello, this is the plain text content of the email."
            },
            {
                "type": "text/html",
                "value": "<p>Hello, this is the HTML content of the email.</p>"
            }
        ]
    }
    
    
    requests.post("https://api.sendgrid.com/v3/mail/send", json=data, headers=headers)