Tags: airflow, airflow-2.x, airflow-api

Accessing task_instance or ti via SimpleHttpOperator to do an XCom push


TLDR

In the Python callable for a SimpleHttpOperator response function, I am trying to push an XCom that combines information from two sources to a specified key (a hash of the filename/path and an object lookup from a DB).

Longer Tale

I have a FileSensor written which grabs all new files and passes them to MultiDagRun to parallel process the information (scientific) in the files as XCom. Works great. The SimpleHttpOperator POSTs filepath info to a submission API and receives back a task_id, which it must then use to read the result from another (slow-running) API. This I all have working fine: files get scanned, multiple DAGs are launched to process them, and objects come back.
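Roughly, the fan-out step looks like this (a minimal sketch with assumed names, not my exact code: process_file is a stand-in for the downstream DAG id, and trigger_dag lives at airflow.api.common.trigger_dag in Airflow 2.2+):

from airflow.api.common.trigger_dag import trigger_dag
from airflow.utils import timezone

def _launch_per_file(file_paths):
    # One processing DAG run per discovered file; the path travels in conf
    # and is read back downstream via dag_run.conf['file'].
    for path in file_paths:
        trigger_dag(
            dag_id="process_file",  # hypothetical downstream DAG id
            run_id=f"process_file_{timezone.utcnow().isoformat()}_{abs(hash(path))}",
            conf={"file": path},
            replace_microseconds=False,  # avoid run collisions within the same second
        )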

But... I cannot puzzle out how to push the result to an XCom inside the Python response function for the SimpleHttpOperator.

My Google, SO, and Reddit-fu has failed me here (and it seems overkill to use the PythonOperator, though that's my next stop). I notice a lot of people asking similar questions, though.

How do you use context or ti or task_instance or context['task_instance'] with the response function? (I cannot use the "Returned Value" XCom as I need to distinguish the XCom keys for the parallel processing, afaik; see the sketch below.) I have context set to True in the default_args.
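To make the distinguishing-keys point concrete, here is roughly what I want a downstream pull to look like (task ids and the _stable_key helper are illustrative, not real code). One caveat: Python's built-in hash() is salted per process for strings, so a stable digest like hashlib.md5 is needed if the key is computed in more than one process:

import hashlib

def _stable_key(path):
    # Illustrative helper: str results from the built-in hash() are salted
    # per process, so a digest keeps the push and pull keys in agreement
    # across worker processes.
    return hashlib.md5(path.encode()).hexdigest()

def _consume_result(ti, dag_run):
    # In Airflow 2.x, `ti` and `dag_run` are injected from the context when
    # they appear in a PythonOperator callable's signature.
    file = dag_run.conf["file"]
    return ti.xcom_pull(task_ids="object_result", key=_stable_key(file))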

I'm sure I am missing something simple here, but I'm stumped as to what it is. (Note: I did try **kwargs and ti = kwargs['ti'] below as well before hitting SO...)


import json
import requests

def _handler_object_result(response, file):
    # Note: task_id here belongs to the API I am calling, not an Airflow-internal task id
    header_result = response.json()
    task_id = header_result["task"]["id"]

    api = "https://redacted.com/api/task/result/{task_id}".format(task_id=task_id)
    resp = requests.get(api, verify=False).json()
    data = json.loads(resp["data"])
    file_object = json.dumps(data["OBJECT"])
    file_hash = hash(file)
    # This is the part that is not working, as I am unsure how
    # to access the task instance to do the xcom_push
    ti.xcom_push(key=file_hash, value=file_object)  # NameError: ti is not defined here
    if ti.xcom_pull(key=file_hash):
        return True
    else:
        return False

and the Operator:

    object_result = SimpleHttpOperator(
        task_id="object_result",
        method='POST',
        data=json.dumps({"file": "{{ dag_run.conf['file'] }}", "keyword": "object"}),
        http_conn_id="coma_api",
        endpoint="/api/v1/file/describe",
        headers={"Content-Type": "application/json"},
        extra_options={"verify": False},
        response_check=lambda response: _handler_object_result(response, "{{ dag_run.conf['file'] }}"),
        do_xcom_push=False,
        dag=dag,
    )

I was really expecting the task_instance object to be available in some fashion, either by default or by configuration, but each variation that has worked elsewhere (FileSensor, PythonOperator, etc.) hasn't worked here, and I've been unable to google the magic words to make it accessible.


Solution

  • You can try using the get_current_context() function in your response_check function:

    import json
    import requests

    from airflow.operators.python import get_current_context

    def _handler_object_result(response, file):
        # Note: task_id here belongs to the API being called, not an Airflow-internal task id
        header_result = response.json()
        task_id = header_result["task"]["id"]

        api = "https://redacted.com/api/task/result/{task_id}".format(task_id=task_id)
        resp = requests.get(api, verify=False).json()
        data = json.loads(resp["data"])
        file_object = json.dumps(data["OBJECT"])
        file_hash = hash(file)

        ti = get_current_context()["ti"]  # <- Try this
        ti.xcom_push(key=file_hash, value=file_object)
        return bool(ti.xcom_pull(key=file_hash))
    

    That function is a nice way of accessing the task's execution context when context isn't explicitly handy, or when you don't want to pass context attributes around just to reach them deep in your logic stack.
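    For instance, a short sketch (hypothetical helper name, not from the question) of calling it deep in a call stack:

    from airflow.operators.python import get_current_context

    def record_result(key, value):
        # Usable from any function that runs while a task is executing; no
        # context argument needs to be threaded through. Raises an
        # AirflowException if called outside of a running task.
        ti = get_current_context()["ti"]
        ti.xcom_push(key=key, value=value)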