python, google-bigquery, airflow, directed-acyclic-graphs

Is there a way to parametrize BigQueryOperators in a DAG with Airflow?


My goal is to create a DAG with BigQueryOperators that I can trigger in Airflow with a parametrized destination table in my SQL. I checked a lot of topics about how to send parameters to PythonOperators so they can be called with --conf in Airflow, but I don't know how to apply the same approach to an argument of a BigQueryOperator.

My dag.py looks like this:


import airflow
import blabla..
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG(
    "TestPython",
    schedule_interval=None,
    default_args=default_args,
    max_active_runs=1,
    catchup=False,
) as dag:


    stepOne = BigQueryOperator(
        task_id="stepOne",
        sql="SELECT * FROM `testTable` ",
        destination_dataset_table="variableTable",  # <- the table I want to parametrize
        write_disposition="WRITE_TRUNCATE",
        use_legacy_sql=False,
    )

    stepOne

I wanted to know if there is a way to set the destination table name with an airflow trigger_dag command, or something similar, while keeping a default value when it is not set, so the DAG can still be uploaded to my DAG bucket.

If something is not clear, I am happy to give more details about what I tried.


Solution

  • Yes, you can pass a run-time value to the "destination_dataset_table" because it is a templated field.

    For example:

    my_suffix = "{{ macros.ds_format(macros.ds_add(ds, -2), '%Y-%m-%d', '%Y%m%d') }}"
    stepOne = BigQueryOperator(
        task_id="stepOne",
        sql="SELECT * FROM `testTable` ",
        destination_dataset_table=f"project_id.dataset_id.table_prefix_{my_suffix}",
        write_disposition="WRITE_TRUNCATE",
        use_legacy_sql=False,
    )
    

    In my example I change the table name using an Airflow macro that manipulates dates, but you can use many other templated values, such as an XCom pull:

    "{{ task_instance.xcom_pull(task_ids='task_id', key='return_value') }}"
    

    For your specific use-case, I think the approach below should work.

    You can pass parameters from the CLI using --conf '{"key":"value"}' and then use them in the DAG file as "{{ dag_run.conf["key"] }}" in any templated field.
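
    Applied to your DAG, a minimal sketch (the conf key "dest_table" and the fallback table name are assumptions; pick whatever names fit your project):

    stepOne = BigQueryOperator(
        task_id="stepOne",
        sql="SELECT * FROM `testTable` ",
        # Use the value passed via --conf when present; otherwise fall back to a
        # default, so the DAG still works when no conf is supplied.
        # (dag_run.conf can be None when the run is triggered without --conf,
        # hence the "or {}" guard.)
        destination_dataset_table="{{ (dag_run.conf or {}).get('dest_table', 'project_id.dataset_id.default_table') }}",
        write_disposition="WRITE_TRUNCATE",
        use_legacy_sql=False,
    )

    You would then trigger it with:

    airflow trigger_dag TestPython --conf '{"dest_table": "project_id.dataset_id.my_table"}'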