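I am creating a DAG that should do the following: one task fetches a list of event IDs, and a second, dynamically mapped Docker task then runs `fetch-event --event-id <id>` once for each of those IDs.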
The code below is my attempt to do what I want:
import re
from datetime import datetime
from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.sensors.base import PokeReturnValue
# NOTE(review): this is the failing attempt described in the question; each
# problem is annotated inline (the answer's `.partial(...).expand(...)` snippet
# shows a working pattern).
# `leagues` is not referenced anywhere in this snippet.
leagues = ["league1", "league2", "league3"]
# A decorator must be followed by a `def` (or `class`): as pasted, `@dag(...)`
# decorates an assignment, which is a SyntaxError. The tasks would normally go
# inside a function decorated with @dag.
@dag(
start_date=datetime(2024, 11, 1),
schedule="@daily",
)
# `...` presumably stands for elided arguments; taken literally, a bare `...`
# after a keyword argument is a SyntaxError (positional argument follows
# keyword argument).
# This task_id is the same as the one given to task_fetch_detail below, so the
# two tasks would clash.
task_fetch_ids = PythonOperator(
task_id="fetch_detail",
...)
# Dynamic task mapping needs `DockerOperator.partial(...)` to hold the fixed
# arguments; `.expand()` is called on the object returned by `partial()`, not
# on an already-constructed operator.
task_fetch_detail = DockerOperator(
task_id="fetch_detail",
image="image:v1",
).expand(
# This comprehension is evaluated when the DAG file is parsed, before any Jinja
# rendering happens, so it iterates over the characters of the literal
# template string rather than over the pulled IDs.
# Also, `task_ids` must name a task_id ("fetch_detail" above), not the Python
# variable `task_fetch_ids`.
command=[f"fetch-event --event-id {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
)
task_fetch_ids >> task_fetch_detail
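This clearly doesn't work: the list comprehension loops over the characters of the template string, because the Jinja template is not rendered when the DAG file is parsed. What is the correct way to map over the IDs returned by the upstream task?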
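You must transform the XCom return value into the argument list expected by the mapped operator. Have an upstream task build the list of commands, then pass that task's `.output` to `DockerOperator.partial(...).expand(command=...)`, as in the example below: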
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime

dag = DAG(
    dag_id="docker_dag",
    # `schedule` supersedes `schedule_interval`, which is deprecated since
    # Airflow 2.4.
    schedule=None,
    # Use a fixed start_date: Airflow advises against dynamic values such as
    # days_ago(), which is itself deprecated.
    start_date=datetime(2024, 11, 1),
)

with dag:

    def fn_get_work():
        """Return the IDs to process; the return value is pushed to XCom."""
        return ["a", "b", "c"]

    get_work_task = PythonOperator(
        task_id="get_work",
        python_callable=fn_get_work,
    )

    def fn_build(work):
        """Turn each ID in *work* into one command string for the mapped task.

        Returns a list with one ``fetch-event --event-id <id>`` string per ID,
        in the same order as *work*.
        """
        return [f"fetch-event --event-id {i}" for i in work]

    # `get_work_task.output` is an XComArg: at run time it is resolved to the
    # upstream task's return value (a real list), unlike a template string,
    # which would only be rendered for templated operator fields.
    build_work_task = PythonOperator(
        task_id="build_work",
        python_callable=fn_build,
        op_kwargs={"work": get_work_task.output},
    )

    # partial() holds the arguments shared by every mapped instance; expand()
    # creates one task instance per element of the list built above.
    run_work_task = DockerOperator.partial(
        task_id="run_work",
        image="alpine:3.16.2",
    ).expand(command=build_work_task.output)

    # The XComArg references above already create these dependencies;
    # declaring them explicitly is redundant but keeps the ordering visible.
    get_work_task >> build_work_task >> run_work_task