Tags: airflow, kubernetespodoperator, airflow-xcom

Airflow KubernetesPodOperator XCom return value is "None" when accessed in a PythonOperator


I am running a Kubernetes pod using Airflow's KubernetesPodOperator. The pod executes a jar file and writes its output to /airflow/xcom/return.json. When I check the task's XCom value in the Airflow UI, the return_value is shown as the content of the return.json file. But when I try to pull the return value inside the PythonOperator callable function, it returns None. Any idea why this is happening and how to retrieve the return value inside the Python callable function? Thanks. Code and screenshots below.

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
from kubernetes.client import models as k8s

def set_input_f(**context):
    """Push a fixed value to XCom under the key ``input`` and echo it back.

    Args:
        **context: Task context passed in by the caller; its ``ti`` entry is
            used for ``xcom_push`` and ``xcom_pull``.
    """
    print("!!!-Pushing input-!!!")
    context['ti'].xcom_push(key='input', value='Mr.CocaCola')
    # Read the value straight back so the push is visible in the task log.
    print(context['ti'].xcom_pull(key="input"))

def get_output_f(**context):
    """Pull the XCom of ``run_java_pod_xcom`` twice and print both results.

    The same pull is issued once through ``context['task_instance']`` and
    once through ``context['ti']``.

    Args:
        **context: Task context passed in by the caller; must contain the
            ``task_instance`` and ``ti`` entries.
    """
    print("!!!-Getting Output-!!!")
    print(context)
    # NOTE(review): 'run_java_pod_xcom' is the pod's ``name``; the
    # KubernetesPodOperator in this DAG is declared with
    # task_id="run_java_code".
    pod_logs = context['task_instance'].xcom_pull(task_ids='run_java_pod_xcom')
    #output = pod_logs.split('\n')[-2]
    print('Output:', pod_logs)
    ti = context['ti']
    pod_logs1 = ti.xcom_pull(task_ids='run_java_pod_xcom')
    print('Output1:', pod_logs1)

# DAG wiring: set_input -> KubernetesPodOperator (k) -> get_output.
with DAG(dag_id="kubernetes_java_xcom",
         start_date=datetime(2021, 1, 1),
         schedule_interval="@monthly",
         catchup=False) as dag:
    # Pushes the 'input' XCom that the pod reads through INPUT_XCOM.
    set_input = PythonOperator(
        task_id='set_input',
        python_callable=set_input_f
    )
    # Pulls and prints the XCom produced by the pod task.
    get_output = PythonOperator(
        task_id='get_output',
        python_callable=get_output_f,
        provide_context=True
    )
    # Runs the jar inside a pod. Note that the pod ``name`` and the Airflow
    # ``task_id`` are two different identifiers here.
    k = KubernetesPodOperator(
        kubernetes_conn_id="<kubernetes connection>",
        namespace="default",
        image="<private image location>",
        image_pull_secrets=[k8s.V1LocalObjectReference("xxxxx")],
        name="run_java_pod_xcom",
        cmds=["bash", "-cx"],
        # Templated: resolves to the value pushed by the set_input task.
        env_vars={"INPUT_XCOM": "{{ ti.xcom_pull(task_ids='set_input', key='input') }}"},
        arguments=["java -jar /deployments/app.jar"],
        labels={"foo": "bar"},
        task_id="run_java_code",
        do_xcom_push=True
    )

    set_input >> k >> get_output

Set input task log

KubernetesPodOperator return_value in ui:

get_output task's log:


Solution

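  • The `task_ids` argument passed to `xcom_pull` in the PythonOperator callable must be "run_java_code" (the `task_id` of the KubernetesPodOperator `k`), not "run_java_pod_xcom" (which is only the pod's `name`). XComs are stored under the task's `task_id`, so pulling with the pod name returns None.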

    def get_output_f(**context):
        """Pull and use the XCom pushed by the KubernetesPodOperator task.

        Args:
            **context: Task context passed in by the caller; must contain
                the ``task_instance`` entry.
        """
        print("!!!-Getting Output-!!!")
        print(context)
        # XComs are looked up by the operator's task_id ("run_java_code"),
        # not by the pod's name.
        pod_logs = context['task_instance'].xcom_pull(task_ids='run_java_code')