I'm using the Dynamic Task Mapping feature together with Task Groups to generate n tasks from an input list. The DAG is generated fine, but at execution time I'm seeing odd behaviour: mapped tasks from the second task are triggered while mapped tasks from the first task are still running. This non-deterministic ordering causes the DAG to fail, and it shows up in MWAA's scheduler log (see below).
I tried both depends_on_past=True and wait_for_downstream=True, with no luck. Thanks for any help.
DAG:
from airflow.decorators import dag, task, task_group

@dag(dag_id='chore_task_group_stage3', catchup=False)
def pipeline():
    # t0 = DummyOperator(task_id='start')
    # t3 = DummyOperator(task_id='end')

    @task_group(group_id="channel_demo_tg")
    def tg1(my_num):
        @task()
        def print_num(num):
            return num

        @task()
        def add_42(num):
            return num + 42

        print_num(my_num) >> add_42(my_num)

    # creating 6 mapped task group instances of the task group group1
    tg1_object = tg1.expand(my_num=[19, 23, 42, 8, 7, 108])

    # setting dependencies
    tg1_object

pipeline()
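If the commented-out start and end operators are brought back, setting dependencies around the mapped task group would typically look like the sketch below (my assumption that start >> group >> end is the intent; EmptyOperator is the current name for DummyOperator, and the DAG id is changed here to avoid a clash with the DAG above):

from airflow.decorators import dag, task, task_group
from airflow.operators.empty import EmptyOperator

@dag(dag_id='chore_task_group_stage3_with_deps', catchup=False)
def pipeline_with_deps():
    t0 = EmptyOperator(task_id='start')
    t3 = EmptyOperator(task_id='end')

    @task_group(group_id="channel_demo_tg")
    def tg1(my_num):
        @task()
        def print_num(num):
            return num

        @task()
        def add_42(num):
            return num + 42

        print_num(my_num) >> add_42(my_num)

    tg1_object = tg1.expand(my_num=[19, 23, 42, 8, 7, 108])

    # start runs before every mapped instance of the group;
    # end only runs once all mapped instances have finished
    t0 >> tg1_object >> t3

pipeline_with_deps()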
Scheduler log:
[2024-08-05T14:11:19.118+0000] {task_context_logger.py:91} ERROR - Executor reports task instance <TaskInstance: chore_task_group_stage3.group1.add_42 manual__2024-08-05T14:10:41+00:00 map_index=4 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?
Adding default_args={"donot_pickle": "True"} to the @dag parameters fixed the issue. However, when the tasks are not inside a task group, this workaround does not help.
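For reference, this is roughly where that argument sits, using the DAG from the question (a minimal sketch; only the default_args entry is new, the tasks themselves are unchanged):

from airflow.decorators import dag

@dag(
    dag_id='chore_task_group_stage3',
    catchup=False,
    default_args={"donot_pickle": "True"},  # the workaround described above
)
def pipeline():
    # task group and mapped tasks defined exactly as in the question
    ...

pipeline()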