docker docker-swarm docker-stack airflow

Unwanted Warm Shutdown (MainProcess) of node worker in airflow docker swarm


I am currently setting up remote workers via docker swarm for Apache Airflow on AWS EC2 instances.

A remote worker shuts down about every 60 seconds without any apparent reason, with the following output:

BACKEND=postgresql+psycopg2
DB_HOST=postgres
DB_PORT=5432

BACKEND=postgresql+psycopg2
DB_HOST=postgres
DB_PORT=5432

/home/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py:813: DeprecationWarning: Specifying both AIRFLOW_HOME environment variable and airflow_home in the config file is deprecated. Please use only the AIRFLOW_HOME environment variable and remove the config file entry.
  warnings.warn(msg, category=DeprecationWarning)
Starting flask
 * Serving Flask app "airflow.utils.serve_logs" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
[2021-05-26 08:37:48,027] {_internal.py:113} INFO -  * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
/home/airflow/.local/lib/python3.7/site-packages/celery/platforms.py:801 RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=1000 euid=1000 gid=0 egid=0

[2021-05-26 08:37:49,557: INFO/MainProcess] Connected to redis://redis:6379/0
[2021-05-26 08:37:49,567: INFO/MainProcess] mingle: searching for neighbors
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync with 3 nodes
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync complete
[2021-05-26 08:37:50,601: INFO/MainProcess] celery@fcd56490a11f ready.
[2021-05-26 08:37:55,296: INFO/MainProcess] Events of group {task} enabled by remote.

worker: Warm shutdown (MainProcess)

 -------------- celery@fcd56490a11f v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.0-1045-aws-x86_64-with-debian-10.8 2021-05-26 08:37:48
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x7f951e9d3fd0
- ** ---------- .> transport:   redis://redis:6379/0
- ** ---------- .> results:     postgresql://airflow:**@postgres/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> default          exchange=default(direct) key=default


[tasks]
  . airflow.executors.celery_executor.execute_command

All Docker services in my stack that run on the manager node are doing fine, and so is the Selenium service on the remote node. Following the docker-compose setup for Airflow referenced here, I developed the docker-compose file shown below.

Postgres, Redis and Selenium use standard images.

For the airflow services there are two images:

  1. airflow-manager, which is simply the name given to the image that is built locally when the stack is started.

  2. localhost:5000/myadmin/airflow-remote, which is the same image pushed to a local registry so that it can be pulled by the other machines (see the commands below).
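
Roughly, the image is built, tagged and pushed like this (a sketch; the registry address and image names are the ones used in the compose file below):

docker build -t airflow-manager .
docker tag airflow-manager localhost:5000/myadmin/airflow-remote
docker push localhost:5000/myadmin/airflow-remote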

docker-compose.yaml:

version: '3.7'

services:
  postgres:
    image: postgres:13
    env_file:
      - ./config/postgres_test.env
    ports:
      - 5432:5432
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-d", "postgres", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  redis:
    image: redis:latest
    env_file:
      - ./config/postgres_test.env
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-webserver:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-scheduler:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: scheduler
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-worker-manager:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8794:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-worker-remote:
    image: localhost:5000/myadmin/airflow-remote
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8795:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == worker ]

  airflow-init:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
      - ./config/init.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: version
    depends_on:
      - postgres
      - redis
    deploy:
      placement:
        constraints: [ node.role == manager ]

  flower:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  selenium-chrome:
    image: selenium/standalone-chrome:latest
    ports:
      - 4444:4444
    deploy:
      placement:
        constraints: [ node.role == worker ]
    depends_on: []


volumes:
  postgres-db-volume:
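
For completeness, the stack is then deployed roughly like this (the stack name "airflow" is just an example; docker stack deploy ignores the build: sections, so the images have to be built and pushed beforehand):

docker stack deploy --compose-file docker-compose.yaml airflow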


Dockerfile:


FROM apache/airflow:2.0.1-python3.7
COPY config/requirements.txt /tmp/
RUN mkdir -p /home/airflow/.cache/zeep
RUN chmod -R 777 /home/airflow/.cache/zeep
RUN chmod -R 777 /opt/airflow/
RUN mkdir -p /home/airflow/.wdm
RUN chmod -R 777 /home/airflow/.wdm
RUN pip install -r /tmp/requirements.txt

Environment files:

airflow.env:


PYTHONPATH=/opt/airflow/
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL=redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY=****
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__CORE__PLUGINS_FOLDER=/plugins/
AIRFLOW__CORE__PARALLELISM=48
AIRFLOW__CORE__DAG_CONCURRENCY=8
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=1
AIRFLOW__WEBSERVER__DAG_DEFAULT_VIEW=graph
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC=30
AIRFLOW__WEBSERVER__HIDE_PAUSED_DAGS_BY_DEFAULT=true
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT=false
CELERY_ACKS_LATE=true

postgres_test.env:


POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_DB=airflow

init.env:

_AIRFLOW_DB_UPGRADE=true
_AIRFLOW_WWW_USER_CREATE=true
_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

I have seen issues like this being resolved by setting CELERY_ACKS_LATE=true, but that didn't help in my case.

This is a really annoying issue because it spams the worker overview in Flower, and I would like to extend the setup to more workers running on other nodes.

Do you have any idea what this could be? Any help is appreciated!

Thanks in advance!


Solution

  • Just wanted to let you know that I was able to fix the issue by setting CELERY_WORKER_MAX_TASKS_PER_CHILD=500, which otherwise defaults to 50. Our Airflow DAG was sending around 85 tasks to this worker, so it was probably overwhelmed (see the snippet at the end of this answer for where the setting goes).

    Apparently Celery stops accepting more incoming messages from Redis, and Redis shuts the worker down if its outgoing message pipeline is full.

    After searching for days, the two of us found the answer. Apparently it is still a workaround, but it works as it should now. I found the answer in this github issue.

    If you have further insights please feel free to share.
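
    For completeness, a minimal sketch of where the setting goes in the setup above: it is added to config/airflow.env so that every Celery worker picks it up (the value 500 is simply what worked for us, not a universal recommendation):

    CELERY_WORKER_MAX_TASKS_PER_CHILD=500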