I used Celery several times in the past and the idea I always had is that:
However, in my current application I was asked to use Redis and to let the team that would otherwise call my FastAPI write their messages directly into Redis, so we avoid having FastAPI in the middle. I have already done this, and I know how they have to write the messages in Redis. Nonetheless, we still have the question of how to notify them that their results are ready. Is there any alternative to polling every N seconds? I was thinking they might send me an ID and I would ping them back with that ID, but they were asking about using Redis Streams. I'm connecting using "rediss"; tasks are pushed to Redis lists, and results are stored as Redis strings.
Following up on my comment, and with help from AI, this worked!
I'm sending task results (failures or successes) to a Redis Stream, apart from publishing the result as a Redis string. So later the front end can listen to the results from the stream and trigger the event automatically.
task_monitoring.py:
"""
Task result monitoring with Redis streams.
This module uses Celery signals to automatically capture task results
and publish them to Redis streams.
"""
import json
import logging
from datetime import datetime
from typing import Any

import redis
from celery import Task
from celery.signals import task_success, task_failure
# Module-wide Redis client; stays None until get_redis_client() is first called.
redis_client = None


def get_redis_client() -> redis.Redis:
    """Return the shared Redis client, creating it on first use.

    The client is stored in the module-level ``redis_client`` variable so
    that later calls reuse the same instance instead of building a new one.

    Returns:
        The cached ``redis.Redis`` instance.
    """
    global redis_client
    if redis_client is not None:
        return redis_client

    # "rediss" scheme = TLS. youpass / yourhost / yourport are placeholders
    # that must be provided by the surrounding application.
    # NOTE(review): ssl_cert_reqs=None appears to turn off server certificate
    # verification in redis-py -- confirm this is acceptable for production.
    redis_client = redis.Redis.from_url(
        f"rediss://:{youpass}@{yourhost}:{yourport}/0",
        ssl_cert_reqs=None,
        decode_responses=True,
    )
    return redis_client
# Key of the Redis stream that the task success and failure handlers publish to.
# The value here is a placeholder to be replaced with the real stream name.
STREAM_NAME = "your_redis_prefix"
@task_success.connect
def task_success_handler(sender: Task | None = None, result: Any | None = None, **kwargs) -> None:
    """
    Handle successful task completion and publish the result to the Redis stream.

    Publishing is best-effort: any error raised while building or sending the
    stream entry is logged (with traceback) and swallowed, so a monitoring
    problem never escapes from the signal handler.

    Args:
        sender: The Celery Task instance that succeeded
        result: The result returned by the task
        **kwargs: Additional signal arguments
    """
    # Bound before the try block so the except clause can always reference it,
    # even when the failure happens while reading the sender.
    task_id = 'unknown'
    try:
        # Get task info from the sender (Celery Task instance)
        task_id = sender.request.id if sender else 'unknown'
        task_name = sender.name if sender else 'unknown'

        client = get_redis_client()

        # Stream entry fields
        data = {
            "task_id": task_id,
            "task_name": task_name,
            "status": "SUCCESS",
            "timestamp": datetime.now().isoformat(),
        }

        # Add the actual result: containers are JSON-encoded, everything else
        # is stringified. A serialization problem is reported inside the entry
        # rather than preventing the entry from being published.
        try:
            if isinstance(result, (dict, list)):
                data["result"] = json.dumps(result)
            else:
                data["result"] = str(result)
        except Exception as e:
            data["result"] = f"Error serializing: {str(e)}"

        # Append to the stream, trimming it to roughly 10000 entries.
        client.xadd(
            STREAM_NAME,
            data,
            maxlen=10000,  # Limit stream size
            approximate=True,
        )
    except Exception:
        # logging.getLogger is used directly because no module-level logger is
        # defined; .exception() records the traceback along with the message.
        logging.getLogger(__name__).exception(
            "Error publishing task success to Redis stream with task_id: %s", task_id
        )
@task_failure.connect
def task_failure_handler(sender: Task | None = None, exception: Exception | None = None, **kwargs) -> None:
    """
    Handle task failure and publish the error to the Redis stream.

    Publishing is best-effort: any error raised while building or sending the
    stream entry is logged (with traceback) and swallowed, so a monitoring
    problem never escapes from the signal handler.

    Args:
        sender: The Celery Task instance that failed
        exception: The exception that caused the task to fail
        **kwargs: Additional signal arguments
    """
    # Bound before the try block so the except clause can always reference it,
    # even when the failure happens while reading the sender.
    task_id = 'unknown'
    try:
        # Get task info from the sender (Celery Task instance)
        task_id = sender.request.id if sender else 'unknown'
        task_name = sender.name if sender else 'unknown'

        client = get_redis_client()

        # Stream entry fields
        data = {
            "task_id": task_id,
            "task_name": task_name,
            "status": "FAILURE",
            "timestamp": datetime.now().isoformat(),
            "error": str(exception) if exception else "Unknown error",
        }

        # Append to the stream, trimming it to roughly 10000 entries.
        client.xadd(
            STREAM_NAME,
            data,
            maxlen=10000,  # Limit stream size
            approximate=True,
        )
    except Exception:
        # logging.getLogger is used directly because no module-level logger is
        # defined; .exception() records the traceback along with the message.
        logging.getLogger(__name__).exception(
            "Error publishing task failure to Redis stream with task_id: %s", task_id
        )
celery.py:
"""
Celery application configuration.
This module sets up the Celery application with appropriate configuration.
"""
import config as celeryconfig # config is your file with celery configurations
from celery import Celery
# Create the Celery app instance ("YOUR_APP_NAME" is a placeholder for the
# application's main name).
app = Celery("YOUR_APP_NAME")
# Load configuration from the config module imported above as celeryconfig.
app.config_from_object(celeryconfig)
# Auto-discover tasks in the listed package(s); "DIR_TO_YOUR_TASKS" is a
# placeholder for the package that contains the task definitions.
app.autodiscover_tasks(["DIR_TO_YOUR_TASKS"])
# Import the task monitoring module purely for its side effect: importing it
# runs the @task_success.connect / @task_failure.connect decorators, which
# register the signal handlers. The name itself is otherwise unused here.
import task_monitoring # noqa
# Expose the same app object under a second module-level name, celery_app.
celery_app = app