Tags: google-cloud-platform, airflow, directed-acyclic-graphs

Failed to create dag from function


I tried to put this function in a util module:

import pendulum
from airflow import models

dag_start_date = pendulum.datetime(2022, 1, 1, tzinfo="Europe/Rome")

dag_default_args = {
    'start_date': dag_start_date,
    'provide_context': True,
}

def get_dag(
        dag_id,
        schedule_interval,
        default_args=None,
        catchup=False,
        max_active_runs=1,
        user_defined_filters=None,
        **kwargs
):
    if default_args is None:
        default_args = dag_default_args

    if user_defined_filters is None:
        user_defined_filters = {'localtz': localize_utc_tz}

    dag = models.DAG(
        dag_id,
        default_args=default_args,
        catchup=catchup,
        max_active_runs=max_active_runs,
        user_defined_filters=user_defined_filters,
        schedule_interval=schedule_interval,
        **kwargs
    )

    return dag

And I tried to use it like this:

with util.get_dag("my_dag", None) as my_dag:
    ...

but the DAG did not appear in Airflow. When I created the DAG the usual way, it worked.

Why does my function not work?

I use Airflow 1 on Google Cloud Platform.


Solution

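  • When dynamically generating DAGs, the DAG object itself needs to be available in globals(). Airflow discovers DAGs by importing each DAG file and collecting the DAG instances bound to names in that module's top-level namespace, so a DAG that is not reachable from there is never registered.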

    You could try something like this when calling the get_dag function:

    my_dag = util.get_dag("my_dag", None)
    globals()[my_dag.dag_id] = my_dag
    
    with my_dag:
        ...
    

    P.S. As a maintainer of the Apache Airflow project, I would highly recommend upgrading to Airflow 2. Airflow 1 has been EOL since June 17, 2021. You are missing out on many new features, UX and quality-of-life improvements, security patches, and more.
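
    To illustrate the globals() point above, here is a minimal sketch that runs without Airflow. It only imitates the discovery step: FakeDAG, collect_dags and load_module are hypothetical names made up for this example, not Airflow APIs, and the real loader does considerably more. The sketch assumes discovery amounts to scanning a module's top-level namespace for DAG instances.

```python
import types


class FakeDAG:
    """Hypothetical stand-in for airflow.models.DAG (not the real class)."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def collect_dags(module):
    """Return the dag_ids of FakeDAG objects bound at the module's top level."""
    return sorted(
        {obj.dag_id for obj in vars(module).values() if isinstance(obj, FakeDAG)}
    )


def load_module(name, source):
    """Execute source as a fresh module, roughly like importing a DAG file."""
    module = types.ModuleType(name)
    module.FakeDAG = FakeDAG  # make the stand-in class visible to the source
    exec(source, module.__dict__)
    return module


# A DAG built inside a function and never bound to a module-level name.
hidden_source = """
def build():
    dag = FakeDAG("hidden_dag")
    return None  # the DAG object is dropped here

build()
"""

# DAGs generated in a loop and registered explicitly in globals().
registered_source = """
def build(dag_id):
    return FakeDAG(dag_id)

for dag_id in ("my_dag_a", "my_dag_b"):
    globals()[dag_id] = build(dag_id)
"""

hidden = collect_dags(load_module("hidden_file", hidden_source))
registered = collect_dags(load_module("registered_file", registered_source))

print(hidden)      # []
print(registered)  # ['my_dag_a', 'my_dag_b']
```

    The loop case is where globals() is most useful: each generated DAG gets its own module-level name, so none of them is overwritten by the next iteration.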