airflowairflow-scheduler

Dynamic dags not getting added by scheduler


I am trying to create Dynamic DAGs and then get them to the scheduler. I tried the reference from https://www.astronomer.io/guides/dynamically-generating-dags/ which works well. I changed it a bit as in the below code. Need help in debugging the issue.

I tried 1. Test run the file. The Dag gets executed and the globals() is printing all the DAGs objects. But somehow not listing in the list_dags or in the UI

from datetime import datetime, timedelta
import requests
import json
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator

def create_dag(dag_id,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval="@hourly",
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py,
            dag_number=dag_number)

    return dag


def fetch_new_dags(**kwargs):

    for n in range(1, 10):
        print("=====================START=========\n")
        dag_id = "abcd_" + str(n) 
        print (dag_id)
        print("\n")
        globals()[dag_id] = create_dag(dag_id, n, default_args)
        print(globals())

default_args = {
    'owner': 'diablo_admin',
    'depends_on_past': False,
    'start_date': datetime(2019, 8, 8),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'trigger_rule': 'none_skipped'
    #'schedule_interval': '0 * * * *'
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('testDynDags', default_args=default_args, schedule_interval='*/1 * * * *')
#schedule_interval='*/1 * * * *'

check_for_dags = PythonOperator(dag=dag,
                   task_id='tst_dyn_dag',
                   provide_context=True,
                   python_callable=fetch_new_dags
                   )




check_for_dags

Expected to create 10 DAGs dynamically and added to the scheduler.


Solution

  • I guess doing the following would fix it


    Explanation

    So clearly, wrapping dag-generation code inside an Airflow task itself makes no sense.


    UPDATE-1

    From what is indicated in comments, I infer that the requirement dictates that you revise your external source that feeds inputs (how many dags or tasks to create) to your DAG / task-generation script. While this is indeed a complex use-case, but a simple way to achieve this is to create 2 separate DAGs.

    You can take inspiration from the Adding DAGs based on Variable value section