python airflow mwaa

Airflow tasks in a TaskGroup run out of order


I'm using the Dynamic Task Mapping feature together with Task Groups to generate one task group instance per element of my input list. The DAG is generated, but at execution time I'm seeing odd behaviour.

Mapped instances of the second task are triggered while mapped instances of the first task are still running. This is also reflected in MWAA's logs (see the scheduler log below).

This non-deterministic execution order causes the DAG to fail.

I tried setting both depends_on_past=True and wait_for_downstream=True, with no luck. Many thanks for any help.

DAG:

from airflow.decorators import dag, task, task_group

@dag(dag_id='chore_task_group_stage3', catchup=False)
def pipeline():

    # t0 = DummyOperator(task_id='start')
    # t3 = DummyOperator(task_id='end')

    @task_group(group_id="channel_demo_tg")
    def tg1(my_num):
        
        @task()
        def print_num(num):
            return num

        @task()
        def add_42(num):
            return num + 42

        print_num(my_num) >> add_42(my_num)

    # creating 6 mapped task group instances of the task group group1
    tg1_object = tg1.expand(my_num=[19, 23, 42, 8, 7, 108])

    # setting dependencies
    tg1_object

pipeline()

Scheduler log:

[2024-08-05T14:11:19.118+0000] {{task_context_logger.py:91}} ERROR - Executor reports task instance <TaskInstance: chore_task_group_stage3.group1.add_42 manual__2024-08-05T14:10:41+00:00 map_index=4 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?

Solution

  • Adding default_args={"donot_pickle": "True"} to the @dag parameters fixed the issue.

    However, this does not work when the tasks are not inside a task group.