I am dynamically generating Airflow DAGs based on data from a Polars DataFrame. The DAG definition includes filtering this DataFrame at DAG creation time and again inside a task when the DAG runs.
However, when the DAG runs and the task attempts to filter the Polars DataFrame inside the dynamically generated DAG, the task hangs indefinitely right after printing "before filter", without raising an error. It just runs forever until Airflow eventually kills it with a memory-usage exception.
For what it's worth, I am using Airflow 2.7.3 and Polars 0.20.31.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import polars as pl
def dag_constructor(name):
    """Build and return a DAG called *name* containing one PythonOperator.

    The operator's callable builds a small Polars DataFrame and filters it
    at task run time (this is the code path that hangs under Airflow).
    """
    args = {
        'owner': 'airflow',
        'start_date': datetime(2023, 1, 1),
        'retries': 1,
    }

    dag = DAG(
        dag_id=f'{name}',
        default_args=args,
        description='A simple DAG to print Hello World',
        schedule_interval='@daily',
        catchup=False,
    )

    def _say_hello():
        # Runs inside the Airflow task: create and filter a Polars frame.
        print("starting")
        frame = pl.DataFrame({
            "key": ["A", "B", "A"],
            "branch": ["br1", "ooo", "br2"],
            "chain": ["ch1", "Y", "ch2"],
        })
        print(frame)
        print("before filter")
        matches = frame.filter(pl.col("key") == "A").select("chain").to_series().to_list()
        print("after filter")
        print(matches)
        print("finish dag")

    PythonOperator(
        task_id='print_hello',
        python_callable=_say_hello,
        dag=dag,
    )
    return dag
# DAG-parse-time data: one generated DAG per matching "chain" value.
df = pl.DataFrame({
    "key": ["A", "B", "A"],
    "branch": ["br1", "ooo", "br2"],
    "chain": ["ch1", "Y", "ch2"],
})
chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()
## chains = ["ch1", "ch2"] THIS WOULD WORK, AND WONT GET STUCK, if uncommenting and commenting previous line
for chain_name in chains:
    dag_my_id = f"aa__{chain_name}"
    # Register the DAG at module scope so Airflow's DAG discovery finds it.
    globals()[dag_my_id] = dag_constructor("aa__" + chain_name)
The hang happens because, after even just importing Polars in the main (DAG-parsing) process, Polars does not work with how Airflow forks the child task process — its internal thread pool does not survive the fork, even if you tell Polars to be single-threaded.
What works in this case is to make the child task run in a separate process.
Here's a code that worked for me:
import sys
import subprocess
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import polars as pl
# When this file is invoked as `python <file> child`, we are in a brand-new
# interpreter (not an Airflow fork), so Polars is safe to use here. Run the
# task logic and exit before any DAG definitions below are evaluated.
if len(sys.argv) > 1 and sys.argv[1] == "child":
    print("starting")
    frame = pl.DataFrame({
        "key": ["A", "B", "A"],
        "branch": ["br1", "ooo", "br2"],
        "chain": ["ch1", "Y", "ch2"],
    })
    print(frame)
    print("before filter")
    chains = frame.filter(pl.col("key") == "A").select("chain").to_series().to_list()
    print("after filter")
    print(chains)
    print("finish dag")
    sys.exit(0)  # Done: never fall through to the DAG-generation code.
# Regular DAG-generation code (global scope)
def dag_constructor(name):
    """Build and return a DAG called *name* with one PythonOperator.

    Rather than running the Polars logic inside the (forked) Airflow task,
    the task re-executes this file with the "child" argument in a fresh
    interpreter, which avoids the fork-related hang.
    """
    dag = DAG(
        dag_id=name,
        default_args={
            'owner': 'airflow',
            'start_date': datetime(2023, 1, 1),
            'retries': 1,
        },
        description='A simple DAG to print Hello World',
        schedule_interval='@daily',
        catchup=False,
    )

    def _run_in_fresh_process():
        # Spawn a brand-new Python process (spawn, not fork) so Polars
        # never runs inside Airflow's forked worker.
        subprocess.check_call([sys.executable, __file__, "child"])
        print("Child process executed.")

    PythonOperator(
        task_id='print_hello',
        python_callable=_run_in_fresh_process,
        dag=dag,
    )
    return dag
# Global (DAG-parse-time) data manipulation using Polars.
# Note: this runs in the scheduler's parsing process with default Polars
# settings; only the per-task Polars work needs the subprocess workaround.
df = pl.DataFrame({
    "key": ["A", "B", "A"],
    "branch": ["br1", "ooo", "br2"],
    "chain": ["ch1", "Y", "ch2"],
})
chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()

for chain_name in chains:
    dag_my_id = f"aa__{chain_name}"
    # Register each generated DAG at module scope for Airflow discovery.
    globals()[dag_my_id] = dag_constructor("aa__" + chain_name)