google-bigquery, airflow, airflow-api

Insert into BigQuery using parameters from web server request JSON data


In the Google Cloud Function I'm triggering a DAG:

endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": {"file_name": file_name}}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )
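
For context, a minimal sketch of the full trigger function, assuming the make_composer2_web_server_request helper from Google's Composer 2 Airflow REST API sample (it wraps an AuthorizedSession from google.auth); web_server_url, dag_id, and file_name are placeholders here:

    # Minimal sketch; assumes Google's Composer 2 REST API sample helper.
    import google.auth
    from google.auth.transport.requests import AuthorizedSession

    AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
    CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])

    def make_composer2_web_server_request(url, method="GET", **kwargs):
        # Authenticated request against the Composer 2 Airflow web server.
        authed_session = AuthorizedSession(CREDENTIALS)
        kwargs.setdefault("timeout", 90)
        return authed_session.request(method, url, **kwargs)

    def trigger_dag(web_server_url, dag_id, file_name):
        # Pass file_name to the DAG run via its conf.
        endpoint = f"api/v1/dags/{dag_id}/dagRuns"
        request_url = f"{web_server_url}/{endpoint}"
        json_data = {"conf": {"file_name": file_name}}
        response = make_composer2_web_server_request(
            request_url, method="POST", json=json_data
        )
        response.raise_for_status()
        return response.json()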

It is possible to obtain that data in a PythonOperator:

    from airflow.operators.python import PythonOperator

    def printParams(**kwargs):
        for k, v in kwargs["params"].items():
            print(f"{k}:{v}")

    task = PythonOperator(
        task_id="print_parameters",
        python_callable=printParams,
    )
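
The same conf values are also available directly from the dag_run object in the task context; a minimal sketch of that alternative (the task_id here is illustrative):

    # Minimal sketch: read the triggering request's conf from dag_run.
    from airflow.operators.python import PythonOperator

    def print_conf(**kwargs):
        file_name = kwargs["dag_run"].conf.get("file_name")
        print(f"file_name from conf: {file_name}")

    print_conf_task = PythonOperator(
        task_id="print_conf",
        python_callable=print_conf,
    )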

What is the way to pass file_name, taken from the POST request, into the SQL statement?

    perform_analytics = BigQueryExecuteQueryOperator(
        task_id="perform_analytics",
        sql=f"""
            Select **passFileNameVariableFromJsonDataHere** as FileName
            FROM my_project.sample.table1
        """,
        destination_dataset_table=f"my_project.sample.table2",
        write_disposition="WRITE_APPEND",
        use_legacy_sql=False,
    )

Is there a way to avoid using XCom variables, as this is a single task-level operation?


Solution

  • I was able to read the value in the SQL query like this (a fuller DAG sketch follows below):

    sql=f"""
        Select "{{{{dag_run.conf.get('file_name')}}}}" as FileName
        FROM my_project.sample.table1
           """