I'm running Airflow on Kubernetes and I'm trying to implement an S3 DAG sync instead of git-sync.
My Airflow deployment uses the apache-airflow Helm chart (https://airflow.apache.org, version 1.5.0).
My values.yaml is set up as below:
airflow:
  env:
    - name: AIRFLOW__CORE__ENABLE_XCOM_PICKLING
      value: "True"
  config:
    core:
      dags_folder: /opt/airflow/dags/repo/
      executor: KubernetesExecutor
  dags:
    persistence:
      enabled: false
  executor: KubernetesExecutor
  scheduler:
    extraVolumeMounts:
      - name: dags-volume
        mountPath: /opt/airflow/dags/repo
    extraVolumes:
      - name: dags-volume
        emptyDir: {}
    extraContainers:
      - name: s3-sync
        image: amazon/aws-cli
        command:
          - /bin/sh
          - -c
          - >
            while true; do
              aws s3 sync s3://bucket/airflow/ /opt/airflow/dags/repo/ --exclude "*" --include "dags/*"
              sleep 600;
            done
        env:
          - name: AWS_ACCESS_KEY_ID
            value: "key"
          - name: AWS_SECRET_ACCESS_KEY
            value: "secret"
          - name: AWS_DEFAULT_REGION
            value: "region"
        volumeMounts:
          - name: dags-volume
            mountPath: /opt/airflow/dags/repo
  triggerer:
    extraVolumeMounts:
      - name: dags-volume
        mountPath: /opt/airflow/dags/repo
    extraVolumes:
      - name: dags-volume
        emptyDir: {}
    extraContainers:
      - name: s3-sync
        image: amazon/aws-cli
        command:
          - /bin/sh
          - -c
          - >
            while true; do
              aws s3 sync s3://bucket/airflow/ /opt/airflow/dags/repo/ --exclude "*" --include "dags/*"
              sleep 600;
            done
        env:
          - name: AWS_ACCESS_KEY_ID
            value: "key"
          - name: AWS_SECRET_ACCESS_KEY
            value: "secret"
          - name: AWS_DEFAULT_REGION
            value: "region"
        volumeMounts:
          - name: dags-volume
            mountPath: /opt/airflow/dags/repo
I'm syncing from the S3 bucket to a folder inside the pod.
When I exec into the pod, the DAG file is inside the folder as expected: /opt/airflow/dags/repo/dags/dag_manual.py
But when I try to run any DAG, I get the following error:
[2023-03-24 23:01:40,968] {dagbag.py:507} INFO - Filling up the DagBag from /opt/airflow/dags/repo/dags/dag_manual.py
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/task_command.py", line 360, in task_run
    dag = get_dag(args.subdir, args.dag_id)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 204, in get_dag
    f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
airflow.exceptions.AirflowException: Dag 'DAG_MANUAL' could not be found; either it does not exist or it failed to parse.
I haven't been able to figure out the problem; any help would be appreciated.
I may be a bit late, but since no one has answered I'll give it a try. I set up a similar S3 sync structure to the one you're trying to achieve, starting from the snippets in your values file, and this is the final values file that works with the Airflow Helm chart.
defaultAirflowTag: "2.5.1"
airflowVersion: "2.5.1"
fernetKey: "keys"
webserverSecretKey: "secretkey"
config:
  core:
    dags_folder: /opt/airflow/dags/
env:
  - name: "AIRFLOW__CORE__LOAD_EXAMPLES"
    value: "False"
  - name: "AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE"
    value: "airflow"
  - name: "AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_REPOSITORY"
    value: "account_id.dkr.ecr.eu-west-2.amazonaws.com/airflow-dags"
  - name: "AIRFLOW__KUBERNETES_EXECUTOR__WORKER_CONTAINER_TAG"
    value: "15"
  - name: "AIRFLOW__KUBERNETES__DAGS_IN_IMAGE"
    value: "True"
  - name: "AIRFLOW__KUBERNETES__RUN_AS_USER"
    value: "50000"
executor: "KubernetesExecutor"
images:
  airflow:
    repository: account_id.dkr.ecr.eu-west-2.amazonaws.com/airflow-dags
    tag: "15"
createUserJob:
  useHelmHooks: false
migrateDatabaseJob:
  useHelmHooks: false
webserver:
  replicas: 3
  extraVolumeMounts:
    - name: dags-volume
      mountPath: /opt/airflow/dags/
  extraVolumes:
    - name: dags-volume
      emptyDir: {}
  extraContainers:
    - name: s3-sync
      image: public.ecr.aws/aws-cli/aws-cli
      command:
        - /bin/sh
        - -c
        - >
          while true; do
            aws s3 sync s3://airflow-dags/ /opt/airflow/dags/
            sleep 60;
          done
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "random_key"
        - name: AWS_SECRET_ACCESS_KEY
          value: "random_key"
        - name: AWS_DEFAULT_REGION
          value: "region"
      volumeMounts:
        - name: dags-volume
          mountPath: /opt/airflow/dags/
scheduler:
  extraVolumeMounts:
    - name: dags-volume
      mountPath: /opt/airflow/dags/
  extraVolumes:
    - name: dags-volume
      emptyDir: {}
  extraContainers:
    - name: s3-sync
      image: public.ecr.aws/aws-cli/aws-cli
      command:
        - /bin/sh
        - -c
        - >
          while true; do
            aws s3 sync s3://airflow-dags/ /opt/airflow/dags/
            sleep 60;
          done
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "random_key"
        - name: AWS_SECRET_ACCESS_KEY
          value: "random_key"
        - name: AWS_DEFAULT_REGION
          value: "region"
      volumeMounts:
        - name: dags-volume
          mountPath: /opt/airflow/dags/
triggerer:
  extraVolumeMounts:
    - name: dags-volume
      mountPath: /opt/airflow/dags/
  extraVolumes:
    - name: dags-volume
      emptyDir: {}
  extraContainers:
    - name: s3-sync
      image: public.ecr.aws/aws-cli/aws-cli
      command:
        - /bin/sh
        - -c
        - >
          while true; do
            aws s3 sync s3://airflow-dags/ /opt/airflow/dags/
            sleep 60;
          done
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "random_key"
        - name: AWS_SECRET_ACCESS_KEY
          value: "random_key"
        - name: AWS_DEFAULT_REGION
          value: "region"
      volumeMounts:
        - name: dags-volume
          mountPath: /opt/airflow/dags/
workers:
  extraVolumeMounts:
    - name: dags-volume
      mountPath: /opt/airflow/dags/
  extraVolumes:
    - name: dags-volume
      emptyDir: {}
  extraInitContainers:
    - name: s3-sync
      image: public.ecr.aws/aws-cli/aws-cli
      command:
        - /bin/sh
        - -c
        - aws s3 sync s3://airflow-dags/ /opt/airflow/dags/
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "random_key"
        - name: AWS_SECRET_ACCESS_KEY
          value: "random_key"
        - name: AWS_DEFAULT_REGION
          value: "region"
      volumeMounts:
        - name: dags-volume
          mountPath: /opt/airflow/dags/
rbac:
  create: true
  events: true
  secrets: true
ingress:
  web:
    enabled: true
    annotations:
      alb.ingress.kubernetes.io/certificate-arn: arn:aws:acm:eu-west-2:account_id:certificate/wdbhwdekaqs
      alb.ingress.kubernetes.io/group.name: airflow
      alb.ingress.kubernetes.io/listen-ports: '[{"HTTPS":443}, {"HTTP":80}]'
      alb.ingress.kubernetes.io/load-balancer-attributes: idle_timeout.timeout_seconds=4000
      alb.ingress.kubernetes.io/scheme: internal
      alb.ingress.kubernetes.io/ssl-redirect: '443'
      alb.ingress.kubernetes.io/target-type: ip
      kubernetes.io/ingress.class: alb
    path: "/"
    pathType: "Prefix"
    hosts:
      - name: "airflow.dev.example.net"
        tls:
          # Enable TLS termination for the web Ingress
          enabled: true
This values file works with the following Helm chart:
repoURL: "https://airflow.apache.org"
targetRevision: "1.15.0"
chart: airflow
The reason you ran into the issue is most likely that you weren't syncing the DAGs from the S3 bucket to the workers, which I did using an init container. You can use a sidecar as well if needed, but then you are responsible for terminating it when the main container finishes (see the sketch below).
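For what it's worth, here is a minimal sketch of the sidecar variant, assuming a cluster on Kubernetes 1.28+ with the SidecarContainers feature available (enabled by default from 1.29). Declaring the sync container as an init container with restartPolicy: Always turns it into a native sidecar that the kubelet stops automatically once the main worker container exits, so no manual termination logic is needed. The bucket, volume, and mount path are the same ones used in the values file above; credentials are omitted here, see the IAM role note below.
workers:
  # dags-volume and extraVolumeMounts exactly as in the values file above
  extraInitContainers:
    - name: s3-sync
      image: public.ecr.aws/aws-cli/aws-cli
      # restartPolicy: Always promotes this init container to a native sidecar:
      # it keeps running alongside the worker container and is stopped by the
      # kubelet once the worker container finishes.
      restartPolicy: Always
      command:
        - /bin/sh
        - -c
        - >
          while true; do
            aws s3 sync s3://airflow-dags/ /opt/airflow/dags/;
            sleep 60;
          done
      # AWS credentials omitted; see the IAM role note below
      volumeMounts:
        - name: dags-volume
          mountPath: /opt/airflow/dags/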
One improvement that can be made here is to use IAM roles instead of static access keys.
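On EKS, for example, this could be done with IAM Roles for Service Accounts (IRSA): annotate the service accounts of the components that run the s3-sync container with a role that has read access to the DAGs bucket, and drop the static AWS_* environment variables (the AWS CLI then picks up the web-identity credentials automatically). A rough sketch, where the account ID and role name are placeholders:
workers:
  serviceAccount:
    create: true
    annotations:
      # placeholder role ARN; the role needs s3:ListBucket / s3:GetObject on the DAGs bucket
      eks.amazonaws.com/role-arn: arn:aws:iam::111111111111:role/airflow-dags-s3-read
scheduler:
  serviceAccount:
    create: true
    annotations:
      eks.amazonaws.com/role-arn: arn:aws:iam::111111111111:role/airflow-dags-s3-read
The same annotation would go on the webserver and triggerer service accounts, since they run the sync container as well.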
Note: I'm using a custom Airflow image, but it is built from the official Airflow image with only minor changes (a couple of extra Python packages installed). The values file works with the official image as well.
Sample DAG that I ran:
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from datetime import datetime


@dag(start_date=datetime(2023, 1, 1), schedule='@daily', catchup=False)
def parallel_dag():
    # three parallel bash tasks, followed by two chained TaskFlow tasks
    tasks = [BashOperator(task_id=f'task_{t}', bash_command='sleep 60') for t in range(1, 4)]

    @task
    def task_4(data):
        print(data)
        return 'done'

    @task
    def task_5(data):
        print(data)

    tasks >> task_5(task_4(42))


parallel_dag()
Please do let me know if you face any issues with it.