Tags: python, airflow, python-polars, polars

Airflow DAG gets stuck when filtering a Polars DataFrame


I am dynamically generating Airflow DAGs based on data from a Polars DataFrame. The DAG definition includes filtering this DataFrame at DAG creation time and again inside a task when the DAG runs.

However, when I trigger the DAG, the task that filters the Polars DataFrame inside the dynamically generated DAG hangs indefinitely right after printing "before filter". No error is raised; the task just keeps running until Airflow eventually raises an exception about memory usage.

I am using Airflow 2.7.3 and Polars 0.20.31, in case that is relevant.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import polars as pl

def dag_constructor(name):
    """Build a daily DAG with a single ``print_hello`` task.

    This is the reproduction from the question: when the DAG runs, the task
    hangs at the Polars filter, right after printing "before filter".

    Args:
        name: Converted to ``str`` and used as the DAG id.

    Returns:
        The constructed ``DAG``.
    """
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2023, 1, 1),
        'retries': 1,
    }

    dag = DAG(
        dag_id=str(name),
        default_args=default_args,
        description='A simple DAG to print Hello World',
        schedule_interval='@daily',
        catchup=False,
    )

    def print_hello():
        """Build a small DataFrame, filter it, and print each step."""
        print("starting")

        df = pl.DataFrame({
            "key": ["A", "B", "A"],
            "branch": ["br1", "ooo", "br2"],
            "chain": ["ch1", "Y", "ch2"],
        })

        print(df)
        print("before filter")
        # This is the call that never returns inside the Airflow task.
        chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()
        print("after filter")
        print(chains)
        print("finish dag")

    # Passing dag=dag attaches the task to the DAG. The DAG has only this one
    # task, so no dependency wiring (and no local name for it) is needed.
    PythonOperator(
        task_id='print_hello',
        python_callable=print_hello,
        dag=dag,
    )

    return dag

# This runs at DAG-parse time, presumably in the process that Airflow later
# forks to execute tasks. Per the question, swapping this Polars filter for the
# hard-coded list below makes the hang disappear. That points at Polars state
# set up here being inherited by the forked task process.
df = pl.DataFrame({
    "key": ["A", "B", "A"],
    "branch": ["br1", "ooo", "br2"],
    "chain": ["ch1", "Y", "ch2"],
})
chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()
# chains = ["ch1", "ch2"]  # using this instead of the line above avoids the hang

# Create one DAG per chain. Each DAG is exposed as a module-level global
# because Airflow collects DAG objects from the module's globals.
for ch in chains:
    dag_my_id = f"aa__{ch}"
    globals()[dag_my_id] = dag_constructor(dag_my_id)


Solution

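  • Even just importing Polars in the main process is enough to break it, given how Airflow forks the child process that runs the task — and this happens even if you tell Polars to be single-threaded. (The likely mechanism: Polars runs its work on a global thread pool, and fork() copies only the calling thread into the child. Pool state or locks created in the parent can therefore leave the forked child waiting forever.)

    What works in this case is to run the task's Polars code in a separate, freshly started Python process instead of in the forked one.

    Here is the code that worked for me: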

    import sys
    import subprocess
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime
    import polars as pl
    
    # Child mode: when this file is executed as `python <this file> child` (the
    # subprocess call in print_hello does exactly that), do the Polars work in
    # this freshly started interpreter. Then stop before any of the
    # DAG-building code further down runs.
    if sys.argv[1:2] == ["child"]:
        print("starting")
        data = {
            "key": ["A", "B", "A"],
            "branch": ["br1", "ooo", "br2"],
            "chain": ["ch1", "Y", "ch2"],
        }
        df = pl.DataFrame(data)
        print(df)
        print("before filter")
        only_a = df.filter(pl.col("key") == "A")
        chains = only_a.select("chain").to_series().to_list()
        print("after filter", chains, "finish dag", sep="\n")
        raise SystemExit(0)
    
    # Regular DAG-generation code (global scope)
    def dag_constructor(name):
        """Build a daily DAG whose single task does its Polars work out of process.

        Args:
            name: Used as the DAG id.

        Returns:
            The constructed ``DAG``.
        """
        default_args = {
            'owner': 'airflow',
            'start_date': datetime(2023, 1, 1),
            'retries': 1,
        }

        dag = DAG(
            dag_id=name,
            default_args=default_args,
            description='A simple DAG to print Hello World',
            schedule_interval='@daily',
            catchup=False,
        )

        def print_hello():
            """Re-run this file as ``python <this file> child`` and relay its output.

            Raises:
                subprocess.CalledProcessError: if the child process exits non-zero.
            """
            # A freshly started interpreter (not a fork of this process) does not
            # inherit any Polars state created while this file was being parsed.
            result = subprocess.run(
                [sys.executable, __file__, "child"],
                capture_output=True,
                text=True,
            )
            # Left uncaptured, the child writes straight to the inherited file
            # descriptors. That bypasses Python-level stdout/stderr redirection,
            # such as the one Airflow typically uses to route prints into the
            # task log. Re-print the output through this process's streams instead.
            print(result.stdout, end="")
            if result.stderr:
                print(result.stderr, end="", file=sys.stderr)
            # Same exception type that check_call raised on a non-zero exit.
            result.check_returncode()
            print("Child process executed.")

        # dag=dag attaches the task; no local name is needed for a one-task DAG.
        PythonOperator(
            task_id='print_hello',
            python_callable=print_hello,
            dag=dag,
        )
        return dag
    
    # DAG-parse-time data. This still runs Polars in the process that parses the
    # DAG file. The task avoids inheriting that state by doing its own Polars
    # work in a freshly started interpreter (see print_hello).
    df = pl.DataFrame({
        "key": ["A", "B", "A"],
        "branch": ["br1", "ooo", "br2"],
        "chain": ["ch1", "Y", "ch2"],
    })
    chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()

    # Create one DAG per chain. Each is exposed as a module-level global because
    # Airflow collects DAG objects from the module's globals.
    for ch in chains:
        dag_my_id = f"aa__{ch}"
        globals()[dag_my_id] = dag_constructor(dag_my_id)