
Airflow - How to pass an XCom variable into a Python function


I need to reference a variable that's returned by a BashOperator. In my task_archive_s3_file, I need to get the filename from get_s3_file. The task simply prints {{ ti.xcom_pull(task_ids=submit_file_to_spark) }} as a string instead of the value.

If I use the commented-out bash_command instead, the value prints correctly.

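from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

# dag and obj are defined elsewhere in the DAG file
get_s3_file = PythonOperator(
    task_id='get_s3_file',
    python_callable=obj.func_get_s3_file,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag)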

submit_file_to_spark = BashOperator(
    task_id='submit_file_to_spark',
    bash_command="echo 'hello world'",
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag)

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
#    bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
    python_callable=obj.func_archive_s3_file,
    params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
    dag=dag)

get_s3_file >> submit_file_to_spark >> task_archive_s3_file

Solution

  • Templates like {{ ti.xcom_pull(...) }} are only rendered inside parameters that support templating; anywhere else they are passed through as literal strings. See the template_fields, template_fields_renderers and template_ext attributes of the PythonOperator and BashOperator. For PythonOperator, params is not a templated field, which is why your callable receives the unrendered string.
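
    The following toy model, written in plain Python rather than against Airflow itself, sketches why the placement matters. ToyPythonOperator, render() and XCOM_STORE are hypothetical names, the regex only understands the single xcom_pull form used in this question (Airflow uses a full Jinja environment), and the XCom value "hello world" is assumed. Only attributes listed in template_fields are rendered before the callable runs, so params keeps the literal string.

```python
import re

# Hypothetical in-memory XCom store standing in for Airflow's metadata database.
XCOM_STORE = {"submit_file_to_spark": "hello world"}

# Toy pattern for the one template form used in this question (not real Jinja).
XCOM_PULL = re.compile(r"\{\{\s*ti\.xcom_pull\(task_ids='([^']+)'\)\s*\}\}")


def render(value):
    """Toy renderer: resolve xcom_pull expressions in strings, recurse into dicts/lists."""
    if isinstance(value, str):
        return XCOM_PULL.sub(lambda m: str(XCOM_STORE[m.group(1)]), value)
    if isinstance(value, dict):
        return {k: render(v) for k, v in value.items()}
    if isinstance(value, list):
        return [render(v) for v in value]
    return value


class ToyPythonOperator:
    """Hypothetical stand-in for PythonOperator: only template_fields get rendered."""

    template_fields = ("op_args", "op_kwargs")

    def __init__(self, python_callable, op_args=None, op_kwargs=None, params=None):
        self.python_callable = python_callable
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}
        self.params = params or {}  # not listed in template_fields, so never rendered

    def execute(self):
        # Render every templated attribute in place, then call the function.
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name)))
        return self.python_callable(*self.op_args, **self.op_kwargs)


template = "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}"

via_kwargs = ToyPythonOperator(lambda s3_path_filename: s3_path_filename,
                               op_kwargs={"s3_path_filename": template})
via_params = ToyPythonOperator(lambda: None, params={"s3_path_filename": template})

print(via_kwargs.execute())  # hello world
via_params.execute()
print(via_params.params["s3_path_filename"])  # still the raw template string
```

    In Airflow the rendering is done by the task instance just before execute() runs, but the outcome matches the toy: a template in op_kwargs is resolved, one in params is not.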

    Because op_kwargs and op_args are templated fields of PythonOperator, you can use them to pass templates to your Python callable:

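    def func_archive_s3_file(s3_path_filename):
        # s3_path_filename is the already-rendered XCom value;
        # archive() stands in for your own archiving logic
        archive(s3_path_filename)
    
    task_archive_s3_file = PythonOperator(
        task_id='archive_s3_file',
        dag=dag,
        python_callable=obj.func_archive_s3_file,
        # op_kwargs is templated, so Airflow renders this string before calling the function
        op_kwargs={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}"})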
    

    You can also pass arguments using op_args as a list of positional arguments.
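
    As a minimal sketch of the positional form, assuming a hypothetical second parameter archive_prefix and faking the render step with str.replace (Airflow does this with Jinja), the callable receives the rendered op_args items in list order, much like calling python_callable(*op_args):

```python
TEMPLATE = "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}"


def func_archive_s3_file(s3_path_filename, archive_prefix):
    # op_args items fill these parameters in list order.
    # archive_prefix is a hypothetical second argument for illustration.
    return f"{archive_prefix}{s3_path_filename}"


# The list you would hand to PythonOperator(op_args=...).
op_args = [TEMPLATE, "archive/"]

# Stand-in for Airflow's Jinja rendering: assume the XCom value is "hello world".
rendered = [arg.replace(TEMPLATE, "hello world") for arg in op_args]

# The callable is then invoked with the rendered list unpacked positionally.
result = func_archive_s3_file(*rendered)
print(result)  # archive/hello world
```

    Prefer op_kwargs when the callable has several parameters, since a reordering of op_args silently shifts which value lands in which parameter.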

    However, when all you need is an XCom value, another option is to use the TaskInstance object that is made available to your callable through the task context:

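    def func_archive_s3_file(**context):
        # context['ti'] is the TaskInstance of the currently running task
        archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))
    
    task_archive_s3_file = PythonOperator(
        task_id='archive_s3_file',
        dag=dag,
        python_callable=obj.func_archive_s3_file,
        # Airflow 1.x: pass the task context to the callable as keyword arguments.
        # Airflow 2 does this automatically and deprecates the flag.
        provide_context=True,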