I want to dynamically create n tasks, where n is defined by params, so that I can set it when triggering the DAG in the UI:
import pendulum
from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["forest_hpo"],
    params={"n": 3},
)
def dynamic_test():
    @task()
    def model_training_task(i):
        print(i)

    for i in range(n):  # <-- n should come from the params
        model_training_task(i)

dynamic_test()
The question "Access params in body of dag" is similar. However, the solution provided there only shows how to use params inside a task, not at the DAG level.
Using Dynamic Task Mapping:
import pendulum
from airflow.decorators import dag, task

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["forest_hpo"],
    params={"n": 3},
)
def dynamic_test():
    @task()
    def get_list_from_params(params: dict):
        # params is filled in from the run's context at execution time,
        # so values entered when triggering the DAG are visible here
        return list(range(params["n"]))

    @task()
    def model_training_task(i):
        print(i)

    # creates one mapped instance of model_training_task per list element
    model_training_task.expand(i=get_list_from_params())

dynamic_test()
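If you want the list-building step to fail loudly on bad input (for example a negative number or text typed into the trigger form), one option is to factor it into a plain function that can be unit-tested without Airflow. This is a minimal sketch under my own assumptions: `indices_from_params` is a hypothetical helper name, and accepting a numeric string for `n` is a choice, not something Airflow requires. An `n` of 0 yields an empty list; as far as I know, Airflow then creates no mapped instances and marks the mapped task as skipped.

```python
from __future__ import annotations

from typing import Any, Mapping


def indices_from_params(params: Mapping[str, Any], key: str = "n") -> list[int]:
    """Hypothetical helper: turn params[key] into the list that expand() maps over.

    Accepts an int or an integer string such as "5"; rejects bools, floats,
    other types and negative values with a ValueError.
    """
    raw = params[key]
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(raw, bool) or not isinstance(raw, (int, str)):
        raise ValueError(f"param {key!r} must be an integer, got {raw!r}")
    try:
        n = int(raw)  # int("5") -> 5, int("x") raises ValueError
    except ValueError as exc:
        raise ValueError(f"param {key!r} must be an integer, got {raw!r}") from exc
    if n < 0:
        raise ValueError(f"param {key!r} must be >= 0, got {n}")
    return list(range(n))
```

Inside the DAG, `get_list_from_params` would then simply `return indices_from_params(params)`.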
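As far as I know, you need a task that returns the list to iterate over: the DAG function body runs when the file is parsed, before any run's params exist, whereas `get_list_from_params` runs at execution time and can read them. Result: `model_training_task` was scheduled 3 times, once per element of the returned list, and you can inspect the log of each instance under Mapped Tasks.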
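To make the fan-out concrete, here is a pure-Python model of what `expand()` does with the returned list. It is a conceptual sketch only: `simulate_expand` is a hypothetical helper, not an Airflow API, and the `map_index` ordering for several mapped arguments is illustrative. It mirrors two documented behaviours: one mapped instance per list element, and a cross product when several arguments are expanded at once (useful for a hyperparameter grid; constant arguments would go through `.partial(...)` in Airflow).

```python
from __future__ import annotations

import itertools
from typing import Any, Callable


def simulate_expand(fn: Callable[..., Any], **mapped: list[Any]) -> list[dict[str, Any]]:
    """Hypothetical model of a mapped task fanning out (NOT Airflow's implementation).

    One instance per element for a single mapped argument; the cross product
    of the lists for several. Each instance gets a map_index 0..N-1.
    """
    names = list(mapped)
    instances = []
    combos = itertools.product(*(mapped[name] for name in names))
    for map_index, values in enumerate(combos):
        kwargs = dict(zip(names, values))
        instances.append({"map_index": map_index, "kwargs": kwargs, "result": fn(**kwargs)})
    return instances


def train(i):
    # stand-in for model_training_task
    return f"trained {i}"


runs = simulate_expand(train, i=[0, 1, 2])  # like expand(i=[0, 1, 2]) with n = 3
```

With `i=[0, 1, 2]` this gives three instances, matching the three mapped `model_training_task` runs; an empty list gives none.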