I am trying to implement a solution similar to the one in this question: dynamic dag creation based on dependencies from table
However, I am running into a couple of issues that I cannot seem to solve. Here is my code:
@dag(start_date=datetime(2024, 2, 3), schedule_interval='@daily', catchup=True, default_args=default_args,
     max_active_runs=1)
def test_job():
    work_flow_task_group = DatabricksWorkflowTaskGroup(
        ...
    )
    with work_flow_task_group:
        for task_id, details in task_list.items():
            templates.create_misc_notebook(task_id,
                                           details['notebookpath'])
        for task_id, details in task_list.items():
            if task_up := details.get("input_table"):
                work_flow_task_group.dag.get_task("task2")


test_job()
This part works just fine and generates a DAG with 5 tasks, whose task_ids are task1, task2, task3, task4 and task5:
for task_id, details in task_list.items():
    templates.create_misc_notebook(task_id,
                                   details['notebookpath'])
However, when I introduce this bit of code:
for task_id, details in task_list.items():
    if task_up := details.get("input_table"):
        work_flow_task_group.dag.get_task("task2")
and then try to look up task2, I get the following error:
Broken DAG: [/usr/local/airflow/dags/date.py] Traceback (most recent call last):
  File "/usr/local/airflow/dags/date.py", line 45, in test_job
    work_flow_task_group.dag.get_task("task2")
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 2591, in get_task
    raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task task2 not found
I am not sure why this is happening. I set the task_ids explicitly for my tests, so I don't understand why Airflow cannot find the task.
I'm not entirely sure about the Databricks part of the code, but in Airflow, when tasks are part of a TaskGroup, their task ID is <group_id>.<task_id>, so a lookup by the bare ID ("task2") does not match.
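To make the naming rule concrete, here is a small Airflow-free model of it. This is a sketch, not Airflow's own code: full_task_id is a hypothetical helper, and GROUP_ID = "workflow" is a made-up value, since the question elides the arguments passed to DatabricksWorkflowTaskGroup.

```python
# Airflow-free model of the TaskGroup naming rule (a sketch, not Airflow's code):
# with prefix_group_id=True (the default) a child task is registered as
# "<group_id>.<task_id>"; with prefix_group_id=False it keeps its bare task_id.
def full_task_id(group_id: str, task_id: str, prefix_group_id: bool = True) -> str:
    if prefix_group_id and group_id:
        return f"{group_id}.{task_id}"
    return task_id


# Hypothetical group_id; substitute the one given to your DatabricksWorkflowTaskGroup.
GROUP_ID = "workflow"

# The IDs the DAG would actually contain for task1..task5 created inside the group.
registered = [full_task_id(GROUP_ID, f"task{n}") for n in range(1, 6)]

print("task2" in registered)                          # False: the bare ID is not registered
print(full_task_id(GROUP_ID, "task2") in registered)  # True: the prefixed ID is
```

In the real DAG, the equivalent lookup would be something like work_flow_task_group.dag.get_task(f"{work_flow_task_group.group_id}.task2"). In Airflow 2.x, TaskGroup.child_id("task2") should build the same prefixed string, though it is worth checking against your installed version.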
You can also set the TaskGroup parameter prefix_group_id=False to disable this prefixing; see the linked documentation for details.
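Applied to the dependency loop from the question, the fix is to use the prefixed IDs when wiring upstream tasks. The sketch below assumes (hypothetically, since the question doesn't say) that "input_table" holds the task_id of the upstream task and that the group_id is "workflow". The task_list contents and dependency_edges are made up for illustration.

```python
# Sketch: turn "input_table" entries into (upstream, downstream) task_id pairs
# that carry the TaskGroup prefix, so dag.get_task() lookups would match.
# ASSUMPTIONS (hypothetical, not from the question): "input_table" names the
# upstream task's task_id, and the group's group_id is "workflow".
GROUP_ID = "workflow"

task_list = {
    "task1": {"notebookpath": "/hypothetical/task1"},
    "task2": {"notebookpath": "/hypothetical/task2", "input_table": "task1"},
    "task3": {"notebookpath": "/hypothetical/task3", "input_table": "task2"},
    "task4": {"notebookpath": "/hypothetical/task4", "input_table": "task2"},
    "task5": {"notebookpath": "/hypothetical/task5"},
}


def dependency_edges(tasks: dict, group_id: str, prefix_group_id: bool = True) -> list:
    """Return (upstream_id, downstream_id) pairs using the IDs Airflow registers."""
    prefix = f"{group_id}." if prefix_group_id and group_id else ""
    edges = []
    for task_id, details in tasks.items():
        if upstream := details.get("input_table"):
            # Fail early on a dangling reference instead of a TaskNotFound at parse time.
            if upstream not in tasks:
                raise KeyError(f"{task_id!r} depends on unknown task {upstream!r}")
            edges.append((prefix + upstream, prefix + task_id))
    return edges


for up, down in dependency_edges(task_list, GROUP_ID):
    print(f"{up} >> {down}")
```

Inside test_job, each pair could then be applied with dag.get_task(up) >> dag.get_task(down), where dag = work_flow_task_group.dag. If create_misc_notebook returns the operator it creates (the question doesn't show this), keeping those return values in a dict keyed by the bare task_id would avoid ID lookups altogether.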