python, airflow

Trigger airflow DAG manually with parameter and pass them into Python function


I want to pass parameters into an Airflow DAG and use them in a Python function. I can use the parameters in the BashOperator, but I can't find any reference for using them in a Python function.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

#Define DAG
dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))

#Parameter
owner="{{ dag_run.conf['owner'] }}"
table="{{ dag_run.conf['table'] }}"

run_this="echo "+owner+"."+table

def test_func(owner,table):
    print(owner+"."+table)

task1 = BashOperator(
    task_id='test_task1',
    bash_command=run_this,
    dag=dag,
    queue='cdp_node53',
) 

task2 = PythonOperator(
    task_id='test_task2',
   python_callable=test_func(owner,table),
    dag=dag,
    queue='cdp_node53',
) 

I want to pass the below as parameters while triggering the DAG. "task1" works fine for me; I need to make "task2" work. Please guide me on correcting the above code so that I can pass the parameters into it.

{"owner":"test_owner","table":"test_table"}
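For reference, a conf payload like this can be supplied when triggering the DAG manually from the CLI. With the Airflow 1.x import paths used above (`bash_operator`, `python_operator`), the command would look roughly like:

```shell
# Trigger the DAG with a JSON conf; its keys become dag_run.conf['owner'] etc.
airflow trigger_dag -c '{"owner":"test_owner","table":"test_table"}' test_backup
```

In Airflow 2.x the equivalent is `airflow dags trigger --conf '…' test_backup`.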

Solution

  • To pass arguments into the PythonOperator you should use either op_args (for positional arguments) or op_kwargs (for keyword arguments). Both are template fields, so their values can be Jinja expressions as well.

    Refactoring your code using op_kwargs:

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.utils.dates import days_ago
    
    
    #Define DAG
    dag = DAG("test_backup", schedule_interval=None, start_date=days_ago(1))
    
    #Parameter
    owner="{{ dag_run.conf['owner'] }}"
    table="{{ dag_run.conf['table'] }}"
    
    run_this="echo "+owner+"."+table
    
    def test_func(owner,table):
        print(owner+"."+table)
    
    task1 = BashOperator(
        task_id='test_task1',
        bash_command=run_this,
        dag=dag,
        queue='cdp_node53',
    )
    
    task2 = PythonOperator(
        task_id='test_task2',
        python_callable=test_func,  # pass the function itself, do not call it here
        op_kwargs={"owner": owner, "table": table},  # templated, rendered at run time
        dag=dag,
        queue='cdp_node53',
    )
    

    Both tasks will now log INFO - test_owner.test_table.
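
    To see why the original `python_callable=test_func(owner, table)` fails, here is a minimal sketch in plain Python (no Airflow needed). The names below are illustrative; at run time the PythonOperator does roughly the equivalent of calling the function with the rendered op_kwargs:

    ```python
    def test_func(owner, table):
        # mirrors the question's function, returning instead of printing
        return owner + "." + table

    # Wrong: test_func(owner, table) is *called* at DAG-parse time, so the
    # operator would receive its return value (a string), not a callable.
    # Right: hand over the function object and the kwargs separately.
    python_callable = test_func
    op_kwargs = {"owner": "test_owner", "table": "test_table"}

    # What the operator effectively does when the task executes:
    result = python_callable(**op_kwargs)
    print(result)  # → test_owner.test_table
    ```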