python, redis, celery, celery-task

Is there an alternative to asking every N seconds to know if a Celery task is done?


I have used Celery several times in the past, and the idea I always had is: you submit a task, get back a task ID, and then poll with that ID every N seconds until the result is ready.

However, in my current application I was asked to use Redis directly: the team that used to send calls to my FastAPI now delivers their messages straight into Redis, so we avoid having the FastAPI in the middle. I have already done this, and I know how they have to write their messages into Redis. Nonetheless, we still have the question of how to notify them that their results are ready. Is there any alternative other than them asking every N seconds? I was thinking they might send me an ID and I would ping them back with that ID, but they were asking about using Redis streams. I'm connecting with "rediss", tasks are pushed to Redis lists, and results are stored as Redis strings.
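
For context, the polling pattern I am trying to avoid looks roughly like this (module names are hypothetical):

    import time
    from celery.result import AsyncResult
    from myapp.celery import app  # hypothetical: wherever the Celery app lives

    def wait_for_result(task_id: str, interval: float = 5.0):
        """Block until the task finishes, asking again every `interval` seconds."""
        result = AsyncResult(task_id, app=app)
        while not result.ready():  # "is it done yet?" on every check
            time.sleep(interval)
        return result.get()  # returns the value, or raises if the task failed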


Solution

  • Following my comment, and with some help from AI, this worked!

    I'm sending task results (failures or successes) to a Redis Stream, apart from publishing each result as a Redis string. That way the frontend can later listen to the stream and react to the result events automatically (a consumer sketch is at the end of this answer).

    task_monitoring.py:

    """
    Task result monitoring with Redis streams.
    
    This module uses Celery signals to automatically capture task results 
    and publish them to Redis streams.
    """
    
    from datetime import datetime
    import json
    import logging
    from typing import Any
    import redis
    from celery import Task
    from celery.signals import task_success, task_failure
    
    logger = logging.getLogger(__name__)
    
    # Redis client, created lazily on first use
    redis_client = None
    
    def get_redis_client() -> redis.Redis:
        """Get Redis client using the same connection as Celery."""
        global redis_client
        if redis_client is None:
            # rediss:// = Redis over TLS; the uppercase names are placeholders for your credentials
            connection_url = f"rediss://:{YOUR_PASSWORD}@{YOUR_HOST}:{YOUR_PORT}/0"
            redis_client = redis.Redis.from_url(
                connection_url,
                ssl_cert_reqs=None,  # don't validate the server certificate
                decode_responses=True,
            )
        return redis_client
    
    STREAM_NAME = "your_redis_prefix"  # the stream your consumers will read from
    
    @task_success.connect
    def task_success_handler(sender: Task | None = None, result: Any | None = None, **kwargs) -> None:
        """
        Handle successful task completion and publish result to Redis stream.
        
        Args:
            sender: The Celery Task instance that succeeded
            result: The result returned by the task
            **kwargs: Additional signal arguments
        """
        task_id = 'unknown'  # default so the except block below can always log an id
        try:
            # Get task info from the sender (Celery Task instance)
            task_id = sender.request.id if sender else 'unknown'
            task_name = sender.name if sender else 'unknown'
            
            # Get Redis client
            redis_client = get_redis_client()
            
            # Prepare result data
            data = {
                "task_id": task_id,
                "task_name": task_name,
                "status": "SUCCESS",
                "timestamp": datetime.now().isoformat(),
            }
            
            # Add the actual result
            try:
                # Serialize the result
                if isinstance(result, (dict, list)):
                    data["result"] = json.dumps(result)
                else:
                    data["result"] = str(result)
            except Exception as e:
                data["result"] = f"Error serializing: {str(e)}"
            
            # Add to Redis stream
            redis_client.xadd(
                STREAM_NAME,
                data,
                maxlen=10000,  # Limit stream size
                approximate=True
            )
        except Exception:
            logger.exception("Error publishing task success to Redis stream with task_id: %s", task_id)
    
    @task_failure.connect
    def task_failure_handler(sender: Task | None = None, task_id: str | None = None, exception: Exception | None = None, **kwargs) -> None:
        """
        Handle task failure and publish error to Redis stream.
        
        Args:
            sender: The Celery Task instance that failed
            task_id: The id of the failed task (provided by the signal)
            exception: The exception that caused the task to fail
            **kwargs: Additional signal arguments
        """
        task_id = task_id or 'unknown'
        try:
            # task_failure passes task_id directly, so we don't rely on sender.request
            task_name = sender.name if sender else 'unknown'
            
            # Get Redis client
            redis_client = get_redis_client()
            
            # Prepare error data
            data = {
                "task_id": task_id,
                "task_name": task_name,
                "status": "FAILURE",
                "timestamp": datetime.now().isoformat(),
                "error": str(exception) if exception else "Unknown error"
            }
            
            # Add to Redis stream
            redis_client.xadd(
                STREAM_NAME,
                data,
                maxlen=10000,  # Limit stream size
                approximate=True
            )
        except Exception:
            logger.exception("Error publishing task failure to Redis stream with task_id: %s", task_id)
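
    A quick way to sanity-check the handlers (a sketch reusing the names above): run any task, then peek at the newest stream entries from a Python shell.

    # after a task has run, inspect the most recent stream entries (newest first)
    from task_monitoring import get_redis_client, STREAM_NAME
    
    entries = get_redis_client().xrevrange(STREAM_NAME, count=5)
    for entry_id, fields in entries:
        print(entry_id, fields["task_id"], fields["status"])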
    

    celery.py:

    """
    Celery application configuration.
    
    This module sets up the Celery application with appropriate configuration.
    """
    
    import config as celeryconfig  # config.py is your file with the Celery settings
    from celery import Celery
    
    # Create the Celery app instance
    app = Celery("YOUR_APP_NAME")
    
    # Load configuration from config module
    app.config_from_object(celeryconfig)
    
    # Auto-discover tasks (autodiscover_tasks takes package names, not directory paths)
    app.autodiscover_tasks(["YOUR_TASKS_PACKAGE"])
    
    # Import the task monitoring module to register signal handlers
    import task_monitoring  # noqa
    
    # Make the app available at the module level
    celery_app = app
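
    And on the consumer side, the frontend (or any service the other team writes) can block on the stream with XREAD instead of polling. A minimal sketch, assuming the same STREAM_NAME and placeholder credentials as above:

    consumer.py:

    """
    Minimal blocking consumer for the task-result stream.
    """
    import redis
    
    STREAM_NAME = "your_redis_prefix"
    
    client = redis.Redis.from_url(
        "rediss://:{YOUR_PASSWORD}@{YOUR_HOST}:{YOUR_PORT}/0",  # placeholders
        ssl_cert_reqs=None,
        decode_responses=True,
    )
    
    last_id = "$"  # "$" means: only entries added after we start listening
    while True:
        # block for up to 5 seconds waiting for new entries, then loop again
        response = client.xread({STREAM_NAME: last_id}, block=5000, count=10)
        for _stream, entries in response or []:
            for entry_id, fields in entries:
                print(fields["task_id"], fields["status"], fields.get("result"))
                last_id = entry_id  # resume after the last entry we processed

    If several consumers need to share the work with at-least-once delivery, consumer groups (XREADGROUP / XACK) would be the next step; a single XREAD loop is already enough to replace the every-N-seconds polling.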