I have been using Airflow successfully to read a SQL file from a bucket and pass it to a BigQueryInsertJobOperator. Here is a minimal example to show my currently working setup:
with DAG(
    "my_dag",
    blah_args,
) as dag:

    def get_query(file_name):
        with open("/home/airflow/gcs/data/my_sql_file_bucket/" + file_name) as f:
            f_query = f.read()
        return f_query

    op1 = BigQueryInsertJobOperator(
        blah_args=blah_args,
        configuration={
            "query": {
                "query": get_query("my_sql_file.sql"),
                blah_all_the_rest
            },
        },
    )
Hopefully you get the idea; this works perfectly! But now I need to dynamically change the bucket I read from based on a variable, and I cannot figure out how to get it working:
def get_query(file_name):
    with open("/home/airflow/gcs/data/{{ var.value.my_dynamic_bucket }}" + file_name) as f:
        f_query = f.read()
    return f_query
This doesn't render, and I understand why: it needs to be inside a task. So then I tried:
op1 = BigQueryInsertJobOperator(
    blah_args=blah_args,
    configuration={
        "query": {
            "query": open({{ var.value.my_dynamic_bucket }}).read(),
            blah_all_the_rest
        },
    },
)
but I suppose this fails for the same reason as before. I even tried setting the template_searchpath to the value of {{ var.value.my_dynamic_bucket }} in the default_args, but it won't render there either.
How can I use an Airflow variable to determine the name of the bucket I need to read my SQL files from? Thank you in advance.
If using a global-scope variable is OK with your DAG, then you can try using Airflow Variables: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html
Define a Variable, say bucket_name, with the relevant bucket name as its value.
from airflow.models import Variable

# inside your function, read the bucket name from the Variable
bucket_name = Variable.get("bucket_name")
You can also update the Variable value if needed, say from another DAG or just via the UI.
Variable.update("bucket_name", "new_bucket")
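For example, a rough sketch of a task in another DAG that switches the bucket (the task name and new value here are just placeholders; Variable.set achieves the same thing on older Airflow versions that lack Variable.update):

from airflow.models import Variable
from airflow.operators.python import PythonOperator

def switch_bucket():
    # point subsequent runs of the reading DAG at a different bucket folder
    Variable.set("bucket_name", "new_bucket")

update_bucket = PythonOperator(
    task_id="switch_bucket",
    python_callable=switch_bucket,
)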