
Airflow error "dag_id could not be found" when running Airflow with KubernetesExecutor


I am using this Helm chart to deploy Airflow: https://github.com/apache/airflow/tree/master/chart

Apache Airflow version: 2.0.0

Kubernetes version: v1.19.4

What happened: I get this error when I try to execute tasks using Kubernetes:

[2021-01-14 19:39:17,628] {dagbag.py:440} INFO - Filling up the DagBag from /opt/airflow/dags/repo/bash.py
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 216, in task_run
    dag = get_dag(args.subdir, args.dag_id)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 189, in get_dag
    'parse.'.format(dag_id)
airflow.exceptions.AirflowException: dag_id could not be found: bash. Either the dag did not exist or it failed to parse.
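
The message covers two different cases: the DAG file is missing, or it is present but raises an exception on import. Below is a minimal stdlib-only sketch for checking the second case. `import_error_for` is a hypothetical helper, not part of Airflow, and Airflow's own loader does more than this. Run it inside the worker image, where Airflow is installed, so that the DAG file's own imports can resolve.

```python
import os
import runpy
import tempfile
import traceback
from typing import Optional


def import_error_for(path: str) -> Optional[str]:
    """Execute a Python file and return the final traceback line if it
    raises, or None if it runs to completion.

    Hypothetical helper for illustration; it only approximates what a
    DAG loader does when it imports a file.
    """
    try:
        runpy.run_path(path)
    except Exception:
        # Any exception here means the file cannot be imported as-is.
        return traceback.format_exc().strip().splitlines()[-1]
    return None


if __name__ == "__main__":
    # Demo with a throwaway file that uses a module it never imported.
    with tempfile.TemporaryDirectory() as tmp:
        dag_file = os.path.join(tmp, "example_dag.py")
        with open(dag_file, "w") as handle:
            handle.write("logging.getLogger()\n")
        print(import_error_for(dag_file))
```

If this returns None for /opt/airflow/dags/repo/bash.py inside the pod, the file imports cleanly and the "did not exist" case becomes the more likely one.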

How to reproduce it: deploy the Airflow Helm chart using this values.yaml:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
---
# Default values for airflow.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.

# User and group of airflow user
uid: 50000
gid: 50000

# Airflow home directory
# Used for mount paths
airflowHome: "/opt/airflow"

# Default airflow repository -- overrides all the specific images below
defaultAirflowRepository: apache/airflow

# Default airflow tag to deploy
defaultAirflowTag: 2.0.0


# Select certain nodes for airflow pods.
nodeSelector: { }
affinity: { }
tolerations: [ ]

# Add common labels to all objects and pods defined in this chart.
labels: { }

# Ingress configuration
ingress:
  # Enable ingress resource
  enabled: false

  # Configs for the Ingress of the web Service
  web:
    # Annotations for the web Ingress
    annotations: { }

    # The path for the web Ingress
    path: ""

    # The hostname for the web Ingress
    host: ""

    # configs for web Ingress TLS
    tls:
      # Enable TLS termination for the web Ingress
      enabled: false
      # the name of a pre-created Secret containing a TLS private key and certificate
      secretName: ""

    # HTTP paths to add to the web Ingress before the default path
    precedingPaths: [ ]

    # Http paths to add to the web Ingress after the default path
    succeedingPaths: [ ]

  # Configs for the Ingress of the flower Service
  flower:
    # Annotations for the flower Ingress
    annotations: { }

    # The path for the flower Ingress
    path: ""

    # The hostname for the flower Ingress
    host: ""

    # configs for web Ingress TLS
    tls:
      # Enable TLS termination for the flower Ingress
      enabled: false
      # the name of a pre-created Secret containing a TLS private key and certificate
      secretName: ""

    # HTTP paths to add to the flower Ingress before the default path
    precedingPaths: [ ]

    # Http paths to add to the flower Ingress after the default path
    succeedingPaths: [ ]

# Network policy configuration
networkPolicies:
  # Enabled network policies
  enabled: false


# Extra annotations to apply to all
# Airflow pods
airflowPodAnnotations: { }

# Enable RBAC (default on most clusters these days)
rbacEnabled: true

# Airflow executor
# Options: SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor
executor: "KubernetesExecutor"

# If this is true and using LocalExecutor/SequentialExecutor/KubernetesExecutor, the scheduler's
# service account will have access to communicate with the api-server and launch pods.
# If this is true and using the CeleryExecutor, the workers will be able to launch pods.
allowPodLaunching: true

# Images
images:
  airflow:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  pod_template:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  flower:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  statsd:
    repository: apache/airflow
    tag: airflow-statsd-exporter-2020.09.05-v0.17.0
    pullPolicy: IfNotPresent
  redis:
    repository: redis
    tag: 6-buster
    pullPolicy: IfNotPresent
  pgbouncer:
    repository: apache/airflow
    tag: airflow-pgbouncer-2020.09.05-1.14.0
    pullPolicy: IfNotPresent
  pgbouncerExporter:
    repository: apache/airflow
    tag: airflow-pgbouncer-exporter-2020.09.25-0.5.0
    pullPolicy: IfNotPresent
  gitSync:
    repository: k8s.gcr.io/git-sync
    tag: v3.1.6
    pullPolicy: IfNotPresent

# Environment variables for all airflow containers
env:
  - name: "AIRFLOW__KUBERNETES__GIT_SYNC_RUN_AS_USER"
    value: "65533"

# Secrets for all airflow containers
secret: [ ]
# - envName: ""
#   secretName: ""
#   secretKey: ""

# Extra secrets that will be managed by the chart
# (You can use them with extraEnv or extraEnvFrom or some of the extraVolumes values).
# The format is "key/value" where
#    * key (can be templated) is the name of the secret that will be created
#    * value: an object with the standard 'data' or 'stringData' key (or both).
#          The value associated with those keys must be a string (can be templated)
extraSecrets: { }
# eg:
# extraSecrets:
#   {{ .Release.Name }}-airflow-connections:
#     data: |
#       AIRFLOW_CONN_GCP: 'base64_encoded_gcp_conn_string'
#       AIRFLOW_CONN_AWS: 'base64_encoded_aws_conn_string'
#     stringData: |
#       AIRFLOW_CONN_OTHER: 'other_conn'
#   {{ .Release.Name }}-other-secret-name-suffix: |
#     data: |
#        ...

# Extra ConfigMaps that will be managed by the chart
# (You can use them with extraEnv or extraEnvFrom or some of the extraVolumes values).
# The format is "key/value" where
#    * key (can be templated) is the name of the configmap that will be created
#    * value: an object with the standard 'data' key.
#          The value associated with this keys must be a string (can be templated)
extraConfigMaps: { }
# eg:
# extraConfigMaps:
#   {{ .Release.Name }}-airflow-variables:
#     data: |
#       AIRFLOW_VAR_HELLO_MESSAGE: "Hi!"
#       AIRFLOW_VAR_KUBERNETES_NAMESPACE: "{{ .Release.Namespace }}"

# Extra env 'items' that will be added to the definition of airflow containers
# a string is expected (can be templated).
extraEnv: ~
# eg:
# extraEnv: |
#   - name: PLATFORM
#     value: FR

# Extra envFrom 'items' that will be added to the definition of airflow containers
# A string is expected (can be templated).
extraEnvFrom: ~
# eg:
# extraEnvFrom: |
#   - secretRef:
#       name: '{{ .Release.Name }}-airflow-connections'
#   - configMapRef:
#       name: '{{ .Release.Name }}-airflow-variables'

# Airflow database config
data:
  # If secret names are provided, use those secrets
  metadataSecretName: ~
  resultBackendSecretName: ~

  # Otherwise pass connection values in
  metadataConnection:
    user: postgres
    pass: postgres
    host: ~
    port: 5432
    db: postgres
    sslmode: disable
  resultBackendConnection:
    user: postgres
    pass: postgres
    host: ~
    port: 5432
    db: postgres
    sslmode: disable

# Fernet key settings
fernetKey: ~
fernetKeySecretName: ~


# In order to use kerberos you need to create secret containing the keytab file
# The secret name should follow naming convention of the application where resources are
# name {{ .Release-name }}-<POSTFIX>. In case of the keytab file, the postfix is "kerberos-keytab"
# So if your release is named "my-release" the name of the secret should be "my-release-kerberos-keytab"
#
# The Keytab content should be available in the "kerberos.keytab" key of the secret.
#
#  apiVersion: v1
#  kind: Secret
#  data:
#    kerberos.keytab: <base64_encoded keytab file content>
#  type: Opaque
#
#
#  If you have such keytab file you can do it with similar
#
#  kubectl create secret generic {{ .Release.name }}-kerberos-keytab --from-file=kerberos.keytab
#
kerberos:
  enabled: false
  ccacheMountPath: '/var/kerberos-ccache'
  ccacheFileName: 'cache'
  configPath: '/etc/krb5.conf'
  keytabPath: '/etc/airflow.keytab'
  principal: 'airflow@FOO.COM'
  reinitFrequency: 3600
  config: |
    # This is an example config showing how you can use templating and how "example" config
    # might look like. It works with the test kerberos server that we are using during integration
    # testing at Apache Airflow (see `scripts/ci/docker-compose/integration-kerberos.yml` but in
    # order to make it production-ready you must replace it with your own configuration that
    # Matches your kerberos deployment. Administrators of your Kerberos instance should
    # provide the right configuration.

    [logging]
    default = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_libs.log"
    kdc = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_kdc.log"
    admin_server = "FILE:{{ template "airflow_logs_no_quote" . }}/kadmind.log"

    [libdefaults]
    default_realm = FOO.COM
    ticket_lifetime = 10h
    renew_lifetime = 7d
    forwardable = true

    [realms]
    FOO.COM = {
      kdc = kdc-server.foo.com
      admin_server = admin_server.foo.com
    }

# Airflow Worker Config
workers:
  # Number of airflow celery workers in StatefulSet
  replicas: 1

  # Allow KEDA autoscaling.
  # Persistence.enabled must be set to false to use KEDA.
  keda:
    enabled: false
    namespaceLabels: { }

    # How often KEDA polls the airflow DB to report new scale requests to the HPA
    pollingInterval: 5

    # How many seconds KEDA will wait before scaling to zero.
    # Note that HPA has a separate cooldown period for scale-downs
    cooldownPeriod: 30

    # Maximum number of workers created by keda
    maxReplicaCount: 10

  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 100Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:
    # Execute init container to chown log directory.
    # This is currently only needed in KinD, due to usage
    # of local-path provisioner.
    fixPermissions: false

  kerberosSidecar:
    # Enable kerberos sidecar
    enabled: false

  resources: { }
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # Grace period for tasks to finish after SIGTERM is sent from kubernetes
  terminationGracePeriodSeconds: 600

  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true
  # Annotations to add to worker kubernetes service account.
  serviceAccountAnnotations: { }
  # Mount additional volumes into worker.
  extraVolumes: [ ]
  extraVolumeMounts: [ ]

# Airflow scheduler settings
scheduler:
  # Airflow 2.0 allows users to run multiple schedulers,
  # However this feature is only recommended for MySQL 8+ and Postgres
  replicas: 1
  # Scheduler pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      maxUnavailable: 1

  resources: { }
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # This setting can overwrite
  # podMutation settings.
  airflowLocalSettings: ~

  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true

  # Annotations to add to scheduler kubernetes service account.
  serviceAccountAnnotations: { }

  # Mount additional volumes into scheduler.
  extraVolumes: [ ]
  extraVolumeMounts: [ ]

# Airflow webserver settings
webserver:
  allowPodLogReading: true
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 30
    failureThreshold: 20
    periodSeconds: 5

  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 30
    failureThreshold: 20
    periodSeconds: 5

  # Number of webservers
  replicas: 1

  # Additional network policies as needed
  extraNetworkPolicies: [ ]

  resources: { }
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi

  # Create initial user.
  defaultUser:
    enabled: true
    role: Admin
    username: admin
    email: admin@example.com
    firstName: admin
    lastName: user
    password: admin

  # Mount additional volumes into webserver.
  extraVolumes: [ ]
  #    - name: airflow-ui
  #      emptyDir: { }
  extraVolumeMounts: [ ]
  #    - name: airflow-ui
  #      mountPath: /opt/airflow

  # This will be mounted into the Airflow Webserver as a custom
  # webserver_config.py. You can bake a webserver_config.py in to your image
  # instead
  webserverConfig: ~
  # webserverConfig: |
  #   from airflow import configuration as conf

  #   # The SQLAlchemy connection string.
  #   SQLALCHEMY_DATABASE_URI = conf.get('core', 'SQL_ALCHEMY_CONN')

  #   # Flask-WTF flag for CSRF
  #   CSRF_ENABLED = True

  service:
    type: NodePort
    ## service annotations
    annotations: { }

  # Annotations to add to webserver kubernetes service account.
  serviceAccountAnnotations: { }

# Flower settings
flower:
  # Additional network policies as needed
  extraNetworkPolicies: [ ]
  resources: { }
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi

  # A secret containing the connection
  secretName: ~

  # Else, if username and password are set, create secret from username and password
  username: ~
  password: ~

  service:
    type: ClusterIP

# Statsd settings
statsd:
  enabled: true
  # Additional network policies as needed
  extraNetworkPolicies: [ ]
  resources: { }
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi

  service:
    extraAnnotations: { }

# Pgbouncer settings
pgbouncer:
  # Enable pgbouncer
  enabled: false
  # Additional network policies as needed
  extraNetworkPolicies: [ ]

  # Pool sizes
  metadataPoolSize: 10
  resultBackendPoolSize: 5

  # Maximum clients that can connect to pgbouncer (higher = more file descriptors)
  maxClientConn: 100

  # Pgbouncer pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      maxUnavailable: 1

  # Limit the resources to pgbouncerExported.
  # When you specify the resource request the scheduler uses this information to decide which node to place
  # the Pod on. When you specify a resource limit for a Container, the kubelet enforces those limits so
  # that the running container is not allowed to use more of that resource than the limit you set.
  # See: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
  # Example:
  #
  # resource:
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi
  resources: { }

  service:
    extraAnnotations: { }

  # https://www.pgbouncer.org/config.html
  verbose: 0
  logDisconnections: 0
  logConnections: 0

  sslmode: "prefer"
  ciphers: "normal"

  ssl:
    ca: ~
    cert: ~
    key: ~

redis:
  terminationGracePeriodSeconds: 600

  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 1Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:

  resources: { }
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # If set use as redis secret
  passwordSecretName: ~
  brokerURLSecretName: ~

  # Else, if password is set, create secret with it,
  # else generate a new one on install
  password: ~

  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true

# Auth secret for a private registry
# This is used if pulling airflow images from a private registry
registry:
  secretName: ~

  # Example:
  # connection:
  #   user: ~
  #   pass: ~
  #   host: ~
  #   email: ~
  connection: { }

# Elasticsearch logging configuration
elasticsearch:
  # Enable elasticsearch task logging
  enabled: true
  # A secret containing the connection
  #  secretName: ~
  # Or an object representing the connection
  # Example:
  connection:
    #     user:
    #     pass:
    host: elasticsearch-master-headless.elk.svc.cluster.local
    port: 9200
#  connection: {}


# All ports used by chart
ports:
  flowerUI: 5555
  airflowUI: 8080
  workerLogs: 8793
  redisDB: 6379
  statsdIngest: 9125
  statsdScrape: 9102
  pgbouncer: 6543
  pgbouncerScrape: 9127

# Define any ResourceQuotas for namespace
quotas: { }

# Define default/max/min values for pods and containers in namespace
limits: [ ]

# This runs as a CronJob to cleanup old pods.
cleanup:
  enabled: false
  # Run every 15 minutes
  schedule: "*/15 * * * *"

# Configuration for postgresql subchart
# Not recommended for production
postgresql:
  enabled: true
  postgresqlPassword: postgres
  postgresqlUsername: postgres

# Config settings to go into the mounted airflow.cfg
#
# Please note that these values are passed through the `tpl` function, so are
# all subject to being rendered as go templates. If you need to include a
# literal `{{` in a value, it must be expressed like this:
#
#    a: '{{ "{{ not a template }}" }}'
#
# yamllint disable rule:line-length
config:
  core:
    dags_folder: '{{ include "airflow_dags" . }}'
    load_examples: 'False'
    executor: '{{ .Values.executor }}'
    # For Airflow 1.10, backward compatibility
    colored_console_log: 'True'
    remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
  # Authentication backend used for the experimental API
  api:
    auth_backend: airflow.api.auth.backend.deny_all
  logging:
    remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
    colored_console_log: 'True'
    logging_level: INFO
  metrics:
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
  webserver:
    enable_proxy_fix: 'True'
    expose_config: 'True'
    rbac: 'True'
  celery:
    default_queue: celery
  scheduler:
    scheduler_heartbeat_sec: 5
    # For Airflow 1.10, backward compatibility
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
    # Restart Scheduler every 41460 seconds (11 hours 31 minutes)
    # The odd time is chosen so it is not always restarting on the same "hour" boundary
    run_duration: 41460
  elasticsearch:
    json_format: 'True'
    log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
  elasticsearch_configs:
    max_retries: 3
    timeout: 30
    retry_timeout: 'True'
  kerberos:
    keytab: '{{ .Values.kerberos.keytabPath }}'
    reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}'
    principal: '{{ .Values.kerberos.principal }}'
    ccache: '{{ .Values.kerberos.ccacheMountPath }}/{{ .Values.kerberos.ccacheFileName }}'
  kubernetes:
    namespace: '{{ .Release.Namespace }}'
    airflow_configmap: '{{ include "airflow_config" . }}'
    airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
    pod_template_file: '{{ include "airflow_pod_template_file" . }}/pod_template_file.yaml'
    worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
    worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
    delete_worker_pods: 'False'
    multi_namespace_mode: '{{ if .Values.multiNamespaceMode }}True{{ else }}False{{ end }}'
# yamllint enable rule:line-length

multiNamespaceMode: false

podTemplate:

# Git sync
dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: false
    # Volume size for dags
    size: 1Gi
    # If using a custom storageClass, pass name here
    storageClassName: gp2
    # access mode of the persistent volume
    accessMode: ReadWriteMany
    ## the name of an existing PVC to use
    existingClaim: "airflow-dags"
  gitSync:
    enabled: true
    repo: git@github.com:Tikna-inc/airflow.git
    branch: main
    rev: HEAD
    root: "/git"
    dest: "repo"
    depth: 1
    maxFailures: 0
    subPath: ""
    sshKeySecret: airflow-ssh-secret
    wait: 60
    containerName: git-sync
    uid: 65533

And this is the DAG with its tasks:



import logging
from datetime import timedelta

import requests
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

logging.getLogger().setLevel(level=logging.INFO)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def get_active_customers():
    requests.get("http://localhost:8080")


dag = DAG(
    'bash',
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval='*/2 * * * *',
    start_date=days_ago(1),
    tags=['Test'],
    is_paused_upon_creation=False,
    catchup=False
)

t1 = BashOperator(
    task_id='print_date',
    bash_command='mkdir ./itsMe',
    dag=dag
)

t1

This is the airflow.cfg file:

[api]
auth_backend = airflow.api.auth.backend.deny_all

[celery]
default_queue = celery

[core]
colored_console_log = True
dags_folder = /opt/airflow/dags/repo/
executor = KubernetesExecutor
load_examples = False
remote_logging = False

[elasticsearch]
json_format = True
log_id_template = {dag_id}_{task_id}_{execution_date}_{try_number}

[elasticsearch_configs]
max_retries = 3
retry_timeout = True
timeout = 30

[kerberos]
ccache = /var/kerberos-ccache/cache
keytab = /etc/airflow.keytab
principal = airflow@FOO.COM
reinit_frequency = 3600

[kubernetes]
airflow_configmap = airflow-airflow-config
airflow_local_settings_configmap = airflow-airflow-config
dags_in_image = False
delete_worker_pods = False
multi_namespace_mode = False
namespace = airflow
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = apache/airflow
worker_container_tag = 2.0.0

[logging]
colored_console_log = True
logging_level = INFO
remote_logging = False

[metrics]
statsd_host = airflow-statsd
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow

[scheduler]
run_duration = 41460
scheduler_heartbeat_sec = 5
statsd_host = airflow-statsd
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow

[webserver]
enable_proxy_fix = True
expose_config = True
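
One quick consistency check is whether the `--subdir` path passed to the worker pod lies under `dags_folder` from this config. The sketch below uses only the standard library. The helper names are hypothetical, and the embedded config text is a two-line excerpt of the file above, not the whole thing.

```python
import configparser
from pathlib import PurePosixPath

# Excerpt of the airflow.cfg shown above (only the keys needed here).
CFG_TEXT = """
[core]
dags_folder = /opt/airflow/dags/repo/
executor = KubernetesExecutor
"""


def dags_folder_from_cfg(cfg_text: str) -> PurePosixPath:
    """Read [core] dags_folder from airflow.cfg-style text."""
    # interpolation=None keeps values such as log templates literal.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return PurePosixPath(parser["core"]["dags_folder"])


def subdir_is_under_dags_folder(cfg_text: str, subdir: str) -> bool:
    """True if subdir equals dags_folder or sits somewhere below it."""
    folder = dags_folder_from_cfg(cfg_text)
    path = PurePosixPath(subdir)
    return path == folder or folder in path.parents


if __name__ == "__main__":
    print(subdir_is_under_dags_folder(CFG_TEXT, "/opt/airflow/dags/repo/bash.py"))
```

This only compares path strings. It says nothing about whether the file actually exists inside the pod, which has to be checked separately.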

This is the pod yaml file for the new tasks

apiVersion: v1
kind: Pod
metadata:
  annotations:
    dag_id: bash2
    execution_date: "2021-01-14T20:16:00+00:00"
    kubernetes.io/psp: eks.privileged
    task_id: create_dir
    try_number: "2"
  labels:
    airflow-worker: "38"
    airflow_version: 2.0.0
    dag_id: bash2
    execution_date: 2021-01-14T20_16_00_plus_00_00
    kubernetes_executor: "True"
    task_id: create_dir
    try_number: "2"
  name: sss3
  namespace: airflow
spec:
  containers:
    - args:
        - airflow
        - tasks
        - run
        - bash2
        - create_dir
        - "2021-01-14T20:16:00+00:00"
        - --local
        - --pool
        - default_pool
        - --subdir
        - /opt/airflow/dags/repo/bash.py
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              key: fernet-key
              name: airflow-fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              key: connection
              name: airflow-airflow-metadata
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              key: connection
              name: airflow-airflow-metadata
        - name: AIRFLOW_IS_K8S_EXECUTOR_POD
          value: "True"
      image: apache/airflow:2.0.0
      imagePullPolicy: IfNotPresent
      name: base
      resources: { }
      terminationMessagePath: /dev/termination-log
      terminationMessagePolicy: File
      volumeMounts:
        - mountPath: /opt/airflow/logs
          name: airflow-logs
        - mountPath: /opt/airflow/airflow.cfg
          name: config
          readOnly: true
          subPath: airflow.cfg
        - mountPath: /etc/git-secret/ssh
          name: git-sync-ssh-key
          subPath: ssh
        - mountPath: /opt/airflow/dags
          name: dags
          readOnly: true
        - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
          name: airflow-worker-token-7sdtr
          readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  initContainers:
    - env:
        - name: GIT_SSH_KEY_FILE
          value: /etc/git-secret/ssh
        - name: GIT_SYNC_SSH
          value: "true"
        - name: GIT_KNOWN_HOSTS
          value: "false"
        - name: GIT_SYNC_REV
          value: HEAD
        - name: GIT_SYNC_BRANCH
          value: main
        - name: GIT_SYNC_REPO
          value: git@github.com:Tikna-inc/airflow.git
        - name: GIT_SYNC_DEPTH
          value: "1"
        - name: GIT_SYNC_ROOT
          value: /git
        - name: GIT_SYNC_DEST
          value: repo
        - name: GIT_SYNC_ADD_USER
          value: "true"
        - name: GIT_SYNC_WAIT
          value: "60"
        - name: GIT_SYNC_MAX_SYNC_FAILURES
          value: "0"
        - name: GIT_SYNC_ONE_TIME
          value: "true"
      image: k8s.gcr.io/git-sync:v3.1.6
      imagePullPolicy: IfNotPresent
      name: git-sync
      resources: { }
      securityContext:
        runAsUser: 65533
      terminationMessagePath: /dev/termination-log
      terminationMessagePolicy: File
      volumeMounts:
        - mountPath: /git
          name: dags
        - mountPath: /etc/git-secret/ssh
          name: git-sync-ssh-key
          readOnly: true
          subPath: gitSshKey
        - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
          name: airflow-worker-token-7sdtr
          readOnly: true
  nodeName: ip-172-31-41-37.eu-south-1.compute.internal
  priority: 0
  restartPolicy: Never
  schedulerName: default-scheduler
  securityContext:
    runAsUser: 50000
  serviceAccount: airflow-worker
  serviceAccountName: airflow-worker
  terminationGracePeriodSeconds: 30
  tolerations:
    - effect: NoExecute
      key: node.kubernetes.io/not-ready
      operator: Exists
      tolerationSeconds: 300
    - effect: NoExecute
      key: node.kubernetes.io/unreachable
      operator: Exists
      tolerationSeconds: 300
  volumes:
    - emptyDir: { }
      name: dags
    - name: git-sync-ssh-key
      secret:
        defaultMode: 288
        secretName: airflow-ssh-secret
    - emptyDir: { }
      name: airflow-logs
    - configMap:
        defaultMode: 420
        name: airflow-airflow-config
      name: config
    - name: airflow-worker-token-7sdtr
      secret:
        defaultMode: 420
        secretName: airflow-worker-token-7sdtr

-----------------------Important----------------------------

Debugging

For debugging purposes, I changed the pod args so that, instead of running the task, the container runs:

spec:
  containers:
    - args:
        - airflow
        - webserver

I then looked for the DAGs and found none. It seems that gitSync is not working in the pods launched by the KubernetesExecutor.
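
A more direct check than starting the webserver is to list what is actually under the mounted DAGs folder from inside the pod. The sketch below is stdlib-only, and `inspect_dags_folder` is a hypothetical helper. It also reports symlinks whose target does not resolve. That is only one possible reason a synced folder could look empty, and it is an assumption here, not something confirmed by the output above.

```python
import os
import tempfile
from pathlib import Path
from typing import Dict, List


def inspect_dags_folder(folder: str) -> Dict[str, List[str]]:
    """Collect .py files and dangling symlinks found under folder.

    Hypothetical diagnostic helper. followlinks=True descends into
    symlinked directories, so avoid it on trees with symlink cycles.
    """
    report: Dict[str, List[str]] = {"python_files": [], "broken_symlinks": []}
    root = Path(folder)
    if not root.exists():
        # Missing folder (or a dangling top-level link): nothing to list.
        return report
    for dirpath, dirnames, filenames in os.walk(root, followlinks=True):
        for name in dirnames + filenames:
            entry = Path(dirpath) / name
            if entry.is_symlink() and not entry.exists():
                report["broken_symlinks"].append(str(entry))
            elif entry.suffix == ".py" and entry.is_file():
                report["python_files"].append(str(entry))
    return report


if __name__ == "__main__":
    # Demo on a throwaway directory with one file and one dangling link.
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "bash.py").write_text("x = 1\n")
        os.symlink(os.path.join(tmp, "missing-target"), os.path.join(tmp, "repo"))
        print(inspect_dags_folder(tmp))
```

In the worker pod the folder to pass would be /opt/airflow/dags, the mount path shown in the pod spec above.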

Any help would be appreciated.


Solution

  • I posted an issue on GitHub and it is now solved. This is the link:

    solution in github