
External files in Airflow DAG


I'm trying to access external files in an Airflow task to read some SQL, and I'm getting "file not found". Has anyone come across this?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dag',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query text (this relative path is what fails)
    with open('sql/queryfile.sql') as f:
        query = f.read()
    # run the query; execute() stands in for the actual database call
    execute(query)

task = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)

The log states the following:

IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'

I understand that I could simply copy and paste the query into the same file, but that's not a neat solution. There are multiple queries and the text is really big, so embedding it in the Python code would compromise readability.
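The error is most likely caused by the relative path: open() resolves 'sql/queryfile.sql' against the current working directory of the process running the task, which is typically not the folder that contains the DAG file. A minimal sketch, assuming the sql/ folder sits next to the DAG file, is to build the path from the DAG file's own location instead. read_sql is a hypothetical helper, not part of Airflow.

```python
import os


def read_sql(relative_path, base_dir=None):
    """Return the text of a SQL file, resolving relative_path against base_dir.

    base_dir defaults to the directory containing this module (e.g. the DAG
    file), so the lookup no longer depends on the worker's current working
    directory. The sql/ layout next to the DAG file is an assumption.
    """
    if base_dir is None:
        # Directory of this file, not os.getcwd()
        base_dir = os.path.dirname(os.path.abspath(__file__))
    full_path = os.path.join(base_dir, relative_path)
    with open(full_path) as f:
        return f.read()
```

Inside run_query, the read would then become query = read_sql('sql/queryfile.sql').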


Solution

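  • Here is an example that uses an Airflow Variable to make it easy: store the directory holding the .sql files in a Variable and pass it to the DAG as template_searchpath.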

    DAG code:

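    import airflow
    from datetime import datetime
    from airflow.models import Variable

    # Directory containing the .sql files, stored in an Airflow Variable named "sql_path"
    tmpl_search_path = Variable.get("sql_path")

    # Placeholder default_args so the snippet is self-contained; adjust to your DAG
    default_args = {'start_date': datetime(2017, 1, 1)}

    dag = airflow.DAG(
        'tutorial',
        schedule_interval="@daily",
        template_searchpath=tmpl_search_path,  # extra directories Jinja searches for template files
        default_args=default_args
    )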
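Note that template_searchpath only affects templated fields: Airflow renders them with Jinja, which looks up a bare file name (for example sql='queryfile.sql' on an operator such as PostgresOperator, whose sql field is templated) in the DAG's folder followed by the directories in template_searchpath. A callable that calls open() directly does not use this setting. The stdlib sketch below imitates that first-match lookup to show how a file name maps to a path; find_template is a hypothetical helper, not Airflow's API, and the real resolution is done by Jinja's FileSystemLoader.

```python
import os


def find_template(name, searchpath):
    """Return the full path of the first file called `name` found in `searchpath`.

    Simplified stand-in for the Jinja lookup Airflow performs over
    [dag_folder] + template_searchpath; not Airflow's own code.
    """
    if isinstance(searchpath, str):
        # template_searchpath may be a single directory or a list of them
        searchpath = [searchpath]
    for directory in searchpath:
        candidate = os.path.join(directory, name)
        if os.path.isfile(candidate):
            return candidate  # first match wins
    raise FileNotFoundError(f"{name!r} not found in {searchpath}")
```

With sql_path pointing at the directory that holds queryfile.sql, a templated field can then name the file directly instead of embedding the query text.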