I am trying to pass values obtained with xcom_pull into the op_args of a PythonVirtualenvOperator, but I am not sure how to do it properly. Any help would be highly appreciated.
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonVirtualenvOperator, PythonOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base import BaseHook
from datetime import datetime, timedelta
from test_process import process_test_data

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'catchup': False
}

connection = BaseHook.get_connection('test_flow')
db_user = connection.login
db_pass = connection.password

with DAG(
    'test',
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(1)
) as dag:
    process_test_data_task = PythonVirtualenvOperator(
        task_id = 'process_test_data',
        python_callable = process_test_data,
        op_args = [db_user, db_pass, ti.xcom_pull(key='db_host'), ti.xcom_pull(key='db_database')],
        python_version=3.8,
        system_site_packages=True,
        requirements = ['psycopg2', 'configparser'],
    )

    process_test_data_task
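I faced a similar issue and was able to pass the arguments using Jinja templates.
If the values you want to pass are constant, you should follow ozs's advice and store them as Variables. In my case they were not, so after pushing the values to XCom I needed to pass them into the virtualenv. The task instance ti only exists while the task is running, so calling ti.xcom_pull directly in the DAG file fails when the file is parsed; because op_args is a templated field, you can instead put the call inside a Jinja string that Airflow renders at run time. In your code it would be something like: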
process_test_data_task = PythonVirtualenvOperator(
    task_id = 'process_test_data',
    python_callable = process_test_data,
    op_args = [
        db_user,
        db_pass,
        "{{ ti.xcom_pull(key='db_host') }}",
        "{{ ti.xcom_pull(key='db_database') }}"
    ],
    python_version=3.8,
    system_site_packages=True,
    requirements = ['psycopg2', 'configparser'],
)
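The outer quotes of each template string must differ from the quotes around the XCom key: writing '{{ ti.xcom_pull(key='db_host') }}' with single quotes on both levels ends the string at key= and the DAG file no longer parses. A small standard-library check with ast.parse shows the difference (the helper name parses is just for illustration):

```python
import ast

# The same op_args entry written with clashing quotes and with distinct quotes.
broken = "op_args = ['{{ ti.xcom_pull(key='db_host') }}']"
fixed = "op_args = [\"{{ ti.xcom_pull(key='db_host') }}\"]"


def parses(source: str) -> bool:
    """Return True if `source` is syntactically valid Python."""
    try:
        ast.parse(source)
        return True
    except SyntaxError:
        return False


print(parses(broken))  # False
print(parses(fixed))   # True
```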
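Careful: as far as I could tell, templated args always arrive as strings inside the PythonVirtualenvOperator, because the Jinja template is rendered to text, so the callable has to convert any non-string values back itself.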
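A minimal sketch of converting the rendered strings back inside the callable. Everything here is hypothetical and not from the question: the function name process_test_data_sketch, and the assumption that a port number and a Python list literal of table names were pushed to XCom. The import sits inside the function because PythonVirtualenvOperator runs the callable's source in a separate interpreter, so imports made at module level in the DAG file are not available there.

```python
def process_test_data_sketch(db_host, db_port, table_names):
    # Hypothetical callable: imports must live inside the function body,
    # since the virtualenv operator executes only this function's source.
    import ast

    # Rendered templates arrive as text, e.g. "5432" and "['users', 'orders']".
    port = int(db_port)                     # "5432" -> 5432
    tables = ast.literal_eval(table_names)  # "['users', 'orders']" -> ['users', 'orders']
    if not isinstance(tables, list):
        raise TypeError(f"expected a list literal, got {table_names!r}")

    # A hostname is already a plain string, so it is used unchanged.
    return db_host, port, tables


print(process_test_data_sketch("localhost", "5432", "['users', 'orders']"))
```

Converting each argument explicitly, instead of running literal_eval over every argument, avoids surprises such as a database literally named None turning into Python's None. On Airflow 2.1+ the DAG-level render_template_as_native_obj=True option renders templates as native Python objects rather than strings, which may make this conversion unnecessary; I have not tested that with the virtualenv operator.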