I am currently setting up remote workers for Apache Airflow on AWS EC2 instances via Docker Swarm.
A remote worker shuts down every 60 seconds for no apparent reason, with the following output:
BACKEND=postgresql+psycopg2
DB_HOST=postgres
DB_PORT=5432
BACKEND=postgresql+psycopg2
DB_HOST=postgres
DB_PORT=5432
/home/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py:813: DeprecationWarning: Specifying both AIRFLOW_HOME environment variable and airflow_home in the config file is deprecated. Please use only the AIRFLOW_HOME environment variable and remove the config file entry.
warnings.warn(msg, category=DeprecationWarning)
Starting flask
* Serving Flask app "airflow.utils.serve_logs" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
[2021-05-26 08:37:48,027] {_internal.py:113} INFO - * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
/home/airflow/.local/lib/python3.7/site-packages/celery/platforms.py:801 RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=1000 euid=1000 gid=0 egid=0
[2021-05-26 08:37:49,557: INFO/MainProcess] Connected to redis://redis:6379/0
[2021-05-26 08:37:49,567: INFO/MainProcess] mingle: searching for neighbors
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync with 3 nodes
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync complete
[2021-05-26 08:37:50,601: INFO/MainProcess] celery@fcd56490a11f ready.
[2021-05-26 08:37:55,296: INFO/MainProcess] Events of group {task} enabled by remote.
worker: Warm shutdown (MainProcess)
-------------- celery@fcd56490a11f v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.0-1045-aws-x86_64-with-debian-10.8 2021-05-26 08:37:48
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: airflow.executors.celery_executor:0x7f951e9d3fd0
- ** ---------- .> transport: redis://redis:6379/0
- ** ---------- .> results: postgresql://airflow:**@postgres/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> default exchange=default(direct) key=default
[tasks]
. airflow.executors.celery_executor.execute_command
All Docker services running in my stack on the manager node are doing fine, as is the Selenium service on the remote node. Following the Docker Compose setup here for Airflow, I developed the docker-compose file seen below.
Postgres, Redis and Selenium are standard images.
For the Airflow services there are two images:
airflow-manager, which is just the name given to the image that is built locally when the containers are started, and
localhost:5000/myadmin/airflow-remote, which is the same image pushed to a local registry so that it can be pulled from other machines.
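For completeness, the image is built on the manager node and pushed to the local registry roughly like this (a sketch; only the image names and the registry address are taken from the setup above, the rest is my usual workflow):
# build the local image, then tag and push it to the local registry
docker build -t airflow-manager .
docker tag airflow-manager localhost:5000/myadmin/airflow-remote
docker push localhost:5000/myadmin/airflow-remote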
docker-compose.yaml:
version: '3.7'
services:
  postgres:
    image: postgres:13
    env_file:
      - ./config/postgres_test.env
    ports:
      - 5432:5432
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-d", "postgres", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]
  redis:
    image: redis:latest
    env_file:
      - ./config/postgres_test.env
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]
  airflow-webserver:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]
  airflow-scheduler:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: scheduler
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]
  airflow-worker-manager:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8794:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]
  airflow-worker-remote:
    image: localhost:5000/myadmin/airflow-remote
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8795:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == worker ]
  airflow-init:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
      - ./config/init.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: version
    depends_on:
      - postgres
      - redis
    deploy:
      placement:
        constraints: [ node.role == manager ]
  flower:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]
  selenium-chrome:
    image: selenium/standalone-chrome:latest
    ports:
      - 4444:4444
    deploy:
      placement:
        constraints: [ node.role == worker ]
    depends_on: []
volumes:
  postgres-db-volume:
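For context, I deploy and inspect the stack from the manager node roughly like this (a sketch; the stack name airflow is just what I use, so the full service name follows Swarm's <stack>_<service> naming):
# deploy the stack and check where the services were placed
docker stack deploy -c docker-compose.yaml airflow --with-registry-auth
docker service ls
docker service ps airflow_airflow-worker-remote
# follow the remote worker's logs (this is where the warm shutdown shows up)
docker service logs -f airflow_airflow-worker-remote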
Dockerfile:
FROM apache/airflow:2.0.1-python3.7
COPY config/requirements.txt /tmp/
RUN mkdir -p /home/airflow/.cache/zeep
RUN chmod -R 777 /home/airflow/.cache/zeep
RUN chmod -R 777 /opt/airflow/
RUN mkdir -p /home/airflow/.wdm
RUN chmod -R 777 /home/airflow/.wdm
RUN pip install -r /tmp/requirements.txt
Environment files:
airflow.env:
PYTHONPATH=/opt/airflow/
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL=redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY=****
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__CORE__PLUGINS_FOLDER=/plugins/
AIRFLOW__CORE__PARALLELISM=48
AIRFLOW__CORE__DAG_CONCURRENCY=8
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=1
AIRFLOW__WEBSERVER__DAG_DEFAULT_VIEW=graph
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC=30
AIRFLOW__WEBSERVER__HIDE_PAUSED_DAGS_BY_DEFAULT=true
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT=false
CELERY_ACKS_LATE=true
postgres_test.env:
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_DB=airflow
init.env:
_AIRFLOW_DB_UPGRADE=true
_AIRFLOW_WWW_USER_CREATE=true
_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
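To verify that the workers actually pick up these settings, I check the effective configuration inside a running worker container, roughly like this (a sketch; replace <worker-container> with the container name or ID that docker ps shows for the worker):
# print the effective executor, broker and result backend as Airflow sees them
docker exec -it <worker-container> airflow config get-value core executor
docker exec -it <worker-container> airflow config get-value celery broker_url
docker exec -it <worker-container> airflow config get-value celery result_backend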
I saw issues where this was resolved by setting the environment variable CELERY_ACKS_LATE=true, but that didn't help in my case.
This is a really annoying issue, because it clutters my Flower worker overview, and I would like to scale out to more workers running on other nodes.
Do you have any idea what could be causing this? Any help is appreciated!
Thanks in advance!
Just wanted to let you know that I was able to fix the issue by setting CELERY_WORKER_MAX_TASKS_PER_CHILD=500, which otherwise defaults to 50. Our Airflow DAG was sending around 85 tasks to this worker, so it was probably overwhelmed.
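For reference, one way to set it is to add the line to the airflow.env shown above, which all Airflow services already load, or to set it directly on the worker services (a sketch; the rest of the service definition is unchanged):
# addition to config/airflow.env
CELERY_WORKER_MAX_TASKS_PER_CHILD=500
# or, equivalently, directly in docker-compose.yaml on the worker service
  airflow-worker-remote:
    environment:
      - CELERY_WORKER_MAX_TASKS_PER_CHILD=500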
Apparently Celery stops accepting new incoming messages from Redis, and Redis shuts the worker down once its outgoing message pipeline is full.
After two of us searched for days, we finally found the answer. Apparently it is still a workaround, but everything now works as it should. I found the answer in this GitHub issue.
If you have further insights, please feel free to share.