I am running Airflow on Kubernetes, using git-sync to sync DAGs from a git repository. DAGs are imported successfully, but I'm seeing an issue where old changes persist in the Airflow UI alongside new ones.
Example:
In my remote repository I have the following file:
dags/dag_test.py
The contents of this are as follows:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 0
}
dag = DAG(dag_id='my_dag3', default_args=default_args, catchup=False, schedule_interval='@once')
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> end
The DAG my_dag3 is picked up by Airflow and visible in the UI. The DAG can run successfully, so far so good.
Now, I push a change to my master branch that renames the dag_id to dag_id='my_dag4', with no other changes to the file or the filename. This should effectively define a new DAG my_dag4 and remove my_dag3.
When this change reaches the UI, my_dag3 remains visible and can still be run for a short period of time, while the replacement my_dag4 appears alongside it. Both DAGs can be run.
Furthermore, when I look at the code of my_dag3 in the UI, it now shows the code after the change, i.e. the displayed code no longer matches the definition of the DAG.
Both DAGs remain visible for a short while, until eventually my_dag3 disappears. Even though this only lasts a short period of time, it could lead to some nasty bugs from running outdated code.
Why does this happen, and how can I ensure the Airflow UI only displays DAGs based on the current snapshot of the repository?
My deployment:
Airflow Chart: 8.6.1 https://github.com/airflow-helm/charts.
Airflow Image: 2.4.3-python3.8.
Running locally in minikube
Now, I push a change to my master branch that renames the dag_id to dag_id='my_dag4', with no other changes to the file or the filename. This should effectively define a new DAG my_dag4 and remove my_dag3.
Not true. Renaming an existing dag_id registers the new DAG, but it does not delete the previous DAG's records; those records are still present in the metadata database. If you want to remove them, you need to click the delete button on the previous DAG to purge all its associated records (DAG runs, task instances, etc.), or delete it programmatically as sketched below. Keep in mind that one might want to preserve the previous DAG's records for history/audit or other reasons.
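For example, here is a minimal sketch of the same purge done programmatically through the Airflow 2.x stable REST API. It assumes the API is enabled with basic auth and the webserver is reachable at localhost:8080 (e.g. via kubectl port-forward); the URL and credentials are placeholders for your setup:
import requests

# DELETE /dags/{dag_id} removes all metadata for the DAG (DAG runs, task
# instances, ...). The DAG only stays gone if its code is no longer parsed.
AIRFLOW_API = "http://localhost:8080/api/v1"  # placeholder local endpoint
resp = requests.delete(f"{AIRFLOW_API}/dags/my_dag3", auth=("admin", "admin"))
resp.raise_for_status()  # expects 204 No Content on success
The same can be done from inside a webserver or scheduler pod with the CLI: airflow dags delete my_dag3.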
Note that Airflow works with serialized DAGs. This means your code is parsed and synced into the metadata database, and all Airflow services (including the webserver) read from that database table. So from the webserver's perspective there are two records in the dag table; it does not know what changes you made to the Python file.
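To illustrate, a minimal sketch (run somewhere with access to the metadata database, e.g. inside the scheduler or webserver pod) that lists the rows the webserver actually renders:
from airflow.models import DagModel
from airflow.utils.session import create_session

# Both the old and the new dag_id show up as rows in the `dag` table until the
# stale one is deactivated or deleted; both point at the same synced file.
with create_session() as session:
    dags = session.query(DagModel).filter(DagModel.dag_id.in_(["my_dag3", "my_dag4"]))
    for dag in dags:
        print(dag.dag_id, "is_active:", dag.is_active, "fileloc:", dag.fileloc)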
In Airflow 2.5.0 the parsing_cleanup_interval configuration option was added to automatically deactivate stale DAGs. From its description:
How often (in seconds) to check for stale DAGs (DAGs which are no longer present in the expected files) which should be deactivated, as well as datasets that are no longer referenced and should be marked as orphaned.
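If you upgrade to 2.5.0 or later, a quick sketch to check the effective value; the comments note where it would be set (the 60-second value is only an example):
# The option can be set in airflow.cfg under [scheduler], or via the
# environment variable AIRFLOW__SCHEDULER__PARSING_CLEANUP_INTERVAL (e.g. "60").
from airflow.configuration import conf

print(conf.getint("scheduler", "parsing_cleanup_interval"))
Lowering this interval shortens the window during which the renamed-away DAG keeps appearing in the UI, but the old DAG's historical records still remain until you delete them as described above.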