I have an Airflow DAG that uses a KubernetesPodOperator to run a containerized task. I would like to be able to parameterize the resources (memory and CPU) allocated to the container, so that I can change them depending on the specific DAG run.
I tried using Airflow's params to define the container_resources argument in the KubernetesPodOperator, but I couldn't find a way to reference the parameters in the dictionary. Here's a simplified version of my code:
from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 3, 7),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

with DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    # Defaults that can be overridden per DAG run
    params={
        "request_memory": Param("2000", type="string"),
        "request_cpu": Param("4000", type="string"),
        "limit_memory": Param("4G", type="string"),
        "limit_cpu": Param("5000", type="string"),
    },
) as dag:
    task = KubernetesPodOperator(
        task_id='my_task',
        name='my_task',
        image='my_image:latest',
        namespace='my_namespace',
        image_pull_policy='Always',
        is_delete_operator_pod=True,
        container_resources={
            'request_memory': '{{ params.request_memory }}',  # tried dag.params["request_memory"],
            'limit_memory': '{{ params.limit_memory }}',
            'request_cpu': '{{ params.request_cpu }}',
            'limit_cpu': '{{ params.limit_cpu }}',
        },
    )
However, the Jinja templates are not rendered: the placeholders are not replaced with the param values.
Is there a way to reference the DAG's params inside the container_resources dictionary in the KubernetesPodOperator? If not, what would be the best way to parameterize the resources allocated to the container?
I was using apache-airflow-providers-cncf-kubernetes==4.2.2, in which container_resources was not yet a templated field.
After I updated to apache-airflow-providers-cncf-kubernetes==5.2.2, it worked.
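For anyone wondering why the provider version matters: as far as I understand it, Airflow only renders Jinja in the operator attributes listed in the operator's template_fields, so a placeholder in any other argument is passed through untouched. Below is a rough, simplified sketch of that idea in plain Python. ToyOperator, ToyOperatorTemplated and render are hypothetical stand-ins I made up for illustration. They are not Airflow's real classes or its real rendering code, which uses Jinja rather than a regex. On a real installation you can inspect KubernetesPodOperator.template_fields to see whether container_resources is listed for your version.

```python
import re

# Matches placeholders of the form '{{ params.name }}' only (toy stand-in for Jinja).
_PATTERN = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render(value, context):
    """Recursively replace '{{ params.x }}' in strings and in dict values."""
    if isinstance(value, str):
        return _PATTERN.sub(lambda m: str(context["params"][m.group(1)]), value)
    if isinstance(value, dict):
        return {key: render(item, context) for key, item in value.items()}
    return value


class ToyOperator:
    """Hypothetical stand-in for an operator; NOT the real KubernetesPodOperator."""

    # Only attributes named here get rendered before execution.
    template_fields = ("image",)

    def __init__(self, image, container_resources):
        self.image = image
        self.container_resources = container_resources

    def render_template_fields(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


class ToyOperatorTemplated(ToyOperator):
    """Mimics a release in which container_resources is also a templated field."""

    template_fields = ("image", "container_resources")


context = {"params": {"limit_memory": "4G"}}
resources = {"limit_memory": "{{ params.limit_memory }}"}

old = ToyOperator("my_image:latest", dict(resources))
old.render_template_fields(context)
print(old.container_resources)  # {'limit_memory': '{{ params.limit_memory }}'}

new = ToyOperatorTemplated("my_image:latest", dict(resources))
new.render_template_fields(context)
print(new.container_resources)  # {'limit_memory': '4G'}
```

The first operator leaves the placeholder as a literal string, which matches what I saw on 4.2.2. The second renders it, which matches the behavior after the upgrade.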