
How to get XCom values inside a KubernetesPodOperator


I'm trying to pass an XCom value as an argument to a KubernetesPodOperator, but the value is not being populated.

I checked the XCom list in the Airflow UI, and I can see the key and value there.

This is my syntax:

get_report_task = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="get_report_task",
    name="%s" % (DAG_ID.replace("_", "-")),
    namespace="dsm-kl",
    image=KUBER_IMAGE,
    image_pull_policy="Always",
    is_delete_operator_pod=True,
    arguments=["{ti.xcom_pull(dag_id='{DAG_ID}', task_ids='push_task', key='kube_values')}"],
    labels={
        "vendor": "reversal",
        "job": "get_reports",
        "dag": DAG_ID,
        "task": "get_report_task",
        "task_id": "get_report_task",
    },
    retries=3,
    depends_on_past=False,
    dag=dag,
    resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
    affinity=DOC_AFFINITY,
)

Any working syntax would be helpful. Thanks in advance.


Solution

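  • To use a Jinja template, you need to wrap the expression in double curly braces, {{ <expression> }}; with a single pair of braces the text is passed to the pod as a literal string. You also cannot nest a placeholder inside the template the way the question does with {DAG_ID}: the string is not an f-string, so {DAG_ID} is never substituted. When you want to read an XCom from a task in the same DAG, you don't need to provide the dag_id at all. Since arguments is a templated field of KubernetesPodOperator, Airflow renders the expression before the pod starts: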

    get_report_task = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="get_report_task",
        name="%s" % (DAG_ID.replace("_", "-")),
        namespace="dsm-kl",
        image=KUBER_IMAGE,
        image_pull_policy="Always",
        is_delete_operator_pod=True,
        arguments=["{{ti.xcom_pull(task_ids='push_task', key='kube_values')}}"],
        labels={
            "vendor": "reversal",
            "job": "get_reports",
            "dag": DAG_ID,
            "task": "get_report_task",
            "task_id": "get_report_task",
        },
        retries=3,
        depends_on_past=False,
        dag=dag,
        resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
        affinity=DOC_AFFINITY,
    )
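
If you do need to pull from a different DAG, one way is to build the template string in Python so that the DAG id is filled in at parse time and the Jinja braces survive for Airflow to render later. The snippet below is a minimal sketch of just the string handling, not of the operator itself; the DAG_ID value is a hypothetical placeholder, and the variable names same_dag_arg and cross_dag_arg are made up for illustration.

```python
# Hypothetical DAG id, used only for illustration.
DAG_ID = "reversal_reports"

# Plain string: Python leaves the braces untouched, so Airflow's Jinja
# engine sees "{{ ... }}" and renders it when the task runs.
same_dag_arg = "{{ ti.xcom_pull(task_ids='push_task', key='kube_values') }}"

# f-string: each "{{{{" / "}}}}" collapses to a literal "{{" / "}}",
# while {DAG_ID} is substituted by Python when the DAG file is parsed.
cross_dag_arg = (
    f"{{{{ ti.xcom_pull(dag_id='{DAG_ID}', "
    f"task_ids='push_task', key='kube_values') }}}}"
)

print(same_dag_arg)
print(cross_dag_arg)
```

Either string could then be passed as an element of arguments=[...]. The f-string form is only worth the extra braces when the XCom really lives in another DAG.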