airflow

Use an Airflow template variable in a Python string


I have a ClickHouse operator in Airflow, and I want to run queries against the database on a schedule. I need the variable ds to be replaced in the query filter each time the job runs. Here is my code:

from datetime import datetime, timedelta

from airflow import DAG

# ClickhouseOperator is my custom operator; its import is omitted here.

default_args = {
    'owner': 'nestor',
    'depends_on_past': False,
    'start_date': datetime(2024, 2, 26),
    'email': ['test@gmail.com'],
    'email_on_failure': False,
    'execution_timeout': timedelta(minutes=15),
}

with DAG(
    "agg_to_clickhouse",
    schedule_interval=None,
    default_args=default_args,
) as dag:
    insert_task = ClickhouseOperator(
        task_id="insert_task",
        ch_connection_id="clickhouse_db_connection",
        sql="""
            INSERT INTO tmp.target_tbl
            SELECT created_date, count() AS total_ride,
                   count(DISTINCT(passengerId)) AS total_passenger
            FROM tmp.source_tbl
            WHERE created_date >= '{{ds}}'
            GROUP BY created_date
            """,
        dag=dag,
    )

But ds is not replaced with its value. I also tried other syntaxes, but none of them worked, for example:

'{{{{ds}}}}'

{{{{ds}}}}

"{{ds}}"


Solution

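  • I added this line to my custom operator class, and it solved my problem. Airflow only renders Jinja templates in the operator attributes listed in template_fields, so sql has to be listed there: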

    template_fields: Sequence[str] = ("sql",)
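
To illustrate why that one line matters, here is a minimal sketch of the idea using only the standard library. It is not Airflow's code: FakeClickhouseOperator, render_template_fields, and the note attribute are hypothetical names made up for this example, and a small regex stands in for Jinja. It only shows that attributes listed in template_fields get rendered, while everything else is left untouched.

```python
import re
from typing import Any, Dict, Sequence


class FakeClickhouseOperator:
    """Hypothetical stand-in for a custom operator; not Airflow code."""

    # Only attributes named here are passed through the renderer.
    template_fields: Sequence[str] = ("sql",)

    def __init__(self, sql: str, note: str) -> None:
        self.sql = sql
        self.note = note  # not in template_fields, so never rendered


def render_template_fields(task: Any, context: Dict[str, str]) -> None:
    """Replace {{ name }} placeholders in each attribute listed in template_fields.

    Simplified assumption: a regex substitution stands in for Jinja rendering,
    and unknown names are left as they are.
    """
    pattern = re.compile(r"\{\{\s*(\w+)\s*\}\}")
    for field in getattr(task, "template_fields", ()):
        raw = getattr(task, field)
        rendered = pattern.sub(
            lambda m: str(context.get(m.group(1), m.group(0))), raw
        )
        setattr(task, field, rendered)


task = FakeClickhouseOperator(
    sql="SELECT count() FROM tmp.source_tbl WHERE created_date >= '{{ds}}'",
    note="run for {{ds}}",
)
render_template_fields(task, {"ds": "2024-02-26"})

print(task.sql)   # the placeholder in sql has been substituted
print(task.note)  # note still contains the literal {{ds}}
```

In the real operator the same rule applies: without sql in template_fields, the string reaches ClickHouse with the literal {{ds}} in it, which matches the behaviour described in the question.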