airflow

Dynamically create n tasks in Airflow from params


I want to dynamically create n tasks, where n is defined by params so that I can set it when triggering the DAG from the UI:

import pendulum
from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["forest_hpo"],
    params={"n": 3}
)
def dynamic_test():

    @task()
    def model_training_task(i):
        print(i)

    for i in range(n): # <-- n should come from the params
        model_training_task(i)

dynamic_test()

Access "params" in body of dag is a similar question. However, the provided solution only shows how to use the params inside a task, not at the DAG level.


Solution

  • Using Dynamic Task Mapping:

    import pendulum
    from airflow.decorators import dag, task
    
    @dag(
        schedule=None,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["forest_hpo"],
        params={"n": 3}
    )
    def dynamic_test():
    
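        @task()
        def get_list_from_params(params: dict):
            # Airflow injects the run's `params` from the task context at run time.
            return list(range(params["n"]))
    
        @task()
        def model_training_task(i):
            print(i)
    
        # Create one mapped task instance per element of the returned list.
        model_training_task.expand(i=get_list_from_params())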
    
    dynamic_test()
    

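    As far as I know, a task that returns the list to iterate over is needed: params are only resolved at run time, so n is not available while the DAG file is being parsed.

    In the resulting run, model_training_task is scheduled 3 times, once per list element. You can inspect the log of each instance under Mapped Tasks.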
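
If n might be supplied as a string when the run is triggered (an assumption, depending on how the trigger config is entered), the list-building step could be made a little more defensive. Below is a minimal sketch in plain Python; `indices_from_params` is a hypothetical helper name, not part of Airflow, and its body is what you would put inside `get_list_from_params`:

```python
def indices_from_params(params: dict) -> list[int]:
    """Turn the DAG-level ``n`` param into the list that ``.expand()`` maps over."""
    # Assumption: n may arrive as a string such as "3", so coerce it to int.
    n = int(params["n"])
    if n < 0:
        raise ValueError(f"n must be non-negative, got {n}")
    # One list element per mapped task instance to create.
    return list(range(n))


if __name__ == "__main__":
    print(indices_from_params({"n": 3}))    # [0, 1, 2]
    print(indices_from_params({"n": "2"}))  # [0, 1]
```

Failing early on a negative value surfaces the problem in the upstream task instead of silently producing an empty mapping.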