python airflow google-cloud-composer

Airflow DAG - access context using SimpleHttpOperator to enable XCOM pull


I am struggling to pull xcoms into a task using the SimpleHttpOperator.

The DAG below is meant to orchestrate a number of requests (via Google Cloud Functions) to a third-party API, store the resulting CSVs in Cloud Storage, and eventually read all the CSVs, merge and transform them, and store the result in BigQuery.

The tasks come in pairs, one pair per report: the first task triggers the Cloud Function that generates the request and stores the report token in Secret Manager; the second task checks whether the report is available to download, retrying until it is, and then saves it to Google Cloud Storage.

Once all the CSVs are available, the last task triggers another Cloud Function that downloads all the CSVs from Storage, merges them and uploads the result to BigQuery.

When each individual download is complete, I'm using the response_filter argument of the SimpleHttpOperator to make the filename available for later use as an XCom.

# Python standard modules
from datetime import datetime, timedelta
# Airflow modules
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
import json

default_args = {
    'owner': '--',
    'depends_on_past': False,
    # Start on 16th of June, 2021
    'start_date': datetime(2021, 6, 16),
    'email': ['--'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(seconds=60),
    'provide_context': True    
}

with DAG(
    "dailymotion_reporting",
    default_args=default_args,
    schedule_interval='0 6 * * *',
    tags=["my_dags"]
) as dag:    
    
    

    def push_xcom(**context):
        v = context['ti'].xcom_push(key="filename", value=response.json()["filename"])
        return v

    def response_check(response):
        if response[2] == "report not ready":
            print("report not ready: " + report_summary)
            return False
        elif response[2] == "report downloaded":
            print("report downloaded: " + report_summary)
            return True

    # t1: request the first report
    report1_request = SimpleHttpOperator(
        task_id= "report1_request",
        method='POST',
        http_conn_id='report_request_trigger',
        endpoint='request_dm_report',
        data=json.dumps({
                "dimensions": "DAY-VIDEO_ID-VIDEO_OWNER_CHANNEL_SLUG-VISITOR_DOMAIN_GROUP-VISITOR_SUBDOMAIN-VISITOR_DEVICE_TYPE-INVENTORY_POSITION", 
                "metrics": "TOTAL_INVENTORY", 
                "product": "EMBED"
        }),
        headers={"Content-Type": "application/json"}
    )
    # t2: check report availability until it is available, then download
    report1_check_dl = SimpleHttpOperator(
        task_id= "report1_check_dl",
        method='GET',
        http_conn_id='report_request_trigger',
        endpoint='check_previously_requested_dm_reports',
        response_check = lambda response: True if response.json()["report_status"] == "report downloaded" else False,
        response_filter = lambda response: {"filename": response.json()["filename"]}
    )
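
The two lambdas on report1_check_dl can also be written as named functions, which makes them easier to try out away from Airflow. The sketch below is illustrative only: report_is_downloaded, extract_filename and the FakeResponse stand-in are hypothetical names, and the JSON keys report_status and filename are the ones used in the DAG above. Whatever response_filter returns is what the operator pushes as the task's return_value XCom.

```python
import json


def report_is_downloaded(response) -> bool:
    """response_check: succeed only once the report has been downloaded."""
    return response.json().get("report_status") == "report downloaded"


def extract_filename(response) -> dict:
    """response_filter: the returned dict becomes the task's return_value XCom."""
    return {"filename": response.json()["filename"]}


class FakeResponse:
    """Hypothetical stand-in for requests.Response, used only for a local check."""

    def __init__(self, payload: dict):
        self._text = json.dumps(payload)

    def json(self) -> dict:
        return json.loads(self._text)


ready = FakeResponse({"report_status": "report downloaded", "filename": "report1.csv"})
pending = FakeResponse({"report_status": "report not ready"})

print(report_is_downloaded(ready))
print(report_is_downloaded(pending))
print(extract_filename(ready))
```

In the DAG these would be passed as response_check=report_is_downloaded and response_filter=extract_filename. When the check returns False the operator fails the attempt, so the task's retries and retry_delay settings are what drive the polling.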

The task that is meant to pull the CSVs out of Storage is below. I am trying to retrieve the filenames from the XComs produced by the previous tasks and include them in the data payload for my Cloud Function.

ad_report_transformations = SimpleHttpOperator(
    task_id= "ad_report_transformations",
    method='POST',
    http_conn_id='report_request_trigger',
    endpoint='dm_transform_ad_data',
    data = json.dumps(" {{ context['ti'].xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl','report3_check_dl', 'report4_check_dl', 'report5_check_dl'], key=return_value.json()['filename']) }} "),
    response_check = lambda response: True if response == "ok" else False
)

However, having tried many different methods, I keep getting variations of the same error:

{taskinstance.py:1152} ERROR - 'context' is undefined

What's the best way to make the context available with SimpleHttpOperator? Or is there another way to pull those values in? Most of the solutions I have seen for similar issues use the PythonOperator, whose provide_context argument seems to enable the above, but I wanted to see if there was a way to do this without having to rewrite all my tasks as functions. Thanks


Solution

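  • When retrieving an XCom in a Jinja template, you don't need the context object: Airflow passes the context's entries to the template when it renders the field, so ti is available as a name of its own. The key argument should be dropped as well, because return_value is not a defined name inside the template, and response_filter already stores its result under the default return_value key. Try this:

    data="{{ ti.xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl', 'report3_check_dl', 'report4_check_dl', 'report5_check_dl']) }}"
    

    The above assumes you don't actually need json.dumps(). By default the rendered value is the string form of a Python list of {'filename': ...} dicts, one per upstream task.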
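
To see why ti works where context['ti'] does not, the template can be rendered outside Airflow with Jinja directly. This is only a sketch: FakeTaskInstance and its canned filenames are made up for illustration, and it assumes jinja2 is importable (it is installed alongside Airflow). It also uses Jinja's built-in tojson filter, which may help if the Cloud Function expects valid JSON rather than the string form of a Python list.

```python
import json

from jinja2 import Template
from jinja2.exceptions import UndefinedError


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (illustration only)."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps task_id -> value returned by that task

    def xcom_pull(self, task_ids):
        # Return one value per requested task id, in the order requested.
        return [self._xcoms[task_id] for task_id in task_ids]


ti = FakeTaskInstance({
    "report1_check_dl": {"filename": "report1.csv"},
    "report2_check_dl": {"filename": "report2.csv"},
})

# Context entries are passed as top-level names, so `ti` resolves directly.
good = Template(
    "{{ ti.xcom_pull(task_ids=['report1_check_dl', 'report2_check_dl']) | tojson }}"
)
payload = good.render(ti=ti)
print(json.loads(payload))

# There is no variable called `context` inside the template namespace.
bad = Template("{{ context['ti'].xcom_pull(task_ids=['report1_check_dl']) }}")
try:
    bad.render(ti=ti)
    error_message = None
except UndefinedError as exc:
    error_message = str(exc)
print(error_message)
```

The second template fails with the same kind of "'context' is undefined" error reported in the question, while the first renders a JSON array that the receiving function could parse.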