Im Trying to pass a Xcom value as arguments inside a kubernetes pod operator But values are not populating.
Checked Xcom's List in Airflow UI I am able to see the key,value.
This is my syntax:
get_report_task = kubernetes_pod_operator.KubernetesPodOperator(
task_id="get_report_task",
name="%s" % (DAG_ID.replace("_", "-")),
namespace="dsm-kl",
image=KUBER_IMAGE,
image_pull_policy="Always",
is_delete_operator_pod=True,
arguments=["{ti.xcom_pull(dag_id='{DAG_ID}', task_ids='push_task', key='kube_values')}"],
labels={
"vendor": "reversal",
"job": "get_reports",
"dag": DAG_ID,
"task": "get_report_task",
"task_id": "get_report_task",
},
retries=3,
depends_on_past=False,
dag=dag,
resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
affinity=DOC_AFFINITY,
)
Any Syntax Would be Helpful Thanks in advance.
To use jinja template, you need to put the code between {{<code>}}
, and you cannot use jinja in jija as you do to access dag id {DAG_ID}
. When you want to read xcom from a task in the same dag, you don't need to provide the dag id:
get_report_task = kubernetes_pod_operator.KubernetesPodOperator(
task_id="get_report_task",
name="%s" % (DAG_ID.replace("_", "-")),
namespace="dsm-kl",
image=KUBER_IMAGE,
image_pull_policy="Always",
is_delete_operator_pod=True,
arguments=["{{ti.xcom_pull(task_ids='push_task', key='kube_values')}}"],
labels={
"vendor": "reversal",
"job": "get_reports",
"dag": DAG_ID,
"task": "get_report_task",
"task_id": "get_report_task",
},
retries=3,
depends_on_past=False,
dag=dag,
resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
affinity=DOC_AFFINITY,
)