I am using Celery as the background task processor with a FastAPI web server. Logging for FastAPI is configured using a logging.yaml file via Uvicorn, like this:
uvicorn syncapp.main:app --workers 4 --host 0.0.0.0 --port 8084 --log-config logging.yaml
I have configured multiple loggers, which work fine for the API server. However, no matter what I do, I cannot get Celery to use a logger defined in the logging.yaml file. I have tried other solutions, but none seem to work for my case.
Celery main application (path: syncapp/worker.py):
from celery import Celery

from syncapp.settings import get_settings

settings = get_settings()

app = Celery(
    main=settings.app_name,
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["syncapp.tasks.copy_data"],
)
app.conf.update(
    broker_connection_retry_on_startup=True,
    worker_hijack_root_logger=False,
)
Example task (path: syncapp/tasks/copy_data.py):
from celery import current_task
from celery.utils.log import get_task_logger

from syncapp.commons.enums import WorkerQueue
from syncapp.worker import app

logger = get_task_logger(__name__)

@app.task(queue=WorkerQueue.COPY_DATA.value)
def copy_index_data(index_name: str) -> bool:
    """Copies index data up until the current date."""
    task_id = current_task.request.id
    logger.info("Job %s: Copying data from source index %s", task_id, index_name)
    return True
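(Aside: the task id can also be read without importing current_task by binding the task. This is standard Celery behaviour; the sketch below is just the same task rewritten with bind=True as an alternative.)

from celery.utils.log import get_task_logger

from syncapp.commons.enums import WorkerQueue
from syncapp.worker import app

logger = get_task_logger(__name__)

@app.task(bind=True, queue=WorkerQueue.COPY_DATA.value)
def copy_index_data(self, index_name: str) -> bool:
    """Same task, but bound so the request context is available as self.request."""
    logger.info("Job %s: Copying data from source index %s", self.request.id, index_name)
    return True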
Contents of the logging.yaml file (some keys are removed to keep it short):
version: 1
...
handlers:
  hl1_console:
    class: logging.StreamHandler
    formatter: simple
    stream: ext://sys.stdout
  hl1_syncapp_file_handler:
    class: logging.FileHandler
    filename: logs/syncapp.log
    formatter: extended
  hl1_synctask_file_handler:
    class: logging.FileHandler
    filename: logs/synctask.log
    formatter: extended
  hl1_celery_file_handler:
    class: logging.FileHandler
    filename: logs/celery.log
    formatter: extended
  hl2_synctask_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.hl1_console
      - cfg://handlers.hl1_synctask_file_handler
    queue: cfg://objects.synctask_queue
  hl2_celery_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.hl1_console
      - cfg://handlers.hl1_celery_file_handler
    queue: cfg://objects.celery_queue
  hl2_syncapp_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.hl1_console
      - cfg://handlers.hl1_syncapp_file_handler
    queue: cfg://objects.syncapp_queue
loggers:
  celery.task:
    level: DEBUG
    handlers:
      - hl2_synctask_queue_handler
    propagate: no
  celery:
    level: INFO
    handlers:
      - hl2_celery_queue_handler
    propagate: no
  syncapp:
    level: DEBUG
    handlers:
      - hl2_syncapp_queue_handler
    propagate: no
root:
  level: INFO
  handlers:
    - hl1_console
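For reference, logging_.handlers.QueueListenerHandler is a custom class, not part of the standard library, and its module is not shown here. Roughly, its shape is a QueueHandler that owns and starts a QueueListener, along these lines (a sketch, not the actual module):

import logging.handlers
from queue import Queue

def _resolve_handlers(handlers):
    # dictConfig passes cfg:// references lazily; indexing forces resolution.
    return [handlers[i] for i in range(len(handlers))]

class QueueListenerHandler(logging.handlers.QueueHandler):
    """Sketch only: a QueueHandler that owns a QueueListener, which forwards
    records from the queue to the wrapped handlers on a background thread."""

    def __init__(self, handlers, respect_handler_level=True, auto_run=True, queue=None):
        super().__init__(queue if queue is not None else Queue(-1))
        self._listener = logging.handlers.QueueListener(
            self.queue,
            *_resolve_handlers(handlers),
            respect_handler_level=respect_handler_level,
        )
        if auto_run:
            self._listener.start()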
I am running the Celery worker with the following command (note that I did not pass the --logfile parameter here):
celery -A syncapp.worker:app worker --concurrency=4 --hostname=copy@%h --queues=syncapp.tasks.copy_data --loglevel=INFO
I want the Celery backend to log its events through the celery logger and the task logs to go through the syncapp.tasks logger. The main application logs should go through the syncapp logger (which they do).
Update #4
Instead of preventing Celery from configuring logging by connecting to signals.setup_logging, I can manually reinitialize the loggers by connecting to signals.after_setup_logger and signals.after_setup_task_logger. The code looks like this (yes, the repeated code could be tidied up):
import logging

from celery import Celery, signals
from celery.utils.log import get_task_logger

from syncapp.settings import get_settings
# YAMLConfig applies the YAML logging config; it comes from my logging helpers
# (the same module as the custom queue handler, not shown here).

settings = get_settings()

app = Celery(
    main=settings.app_name,
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["syncapp.tasks.copy_data"],
)
app.conf.update(
    broker_connection_retry_on_startup=True,
    worker_hijack_root_logger=False,
)

@signals.after_setup_logger.connect()
def after_setup_logger(logger, *args, **kwargs):
    logger.info(f"Logger before configuration: {logger.name}, Handlers: {logger.handlers}")
    with open(settings.log_config_path, "r") as config_file:
        YAMLConfig(config_file.read())
    logger_celery = logging.getLogger("celery")
    for handler in logger_celery.handlers:
        logger.addHandler(handler)
    logger.info(f"Logging configured. Name: {logger.name}, Handlers: {logger.handlers}")

@signals.after_setup_task_logger.connect()
def after_setup_task_logger(logger, *args, **kwargs):
    logger.info(f"Logger before configuration: {logger.name}, Handlers: {logger.handlers}")
    with open(settings.log_config_path, "r") as config_file:
        YAMLConfig(config_file.read())
    logger_celery_task = get_task_logger("syncapp.tasks.copy_data")
    for handler in logger_celery_task.handlers:
        logger.addHandler(handler)
    logger.info(f"Task Logging configured. Name: {logger.name}, Handlers: {logger.handlers}")
This does not quite solve the problem. Some observations:
From the logs, I can see that the handlers got attached to the loggers:
syncdata-1 | [2024-10-14 13:19:15.384] [pid 1] [INFO] - [root]: Logging configured. Name: root, Handlers: [<StreamHandler <stdout> (NOTSET)>, <QueueListenerHandler (NOTSET)>]
syncdata-1 | [2024-10-14 13:19:15.390] [pid 1] [INFO] - [celery.task]: Task Logging configured. Name: celery.task, Handlers: [<StreamHandler <stdout> (NOTSET)>, <FileHandler /es-sync/logs/synctask.log (NOTSET)>]
The main Celery process can send data to the corresponding log file. In the task, I can see that the logger has handlers:
syncdata-1 | [2024-10-14 13:19:25.712] [pid 16] [WARNING] - [celery.redirected]: Logger Name: syncapp.tasks.copy_data, Handlers: [<StreamHandler <stdout> (NOTSET)>, <FileHandler /es-sync/logs/synctask.log (NOTSET)>]
Interesting point here: even though the logger syncapp.tasks.copy_data has a parent logger celery.task, configuring the logger for syncapp.tasks or celery.task did not work. It had to be named exactly syncapp.tasks.copy_data.
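This naming behaviour can be checked in isolation: get_task_logger returns a logger with the exact name you pass, re-parented under celery.task, so with propagation disabled the config key must match the full name.

from celery.utils.log import get_task_logger

# get_task_logger keeps the given name and sets the parent to "celery.task".
logger = get_task_logger("syncapp.tasks.copy_data")
print(logger.name)         # syncapp.tasks.copy_data
print(logger.parent.name)  # celery.task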
Note that my loggers were using a queue handler. The queue object was being accessed from both the main Celery process and the task (worker) processes, and this somehow stops the queue handler from emitting logs.
If I enable propagation to the root logger, I can see that the logs were, indeed, generated:
syncdata-1 | [2024-10-14 13:19:25.712] [pid 16] [INFO] - [syncapp.tasks.copy_data]: Job ID: 18b9953a-e5cf-4939-a535-2bfe47678016, Task ID: b5aa83e1-cb81-4145-a142-13dd4c59124a. Copying data from source index xyz
What would be the thread-safe way to utilize a queue handler? For now, the solution is not to use the queue handler and to use the file and console handlers explicitly.
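One direction I have not verified end to end: build the queue, handler, and listener inside each worker process (via the worker_process_init signal), so no queue object ever crosses the fork. A minimal sketch, with the handler, formatter, and file path assumed:

import logging
import logging.handlers
from queue import Queue

from celery import signals

_listener = None

@signals.worker_process_init.connect
def start_queue_logging(**kwargs):
    # Runs in each prefork child after the fork, so the queue stays per-process.
    global _listener
    log_queue = Queue(-1)

    file_handler = logging.FileHandler("logs/synctask.log")
    file_handler.setFormatter(logging.Formatter("[%(asctime)s] [%(name)s] %(message)s"))

    _listener = logging.handlers.QueueListener(
        log_queue, file_handler, respect_handler_level=True
    )
    _listener.start()

    task_logger = logging.getLogger("syncapp.tasks.copy_data")
    task_logger.handlers = [logging.handlers.QueueHandler(log_queue)]

@signals.worker_process_shutdown.connect
def stop_queue_logging(**kwargs):
    if _listener is not None:
        _listener.stop()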
This is not a proper answer, but rather a workaround until I find a real solution.
Problem: the queue handler is not async-safe and therefore stops emitting logs once it has been accessed from the main Celery process.
Similar problem: Django + Celery logging: custom handler works in main process but not celery worker process
Step 1: The logger names should exactly match the task logger name passed to log.get_task_logger. Also, instead of using a queue handler, use the file and console handlers directly. Change in the logging.yaml file:
loggers:
  syncapp.tasks.copy_data:  # --> using the full task logger name
    level: DEBUG
    handlers:
      - hl1_console
      - hl1_synctask_file_handler
    propagate: no
Step 2: In the Celery app, connect to signals.after_setup_logger and signals.after_setup_task_logger and attach the desired handlers to the callback's logger object.
import logging.config

import yaml
from celery import signals
from celery.utils.log import get_task_logger

from syncapp.settings import get_settings

settings = get_settings()

@signals.after_setup_logger.connect()
def after_setup_logger(logger, *args, **kwargs):
    logger.info(f"Logger before configuration: {logger.name}, Handlers: {logger.handlers}")
    with open(settings.log_config_path, "r") as config_file:
        logging.config.dictConfig(yaml.safe_load(config_file.read()))
    logger_celery = logging.getLogger("celery")
    for handler in logger_celery.handlers:
        logger.addHandler(handler)
    logger.info(f"Logging configured. Name: {logger.name}, Handlers: {logger.handlers}")

@signals.after_setup_task_logger.connect()
def after_setup_task_logger(logger, *args, **kwargs):
    logger.info(f"Logger before configuration: {logger.name}, Handlers: {logger.handlers}")
    with open(settings.log_config_path, "r") as config_file:
        logging.config.dictConfig(yaml.safe_load(config_file.read()))
    logger_celery_task = get_task_logger("syncapp.tasks.copy_data")
    for handler in logger_celery_task.handlers:
        logger.addHandler(handler)
    logger.info(f"Task Logging configured. Name: {logger.name}, Handlers: {logger.handlers}")
This change was suggested in the SO question: Celery logger configuration.
Now I can see logs in the respective output files.
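As noted earlier, the two callbacks repeat the same work; they could be collapsed into a shared helper. A sketch with the same behaviour and the same imports as the snippet above:

def _attach_handlers_from_config(logger, source_logger_name):
    """Re-run dictConfig, then copy the configured logger's handlers over."""
    with open(settings.log_config_path, "r") as config_file:
        logging.config.dictConfig(yaml.safe_load(config_file.read()))
    for handler in logging.getLogger(source_logger_name).handlers:
        logger.addHandler(handler)

@signals.after_setup_logger.connect()
def after_setup_logger(logger, *args, **kwargs):
    _attach_handlers_from_config(logger, "celery")

@signals.after_setup_task_logger.connect()
def after_setup_task_logger(logger, *args, **kwargs):
    _attach_handlers_from_config(logger, "syncapp.tasks.copy_data")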