kubernetes airflow kubernetespodoperator

Bypass resources provided by pod mutation hook


We have Airflow running on Kubernetes. Below is my airflowLocalSettings:

airflowLocalSettings: |
      from airflow.contrib.kubernetes.pod import Pod, Resources
      from airflow.configuration import conf
      def pod_mutation_hook(pod: Pod):
          pod.labels.update({"app": "airflow-pod"})
          pod.annotations.update({"iam.amazonaws.com/role": "role"})
          pod.tolerations += [{"key": "spotInstance", "operator": "Exists"}]
          pod.resources = Resources(limit_memory = "512Mi", limit_cpu = "300m")
          pod.affinity.update({
            "nodeAffinity": {
              "preferredDuringSchedulingIgnoredDuringExecution": [
                {"weight": 100, "preference": {
                  "matchExpressions": [{
                    "key": "role.node.kubernetes.io/spot-worker",
                    "operator": "In",
                    "values": ["spot-worker"]
                  }]
                }}
              ]
            }
          })

I want to launch a few selected task pods with their own resources, passed through the resources parameter of KubernetesPodOperator. The problem is that these resources are overwritten, and the pods get the default resources from pod_mutation_hook instead.

How can we bypass the resources set in pod_mutation_hook so that these pods keep their own resource settings? I cannot remove the resource settings from pod_mutation_hook because other pods rely on them.


Solution

  • I solved this problem by changing the resources assignment in pod_mutation_hook as shown below.

    # Apply the default limits only when the pod carries no resources of its own.
    resources = pod.resources
    if isinstance(resources, Resources) and resources.is_empty_resource_request():
        pod.resources = Resources(limit_memory="512Mi", limit_cpu="300m")
    

    Basically, I check whether the pod already has resources set and apply the defaults only when it does not, so pods that pass their own resources through KubernetesPodOperator keep them.
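
    To see the effect without a cluster, here is a minimal, self-contained sketch of the same check. The Resources and Pod classes below are simplified stand-ins written for this example (an assumption, not the real Airflow classes); only the method name is_empty_resource_request and the default limits are taken from the code above.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Resources:
    """Simplified stand-in for Airflow's Resources class (assumption)."""
    request_memory: Optional[str] = None
    request_cpu: Optional[str] = None
    limit_memory: Optional[str] = None
    limit_cpu: Optional[str] = None

    def is_empty_resource_request(self) -> bool:
        # True when neither requests nor limits have been set.
        return not any([self.request_memory, self.request_cpu,
                        self.limit_memory, self.limit_cpu])


@dataclass
class Pod:
    """Hypothetical minimal pod holding only the field this example needs."""
    resources: Resources = field(default_factory=Resources)


def pod_mutation_hook(pod: Pod) -> None:
    # Fall back to the cluster-wide defaults only for pods without resources.
    if isinstance(pod.resources, Resources) and pod.resources.is_empty_resource_request():
        pod.resources = Resources(limit_memory="512Mi", limit_cpu="300m")


# A pod with no resources, and one that sets its own limits.
default_pod = Pod()
custom_pod = Pod(resources=Resources(limit_memory="2Gi", limit_cpu="1"))

pod_mutation_hook(default_pod)
pod_mutation_hook(custom_pod)

print(default_pod.resources.limit_memory)  # 512Mi
print(custom_pod.resources.limit_memory)   # 2Gi
```

    The pod without resources picks up the defaults, while the pod that set its own limits is left untouched.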