
How to pass XCOM values to PythonVirtualenvOperator op_args?


I am trying to pass values to the op_args of a PythonVirtualenvOperator using xcom_pull, but I am not sure how to do it properly. Any help would be highly appreciated.


from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonVirtualenvOperator, PythonOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base import BaseHook
from datetime import datetime, timedelta

from test_process import process_test_data


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'catchup':False    
}

connection = BaseHook.get_connection('test_flow')
db_user = connection.login
db_pass = connection.password


    
with DAG(
    'test',
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(1)  
) as dag:

    
    
    process_test_data_task = PythonVirtualenvOperator(
        task_id = 'process_test_data',
        python_callable = process_test_data,
        op_args = [db_user, db_pass, ti.xcom_pull(key='db_host'), ti.xcom_pull(key='db_database')],
        python_version=3.8,
        system_site_packages=True,
        requirements = ['psycopg2', 'configparser'],
    )

    process_test_data_task

Solution

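  • I faced a similar issue and was able to pass the values using Jinja templates. op_args is a templated field, so Jinja expressions inside it are rendered when the task runs, at which point ti is available. Calling ti.xcom_pull() directly in the DAG file cannot work, because no task instance exists when the file is parsed (ti is simply undefined there).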

    If the values you want to pass are constant, you should follow ozs's advice and store them as Variables. In my case they were not, so after pushing the values to XCom I needed to pass them into the virtualenv. In your code it would look something like this:

            process_test_data_task = PythonVirtualenvOperator(
                task_id = 'process_test_data',
                python_callable = process_test_data,
                op_args = [
                    db_user, 
                    db_pass, 
                    # Double quotes outside, single quotes inside: using ' for both would end the string early.
                    "{{ ti.xcom_pull(key='db_host') }}",
                    "{{ ti.xcom_pull(key='db_database') }}"
                ],
                python_version=3.8,
                system_site_packages=True,
                requirements = ['psycopg2', 'configparser'],
            )
    

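    Careful: as far as I could tell, templated args always reach the callable as strings in the PythonVirtualenvOperator, because the Jinja templates are rendered to text. If you push a number or a list to XCom, you will have to convert it back inside the callable.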
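If the callable needs the original types back (for example a port number that was pushed to XCom as an int), one option is to convert the rendered strings inside the callable itself. The sketch below is an assumption, not the answerer's code: the body of process_test_data, the extra db_port argument (which would be fed by a hypothetical "{{ ti.xcom_pull(key='db_port') }}" entry in op_args) and the choice of ast.literal_eval are all illustrative. The import and the helper sit inside the function because PythonVirtualenvOperator expects the callable to be self-contained (imports inside the function, no references to module-level names).

```python
def process_test_data(db_user, db_pass, db_host, db_database, db_port="5432"):
    """Hypothetical callable for PythonVirtualenvOperator.

    db_port is an illustrative extra argument, assumed to come from a templated
    op_args entry, so it arrives as a string such as "5432".
    """
    import ast  # imported inside the function so it exists in the virtualenv

    def coerce(value):
        """Best-effort conversion of a Jinja-rendered string back to a Python literal."""
        if not isinstance(value, str):
            return value  # already a native object, e.g. a non-templated arg
        try:
            # literal_eval only accepts literals (numbers, strings, lists, dicts,
            # tuples, sets, booleans, None) and never executes arbitrary code.
            return ast.literal_eval(value)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            # Plain text such as "localhost" is not a literal: keep the string.
            return value

    # Coerce only arguments known to be non-string; host and database stay as-is.
    port = coerce(db_port)

    # A real callable would open a psycopg2 connection here; returning the
    # parsed values keeps this sketch runnable without a database.
    return {"user": db_user, "host": db_host, "database": db_database, "port": port}
```

Only arguments whose type you know should be non-string go through the helper: a database name such as "2024" would otherwise come back as an int. On Airflow 2.1 or later, the DAG also accepts render_template_as_native_obj=True, which renders templates to native Python types instead of strings; it may be worth testing whether that alone is enough for your case.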