I have a custom ClickHouse operator in Airflow, and I want to run queries against the database on a schedule. Each time the job runs, the `ds` template variable (the run's logical date, formatted as `YYYY-MM-DD`) should be substituted into the query's filter. Here is my code:
```python
from datetime import datetime, timedelta

from airflow import DAG

# ClickhouseOperator is my own custom operator; its import is omitted here.

default_args = {
    'owner': 'nestor',
    'depends_on_past': False,
    'start_date': datetime(2024, 2, 26),
    'email': ['test@gmail.com'],
    'email_on_failure': False,
    'execution_timeout': timedelta(minutes=15),
}

with DAG(
    "agg_to_clickhouse",
    schedule_interval=None,
    default_args=default_args,
) as dag:
    insert_task = ClickhouseOperator(
        task_id="insert_task",
        ch_connection_id="clickhouse_db_connection",
        sql="""
            INSERT INTO tmp.target_tbl
            SELECT created_date, count() AS total_ride,
                   count(DISTINCT(passengerId)) AS total_passenger
            FROM tmp.source_tbl
            WHERE created_date >= '{{ds}}'
            GROUP BY created_date
        """,
        dag=dag,
    )
```
But `{{ds}}` is never replaced with its value. I also tried other syntaxes, and none of them worked:

- `'{{{{ds}}}}'`
- `{{{{ds}}}}`
- `"{{ds}}"`
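The quadruple-brace variants only matter when the SQL is built with an f-string or `str.format()`, where `{{` and `}}` are escapes for literal braces. The query in the DAG is a plain triple-quoted string, so `'{{ds}}'` already reaches Airflow as ordinary Jinja syntax, which suggests the problem lies elsewhere. A quick check in plain Python (the variable names are just for illustration):

```python
# Doubled braces are an escape only inside f-strings and str.format().
plain = "WHERE created_date >= '{{ds}}'"          # plain literal: braces kept exactly as typed
fstring = f"WHERE created_date >= '{{{{ds}}}}'"   # f-string: each {{ becomes {, each }} becomes }

print(plain)    # WHERE created_date >= '{{ds}}'
print(fstring)  # WHERE created_date >= '{{ds}}'
```

Both produce the same text, so no brace juggling is needed in a plain string.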
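Adding this line to the custom operator class solved the problem:

```python
template_fields: Sequence[str] = ("sql",)
```

Airflow renders Jinja templates only in the operator attributes listed in `template_fields`. Until `sql` was listed there, the query string was passed to ClickHouse unchanged. (`Sequence` can be imported from `typing` or `collections.abc`.)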
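To illustrate why that one line matters, here is a toy model of the mechanism using only the standard library. It is not Airflow's implementation: Airflow renders fields with Jinja2 and supports much more than simple `{{ name }}` lookups. `ToyOperator` and `render_template_fields` are hypothetical names. The point is that only attributes named in `template_fields` have their placeholders substituted; everything else is left as-is.

```python
import re
from datetime import date


class ToyOperator:
    # Hypothetical stand-in for an operator: only attributes listed here get rendered.
    template_fields = ("sql",)

    def __init__(self, sql, comment):
        self.sql = sql
        self.comment = comment  # not in template_fields, so never rendered


def render_template_fields(op, context):
    """Replace {{ name }} placeholders in each attribute named in op.template_fields.

    Simplified model only: real Airflow uses a Jinja2 environment instead of a regex.
    """
    pattern = re.compile(r"\{\{\s*(\w+)\s*\}\}")
    for field in op.template_fields:
        raw = getattr(op, field)
        rendered = pattern.sub(lambda m: str(context[m.group(1)]), raw)
        setattr(op, field, rendered)


op = ToyOperator(
    sql="SELECT ... WHERE created_date >= '{{ds}}'",
    comment="run for {{ds}}",
)
# ds is the logical date as YYYY-MM-DD; here a fixed date stands in for a real run.
render_template_fields(op, {"ds": date(2024, 2, 26).isoformat()})

print(op.sql)      # SELECT ... WHERE created_date >= '2024-02-26'
print(op.comment)  # run for {{ds}}
```

Removing `"sql"` from `template_fields` in this model leaves the query with the literal `{{ds}}`, which matches the behavior described above.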