Tags: python, logging, celery, fastapi

Unable to configure Celery logging using a YAML configuration file with FastAPI server


I am using Celery as the background task processor with a FastAPI web server. Logging for FastAPI is configured using a logging.yaml file via Uvicorn like this:

uvicorn syncapp.main:app --workers 4 --host 0.0.0.0 --port 8084 --log-config logging.yaml

I have configured multiple loggers, and they work fine for the API server. However, no matter what I do, I cannot get Celery to use the loggers defined in the logging.yaml file. I have tried the solutions suggested elsewhere, but none seem to work for my case.

Celery main application (path: syncapp/worker.py):

from celery import Celery

from syncapp.settings import get_settings

settings = get_settings()

app = Celery(
    main=settings.app_name,
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["syncapp.tasks.copy_data"],  # include expects a list of modules
)

app.conf.update(
    broker_connection_retry_on_startup=True,
    worker_hijack_root_logger=False,
)

Example task method (path: syncapp/tasks/copy_data.py):

from celery import current_task
from celery.utils.log import get_task_logger

from syncapp.commons.enums import WorkerQueue
from syncapp.worker import app

logger = get_task_logger(__name__)


@app.task(queue=WorkerQueue.COPY_DATA.value)
def copy_index_data(index_name: str) -> bool:
    """Copies index data up until the current date."""
    task_id = current_task.request.id  # current_task exposes the executing task's context
    logger.info("Job %s: Copying data from source index %s", task_id, index_name)
    return True

Contents of logging.yaml file (some keys are removed to keep it short):

version: 1

...

handlers:
  hl1_console:
    class: logging.StreamHandler
    formatter: simple
    stream: ext://sys.stdout

  hl1_syncapp_file_handler:
    class: logging.FileHandler
    filename: logs/syncapp.log
    formatter: extended

  hl1_synctask_file_handler:
    class: logging.FileHandler
    filename: logs/synctask.log
    formatter: extended

  hl1_celery_file_handler:
    class: logging.FileHandler
    filename: logs/celery.log
    formatter: extended

  hl2_synctask_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.hl1_console
      - cfg://handlers.hl1_synctask_file_handler
    queue: cfg://objects.synctask_queue

  hl2_celery_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.hl1_console
      - cfg://handlers.hl1_celery_file_handler
    queue: cfg://objects.celery_queue

  hl2_syncapp_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.hl1_console
      - cfg://handlers.hl1_syncapp_file_handler
    queue: cfg://objects.syncapp_queue

loggers:
  celery.task:
    level: DEBUG
    handlers:
      - hl2_synctask_queue_handler
    propagate: no
  celery:
    level: INFO
    handlers:
      - hl2_celery_queue_handler
    propagate: no
  syncapp:
    level: DEBUG
    handlers:
      - hl2_syncapp_queue_handler
    propagate: no

root:
  level: INFO
  handlers:
    - hl1_console

Running the Celery worker with the following command (note that I did not pass the --logfile parameter here):

celery -A syncapp.worker:app worker --concurrency=4 --hostname=copy@%h --queues=syncapp.tasks.copy_data --loglevel=INFO

I want the Celery worker itself to log its events through the celery logger and the task logs to go through the syncapp.tasks logger. The main application logs should go through the syncapp logger (which they already do).

Update #4

Instead of preventing Celery from configuring logging altogether by connecting to signals.setup_logging, I can manually reinitialize the loggers by connecting to signals.after_setup_logger and signals.after_setup_task_logger. The code looks like this (yes, the repeated code could be tidied up):

import logging

from celery import Celery, signals
from celery.utils.log import get_task_logger

from syncapp.settings import get_settings

settings = get_settings()

app = Celery(
    main=settings.app_name,
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["syncapp.tasks.copy_data"],
)
)

app.conf.update(
    broker_connection_retry_on_startup=True,
    worker_hijack_root_logger=False,
)


@signals.after_setup_logger.connect()
def after_setup_logger(logger, *args, **kwargs):
    logger.info(f"Logger before configuration: {logger.name}, Handlers: {logger.handlers}")
    with open(settings.log_config_path, "r") as config_file:
        YAMLConfig(config_file.read())  # helper that applies the YAML logging config
    logger_celery = logging.getLogger("celery")
    for handler in logger_celery.handlers:
        logger.addHandler(handler)
    logger.info(f"Logging configured. Name: {logger.name}, Handlers: {logger.handlers}")


@signals.after_setup_task_logger.connect()
def after_setup_task_logger(logger, *args, **kwargs):
    logger.info(f"Logger before configuration: {logger.name}, Handlers: {logger.handlers}")
    with open(settings.log_config_path, "r") as config_file:
        YAMLConfig(config_file.read())
    logger_celery_task = get_task_logger("syncapp.tasks.copy_data")
    for handler in logger_celery_task.handlers:
        logger.addHandler(handler)
    logger.info(f"Task Logging configured. Name: {logger.name}, Handlers: {logger.handlers}")

The after_setup_* approach does not quite solve the problem. Some observations:

From the logs, I can see that the handlers got attached to the loggers:

syncdata-1  | [2024-10-14 13:19:15.384] [pid 1] [INFO] - [root]: Logging configured. Name: root, Handlers: [<StreamHandler <stdout> (NOTSET)>, <QueueListenerHandler (NOTSET)>]
syncdata-1  | [2024-10-14 13:19:15.390] [pid 1] [INFO] - [celery.task]: Task Logging configured. Name: celery.task, Handlers: [<StreamHandler <stdout> (NOTSET)>, <FileHandler /es-sync/logs/synctask.log (NOTSET)>]

The main Celery process writes to its log file as expected. Inside the task, I can see that the logger has the handlers attached:

syncdata-1  | [2024-10-14 13:19:25.712] [pid 16] [WARNING] - [celery.redirected]: Logger Name: syncapp.tasks.copy_data, Handlers: [<StreamHandler <stdout> (NOTSET)>, <FileHandler /es-sync/logs/synctask.log (NOTSET)>]

Interesting point here: even though the logger syncapp.tasks.copy_data has celery.task as its parent, configuring a logger named syncapp.tasks or celery.task did not work. It had to be named exactly syncapp.tasks.copy_data.
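
The reason, as far as I can tell, is that get_task_logger does not use the usual dotted-name hierarchy: it explicitly reparents the returned logger under celery.task. A quick standalone check (a sketch using names from my project):

from celery.utils.log import get_task_logger

task_logger = get_task_logger("syncapp.tasks.copy_data")
# The parent is celery.task, not syncapp.tasks, so a config entry for
# syncapp.tasks is never consulted for this logger.
print(task_logger.parent.name)  # prints: celery.task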

Note that my loggers were using a queue handler. The queue object was being accessed from both the main Celery process and the task's worker process (note pid 1 versus pid 16 in the logs above), and this somehow stops the queue handler from emitting logs.

If I enable propagation to the root logger, I can see that the logs were, indeed, generated:

syncdata-1  | [2024-10-14 13:19:25.712] [pid 16] [INFO] - [syncapp.tasks.copy_data]: Job ID: 18b9953a-e5cf-4939-a535-2bfe47678016, Task ID: b5aa83e1-cb81-4145-a142-13dd4c59124a. Copying data from source index xyz

What would be a process-safe way to use a queue handler here? For now, the workaround is not to use the queue handler at all and to attach the file and console handlers directly.
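
If I revisit this, the first thing I would try is the pattern from the stdlib logging cookbook: give the worker processes only a QueueHandler wrapping a multiprocessing.Queue, and run a single QueueListener that owns the real handlers in the main process. A minimal sketch, not verified against Celery's prefork pool (the queue has to exist before the pool forks, and the worker side would presumably be hooked into signals.worker_process_init):

import logging
import multiprocessing
from logging.handlers import QueueHandler, QueueListener

log_queue = multiprocessing.Queue(-1)  # must be created before the pool forks


def start_listener() -> QueueListener:
    """Main process only: the listener owns the real handlers and does all I/O."""
    file_handler = logging.FileHandler("logs/synctask.log")
    console_handler = logging.StreamHandler()
    listener = QueueListener(log_queue, file_handler, console_handler)
    listener.start()
    return listener


def configure_worker_logger(**kwargs):
    """In each worker process: the logger only ever touches the queue."""
    logger = logging.getLogger("syncapp.tasks.copy_data")
    logger.handlers = [QueueHandler(log_queue)]
    logger.setLevel(logging.DEBUG)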


Solution

  • This is not a proper answer, but rather a workaround until I find a real solution.

    Problem: The queue handler is not safe to share across the Celery processes, and it stops emitting logs once it has been accessed from the main Celery process.

    Similar problem: Django + Celery logging: custom handler works in main process but not celery worker process

    Step 1:

    The logger name must exactly match the task logger name passed to log.get_task_logger. Also, instead of using a queue handler, use the file and console handlers directly. Change in the logging.yaml file:

    loggers:
      syncapp.tasks.copy_data:        # --> use the full task logger name
        level: DEBUG
        handlers:
          - hl1_console
          - hl1_synctask_file_handler
        propagate: no
    

    Step 2:

    In the Celery app, connect to signals.after_setup_logger and signals.after_setup_task_logger, and attach the desired handlers to the logger object passed to each callback.

    import logging.config

    import yaml
    from celery import signals
    from celery.utils.log import get_task_logger


    @signals.after_setup_logger.connect()
    def after_setup_logger(logger, *args, **kwargs):
        logger.info(f"Logger before configuration: {logger.name}, Handlers: {logger.handlers}")
        with open(settings.log_config_path, "r") as config_file:
            logging.config.dictConfig(yaml.safe_load(config_file.read()))
        logger_celery = logging.getLogger("celery")
        for handler in logger_celery.handlers:
            logger.addHandler(handler)
        logger.info(f"Logging configured. Name: {logger.name}, Handlers: {logger.handlers}")
    
    
    @signals.after_setup_task_logger.connect()
    def after_setup_task_logger(logger, *args, **kwargs):
        logger.info(f"Logger before configuration: {logger.name}, Handlers: {logger.handlers}")
        with open(settings.log_config_path, "r") as config_file:
            logging.config.dictConfig(yaml.safe_load(config_file.read()))
        logger_celery_task = get_task_logger("syncapp.tasks.copy_data")
        for handler in logger_celery_task.handlers:
            logger.addHandler(handler)
        logger.info(f"Task Logging configured. Name: {logger.name}, Handlers: {logger.handlers}")
    

    This change was suggested in the SO question: Celery logger configuration

    Now I can see the logs in the respective output files.
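
    As a final sanity check (xyz is just a placeholder index name; the worker from the command above must be running):

    from syncapp.tasks.copy_data import copy_index_data

    # Enqueue a task, then tail logs/synctask.log to confirm the task's
    # log line ends up in the file configured in Step 1.
    copy_index_data.delay("xyz")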