Tags: python, python-3.x, airflow, directed-acyclic-graphs, name-clash

Airflow packaged DAGs (zipped) clash when subfolders have the same name


We're setting up an Airflow framework in which multiple data science teams can orchestrate their data processing pipelines. To help them implement their DAGs, we've developed a Python code base of functions and classes (including Operator subclasses), spread over various packages and modules.

Every team will have its own DAG, packaged in a ZIP file together with the packages that hold these functions and classes. For example, the first ZIP file would contain:

ZIP1:

main_dag_teamA.py

subfolder1: package1-with-generic-functions + __init__.py

subfolder2: package2-with-generic-operators + __init__.py

And another ZIP file would contain

ZIP2:

main_dag_teamB.py

subfolder1: package1-with-generic-functions + __init__.py

subfolder2: package2-with-generic-operators + __init__.py

Please note that subfolder1 and subfolder2 will usually be identical in both ZIP files: the same files with the same functions and classes. Over time, however, as new versions of the packages become available, the package contents will start to differ between the DAG packages.
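For concreteness, a ZIP with this layout could be built along the following lines. This is only a minimal sketch using the standard zipfile module: the function name build_dag_zip and all paths are hypothetical, and it assumes the DAG file has to sit at the root of the archive with the package folders next to it.

```python
import os
import zipfile


def build_dag_zip(zip_path, dag_file, package_dirs):
    """Pack one DAG file at the ZIP root plus each package folder beside it.

    Hypothetical helper for illustration; not part of Airflow.
    """
    with zipfile.ZipFile(zip_path, "w") as zf:
        # The DAG module goes at the root of the archive.
        zf.write(dag_file, arcname=os.path.basename(dag_file))
        for package_dir in package_dirs:
            parent = os.path.dirname(os.path.abspath(package_dir))
            for root, _dirs, files in os.walk(package_dir):
                for name in files:
                    if name.endswith(".py"):
                        full = os.path.abspath(os.path.join(root, name))
                        # Keep the path relative to the package's parent,
                        # e.g. subfolder1/__init__.py
                        zf.write(full, arcname=os.path.relpath(full, parent))
    return zip_path
```

Each team would call it with its own DAG file and its own copies of the package folders, for example build_dag_zip("teamA.zip", "main_dag_teamA.py", ["subfolder1", "subfolder2"]).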

With this setup we run into the following problem: Airflow does not seem to handle same-name packages well once the contents of the packages/subfolders start to differ between the ZIPs. When I run "airflow list_dags", it shows errors like:

File "/data/share/airflow/dags/program1/program1.zip/program1.py", line 1, in <module>
    from subfolder1.functions1 import function1
ImportError: No module named 'subfolder1.functions1'

The problem can be reproduced with the following code: two small DAGs, each in its own ZIP file together with a package named my_functions. The package has the same name in both ZIPs but different contents.

DAG package ZIP 1:

program1.py:

from my_functions.functions1 import function1

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
    print('program1')

dag = DAG(
    'program1',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program1_task1', python_callable=do_it, dag=dag)

my_functions/functions1.py:

def function1():
    print('function1')

DAG package ZIP 2:

program2.py:

from my_functions.functions2 import function2

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
    print('program2')

dag = DAG(
    'program2',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program2_task2', python_callable=do_it, dag=dag)

my_functions/functions2.py:

def function2():
    print('function2')

With these two ZIP files in the DAGs folder, running "airflow list_dags" shows an error:

File "/data/share/airflow/dags/program1/program1.zip/program1.py", line 1, in <module>
    from subfolder1.functions1 import function1
ImportError: No module named 'subfolder1.functions1'

When the contents of the subfolders in the ZIPs are the same, no error occurs.
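The same kind of failure can be triggered without Airflow, which suggests that the clash comes from Python's module cache rather than from the ZIP files themselves. The sketch below is only an illustration: it assumes (as I understand Airflow to do for packaged DAGs) that each ZIP is put at the front of sys.path before its DAG file is imported, and the helper make_zip and the temporary paths are made up for this example.

```python
import importlib
import os
import sys
import tempfile
import zipfile


def make_zip(path, module_name, func_name):
    """Write a ZIP holding my_functions/__init__.py and one module (illustrative helper)."""
    with zipfile.ZipFile(path, "w") as zf:
        zf.writestr("my_functions/__init__.py", "")
        zf.writestr(
            f"my_functions/{module_name}.py",
            f"def {func_name}():\n    return '{func_name}'\n",
        )


tmp = tempfile.mkdtemp()
zip1 = os.path.join(tmp, "program1.zip")
zip2 = os.path.join(tmp, "program2.zip")
make_zip(zip1, "functions1", "function1")
make_zip(zip2, "functions2", "function2")

# First "DAG": its ZIP is first on sys.path, so the import succeeds and
# the package my_functions is cached in sys.modules.
sys.path.insert(0, zip1)
from my_functions.functions1 import function1

# Second "DAG": my_functions is already cached and its __path__ still
# points into the first ZIP, so functions2 cannot be found there.
sys.path.insert(0, zip2)
importlib.invalidate_caches()
try:
    from my_functions.functions2 import function2
    clash = False
except ImportError:
    clash = True
```

In this sketch the second import fails with an ImportError even though program2.zip is first on sys.path, because the lookup of my_functions.functions2 goes through the already imported my_functions package.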

My question: how can I prevent this clash between subfolders in different ZIPs? I would really like the DAGs to be fully independent in code, each with its own version of the packages.


Solution

  • I solved this by adding the code below at the top of each DAG file (program1.py and program2.py), before the package imports

    from my_functions.functions1 import function1
    

    and

    from my_functions.functions2 import function2
    

    Code:

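    import sys

    # Drop any cached copy of the my_functions package (and its submodules)
    # so that the import below is resolved against this DAG's own ZIP.
    cleanup_mods = [
        mod for mod in list(sys.modules)
        if mod == "my_functions" or mod.startswith("my_functions.")
    ]
    for mod in cleanup_mods:
        del sys.modules[mod]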
    

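    Python caches imported packages in sys.modules by name, so without this step the second ZIP reuses the my_functions package that was loaded from the first one. Removing the cached entries each time a DAG file is parsed makes the import pick up the version inside that DAG's own ZIP.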
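    If several packages are shared this way, the cleanup can be wrapped in a small helper. This is a sketch, not an Airflow API: the name purge_package is hypothetical, and the fake modules below exist only to demonstrate which entries are removed. The helper has to be defined in the DAG file itself (or somewhere outside the clashing packages) and called before the package imports.

```python
import sys
import types


def purge_package(package_name):
    """Remove a package and all of its submodules from sys.modules.

    Hypothetical helper; returns the names that were removed.
    """
    prefix = package_name + "."
    stale = [
        name for name in list(sys.modules)
        if name == package_name or name.startswith(prefix)
    ]
    for name in stale:
        del sys.modules[name]
    return stale


# Demonstration with placeholder modules standing in for earlier imports.
for fake in ("my_functions", "my_functions.functions1", "my_functions_extra"):
    sys.modules[fake] = types.ModuleType(fake)

removed = purge_package("my_functions")
```

    Matching on the exact name or on the name followed by a dot keeps unrelated modules such as my_functions_extra in place. Note that code which already holds a reference to the old module objects keeps using them; only later import statements are affected.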