Tags: python, docker, airflow, airflow-taskflow, airflow-xcom

[Airflow]: Dynamic Task Mapping on DockerOperator using XComs


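I am creating a DAG that should do the following: first fetch a list of event IDs with a PythonOperator, then run one DockerOperator task per event ID, each executing `fetch-event --event-id <id>`.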

The code below is my attempt to do what I want:

import re
from datetime import datetime

from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.sensors.base import PokeReturnValue


# League names; not referenced anywhere else in the visible snippet.
leagues = ["league1", "league2", "league3"]




@dag(
    start_date=datetime(2024, 11, 1),
    schedule="@daily",
)
    # NOTE(review): the `def` line of the decorated DAG function appears to be
    # missing here, so the snippet is not valid Python as shown.
    # NOTE(review): this task_id is "fetch_detail", the same as the
    # DockerOperator's task_id below, while the template further down pulls
    # from 'task_fetch_ids' -- confirm the intended task_id.
    task_fetch_ids = PythonOperator(
        task_id="fetch_detail",
        ...)


    # NOTE(review): .expand() is called on an already-constructed operator here,
    # not on DockerOperator.partial(...) -- confirm against the mapping API.
    task_fetch_detail = DockerOperator(
        task_id="fetch_detail",
        image="image:v1",
        ).expand(
            # The right-hand side of `in` is an ordinary string literal, so this
            # comprehension iterates over its characters, not over event ids.
            command=[f"fetch-event --event-id  {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
        )


    task_fetch_ids >> task_fetch_detail


The above clearly doesn't work, because the comprehension loops over the characters of the template string rather than over the pulled XCom list. What is the correct syntax?


Solution

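  • You must turn the XCom return value into the list of arguments that the dynamically mapped operator expects, for example with an intermediate task that builds the command strings, and then pass that task's output to `.expand()`.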

    
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
    from airflow import DAG
    from airflow.providers.docker.operators.docker import DockerOperator
    
    # DAG without a schedule (schedule_interval=None); start date is days_ago(1).
    # NOTE(review): the question's snippet uses `schedule=` and a fixed
    # `datetime` start date instead -- confirm which Airflow version is targeted.
    dag = DAG(
        dag_id="docker_dag",
        start_date=days_ago(1),
        schedule_interval=None,
    )
    with dag:
        def fn_get_work():
            """Return the hard-coded list of work items: "a", "b" and "c"."""
            work_items = ["a", "b", "c"]
            return work_items
    
    
        # Run fn_get_work as the "get_work" task.
        get_work_task = PythonOperator(
            task_id="get_work",
            python_callable=fn_get_work,
        )
    
    
        def fn_build(work):
            """Return one ``fetch-event`` command string per item in *work*.

            Each item is formatted into ``fetch-event --event-id <item>``;
            the order of *work* is preserved.
            """
            return [f"fetch-event --event-id {item}" for item in work]
    
    
        # Run fn_build as the "build_work" task, feeding it the output of
        # get_work_task as its `work` argument.
        build_work_task = PythonOperator(
            task_id="build_work",
            python_callable=fn_build,
            op_kwargs={"work": get_work_task.output},
        )
    
        # Arguments shared by every mapped task go in partial(); expand() maps
        # `command` over the output of build_work_task.
        run_work_task = DockerOperator.partial(
            image="alpine:3.16.2",
            task_id="run_work",
        ).expand(command=build_work_task.output)

        # Explicit ordering: get_work, then build_work, then run_work.
        get_work_task >> build_work_task >> run_work_task