I have a task defined with `KubernetesPodOperator`, and I am passing the value of the templated variable `run_id` to it as an environment variable.
What baffles me is that it works when I use the Jinja template syntax, but when I use `Variable.get()` instead, Airflow raises a `KeyError`.
Here is the code:
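```python
# this code works!
from airflow import DAG
# Import path for older cncf.kubernetes provider releases; newer ones use
# airflow.providers.cncf.kubernetes.operators.pod
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="test",
    default_args={'owner': 'xxx'},
    start_date=days_ago(0),
    schedule_interval=None,
) as dag:
    task_dry_run = KubernetesPodOperator(
        name="test-dry-run",
        image="debian",
        cmds=["bash", "-cx"],
        arguments=["echo", "10"],
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        do_xcom_push=True,
        in_cluster=False,
        get_logs=True,
        # env_vars is a templated field: "{{ run_id }}" is rendered when the task runs
        env_vars={'run_id': "{{ run_id }}"}
    )
```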
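```python
# this code does NOT work!
from airflow import DAG
from airflow.models import Variable
# Import path for older cncf.kubernetes provider releases; newer ones use
# airflow.providers.cncf.kubernetes.operators.pod
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="test",
    default_args={'owner': 'xxx'},
    start_date=days_ago(0),
    schedule_interval=None,
) as dag:
    task_dry_run = KubernetesPodOperator(
        name="test-dry-run",
        image="debian",
        cmds=["bash", "-cx"],
        arguments=["echo", "10"],
        labels={"foo": "bar"},
        task_id="dry_run_demo",
        do_xcom_push=True,
        in_cluster=False,
        get_logs=True,
        # Variable.get() runs while the DAG file is parsed, not when the task runs
        env_vars={'run_id': Variable.get("run_id")}
    )
```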
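Presumably no Airflow Variable named `run_id` exists, which is why you are getting the `KeyError`. Note that `{{ run_id }}` in your working example is not an Airflow Variable at all: it is a built-in template variable holding the ID of the current DAG run. `Variable.get("run_id")` instead looks up a user-defined Variable with that key, so the two are not interchangeable.

Because the `Variable.get()` call is top-level code, it is executed every time the scheduler parses the DAG file, i.e. at parse time (by default every 30 seconds), not when the task runs. Jinja expressions, by contrast, are not evaluated until runtime. Check out this documentation for more info.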
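To make the parse-time vs. runtime distinction concrete, here is a stdlib-only simulation. It is not Airflow code: `VARIABLES`, `variable_get`, `parse_eager`, `parse_templated` and `render` are hypothetical stand-ins for the Variable store, `Variable.get()`, DAG-file parsing and Jinja rendering, with Python's `str.format` playing the role of the template engine.

```python
# Hypothetical stand-in for Airflow's Variable store.
# As in the question, no "run_id" Variable has been created.
VARIABLES: dict[str, str] = {}


def variable_get(key: str) -> str:
    """Hypothetical stand-in for Variable.get(key) without a default."""
    if key not in VARIABLES:
        raise KeyError(f"Variable {key} does not exist")
    return VARIABLES[key]


def parse_eager() -> dict[str, str]:
    # Top-level code: the lookup runs as soon as the file is parsed.
    return {"run_id": variable_get("run_id")}


def parse_templated() -> dict[str, str]:
    # Only a template string is stored; nothing is looked up yet.
    return {"run_id": "{run_id}"}


def render(env_vars: dict[str, str], context: dict[str, str]) -> dict[str, str]:
    # "Run time": the run's context now exists, so placeholders can be filled.
    return {k: v.format(**context) for k, v in env_vars.items()}


try:
    parse_eager()
except KeyError as exc:
    print("parse failed:", exc)

env = parse_templated()
# The run ID below is just an example value for the simulated DAG run.
print(render(env, {"run_id": "manual__2024-01-01T00:00:00+00:00"}))
```

The eager version fails before any task exists, while the templated version only needs the value once a run context is available, which mirrors why `"{{ run_id }}"` works and `Variable.get("run_id")` does not.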
You can also access Airflow Variables in Jinja templates, using `{{ var.value.run_id }}`.
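A template that references a Variable is still just a string until it is rendered, so the lookup is deferred as well. The sketch below mimics this with `str.format`, which supports attribute access inside placeholders. `VarValue` is a hypothetical stand-in for the `var.value` object in the template context, not Airflow's implementation; in this sketch a missing key raises `KeyError` only at render time.

```python
from types import SimpleNamespace


class VarValue:
    """Hypothetical stand-in for `var.value` in the template context."""

    def __init__(self, store: dict[str, str]) -> None:
        self._store = store

    def __getattr__(self, key: str) -> str:
        # Only called for names not found normally, i.e. Variable keys.
        # The store is read here, at render time, not when the template is defined.
        try:
            return self._store[key]
        except KeyError:
            raise KeyError(f"Variable {key} does not exist") from None


template = "{var.value.run_id}"  # defining the template needs no Variable

store: dict[str, str] = {}
context = {"var": SimpleNamespace(value=VarValue(store))}

try:
    template.format(**context)  # rendering now fails: the Variable is missing
except KeyError as exc:
    print("render failed:", exc)

store["run_id"] = "1234"  # once the Variable exists...
print(template.format(**context))  # ...rendering succeeds
```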
Finally, you can also set a default value in the `Variable.get()` call, e.g. `Variable.get("run_id", default_var="1234")`.
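A default avoids the `KeyError`, but the lookup still happens at parse time, so `env_vars` receives whatever the Variable (or the default) held when the file was parsed; it is never the DAG run's built-in `run_id`. The sketch below uses a sentinel object so that `None` remains a valid default. `variable_get` is a hypothetical stand-in whose `default_var` parameter only mirrors the name `Variable.get()` uses.

```python
_NO_DEFAULT = object()  # sentinel: distinguishes "no default" from default_var=None

VARIABLES: dict[str, str] = {}  # hypothetical Variable store, still no "run_id"


def variable_get(key: str, default_var: object = _NO_DEFAULT) -> object:
    """Hypothetical stand-in for Variable.get(key, default_var=...)."""
    if key in VARIABLES:
        return VARIABLES[key]
    if default_var is _NO_DEFAULT:
        raise KeyError(f"Variable {key} does not exist")
    return default_var


# "Parse time": evaluated once, when this code runs, with no DAG run involved.
env_vars = {"run_id": variable_get("run_id", default_var="1234")}
print(env_vars)
```

Setting `VARIABLES["run_id"]` afterwards does not change `env_vars`, because the value was already computed; only the next parse would pick it up.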