My use case is simple: I have some processes that interact with some tables in the database. I want to automate the table creation, so I added a create_table_if_not_exists_task. I only want to run that task in the first DAG run, not in the following ones, since it takes DAG time and resources that I could be using somewhere else.
My question is: do I have a clean way to do that in Airflow?
One idea I had is to store that information in an Airflow Variable and check it during DAG parsing. I don't like it, since it opens a connection to the metadata database on every heartbeat.
You could use a ShortCircuitOperator (or even a BranchPythonOperator, depending on your pipeline) in front of the task you wish to control, access the dag_run object in the Python callable (either directly or through the context), and then check for any previous DagRuns with the DagRun.get_previous_dagrun() method. When the callable returns False, the ShortCircuitOperator skips the tasks downstream of it, so the table creation only runs when there is no previous DagRun. Something like this (untested, though):
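```python
from airflow.operators.python import ShortCircuitOperator  # Airflow 2.x import path

def is_first_dagrun(dag_run):
    # True only when no earlier DagRun exists for this DAG.
    return dag_run.get_previous_dagrun() is None

...

check_if_first_dagrun = ShortCircuitOperator(
    task_id="check_if_first_dagrun",
    python_callable=is_first_dagrun,
)
check_if_first_dagrun >> create_table_if_not_exists_task
```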
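One caveat on the "depending on your pipeline" part: by default the ShortCircuitOperator skips everything downstream of it, so if your processing tasks sit after the table creation, they would be skipped on every later run too. In that case a BranchPythonOperator is probably the better fit. Below is a minimal sketch of such a branch callable, assuming a hypothetical downstream task called process_tables and a hypothetical callable name choose_tasks (neither comes from the question). The FakeDagRun class is only a stand-in for Airflow's DagRun so the decision logic can be checked without an Airflow installation.

```python
# Hypothetical task IDs: "create_table_if_not_exists_task" comes from the
# question, "process_tables" stands in for whatever runs after it.
CREATE_TABLE_TASK_ID = "create_table_if_not_exists_task"
PROCESS_TASK_ID = "process_tables"


def choose_tasks(dag_run):
    """Hypothetical python_callable for a BranchPythonOperator.

    Returns the task IDs to follow: both tasks on the first DagRun,
    only the processing task on every later run.
    """
    if dag_run.get_previous_dagrun() is None:
        return [CREATE_TABLE_TASK_ID, PROCESS_TASK_ID]
    return [PROCESS_TASK_ID]


class FakeDagRun:
    """Stand-in for airflow.models.DagRun (assumption: only
    get_previous_dagrun() is needed by the callable above)."""

    def __init__(self, previous=None):
        self._previous = previous

    def get_previous_dagrun(self):
        return self._previous


first_run = FakeDagRun()
later_run = FakeDagRun(previous=first_run)
print(choose_tasks(first_run))  # ['create_table_if_not_exists_task', 'process_tables']
print(choose_tasks(later_run))  # ['process_tables']
```

In the DAG this would be wired roughly as branch = BranchPythonOperator(task_id="choose_tasks", python_callable=choose_tasks), with branch >> create_table_if_not_exists_task >> process_tables and branch >> process_tables. Giving process_tables trigger_rule="none_failed_min_one_success" (available since Airflow 2.2) should let it run on later DagRuns even though the table-creation task is skipped, while still failing upstream if the table creation fails on the first run.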