Tags: django, celery, django-celery, celery-task

Celery: Limit the Number of Tasks Running per User


I have a task in Celery that looks like this:

@app.task(name='task_one')
def task_one(user_id, *args, **kwargs):
    # Long-running task
    ...

This task is created in a view every time a user submits a form. The task requires a lot of resources and takes around 10 minutes on average to complete.

(views.py)
...
if request.method == 'POST':
    task_one.delay(user.id)
...

I want to limit the number of task_one tasks created per user to one (either active or reserved).

What I'm doing so far is checking whether there is an active or reserved task for that user before creating a new one:

def user_created_task(active_tasks, reserved_tasks, user_id):
    # Flatten the per-worker task lists returned by inspect() so every
    # worker is checked, not just the first one
    all_tasks = [
        task
        for worker_tasks in list(active_tasks.values()) + list(reserved_tasks.values())
        for task in worker_tasks
    ]
    for task in all_tasks:
        if task['name'] == 'task_one' and task['args'][0] == user_id:
            # There is already a task_one active or reserved for this user
            return True

    return False

def user_tasks_already_running_or_reserved(user_id):
    inspect = app.control.inspect()

    active_tasks = inspect.active()
    reserved_tasks = inspect.reserved()

    if active_tasks is None or reserved_tasks is None:
        # Celery workers are disconnected
        return False

    return user_created_task(active_tasks, reserved_tasks, user_id)


(views.py)
...
if request.method == 'POST':
    if not user_tasks_already_running_or_reserved(user.id):
        task_one.delay(user.id)
...

I was wondering if there is a more efficient way of doing this. Instead of inspecting all the workers on every user request, maybe there is a way to add this condition in Celery before the task runs; so far I haven't found anything in the documentation.


Solution

  • The situation you are describing calls for a distributed lock (because n = 1), but can be described more generally as a distributed semaphore (see the sketch at the end of this answer for the n > 1 case). Roughly speaking, these locks and mechanisms fall outside of what is built into Celery.

    As mentioned by the commenters (hat tip: @bernhard vallant), a straightforward implementation of a distributed lock would normally use something like a table in a database or a Redis lock / Redlock.
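
    Before reaching for Redlock, note that a minimal cache-based variant of the same idea can be sketched with Django's low-level cache API: cache.add() stores a key only if it does not already exist, and does so atomically on backends such as Memcached or Redis. This is only a sketch under that assumption; the key name and timeout are illustrative:

    from django.core.cache import cache

    @app.task(name='task_one')
    def task_one(user_id, *args, **kwargs):
        lock_key = f'task_one:{user_id}'  # illustrative key name
        # cache.add() returns False if the key already exists, so at most
        # one task per user can hold the "lock" until the timeout expires
        if not cache.add(lock_key, 'locked', timeout=60 * 60 * 2):
            return  # another task_one is already active or reserved for this user
        try:
            pass  # the long-running work goes here
        finally:
            cache.delete(lock_key)

    You could equally call cache.add() in the view before delay(), so that a duplicate task is never enqueued at all.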

    To make use of one common Redlock implementation, you can do the following:

    from redlock import MultipleRedlockException, Redlock
    from django.conf import settings

    @app.task(name='task_one',
              autoretry_for=(MultipleRedlockException,),
              retry_kwargs={'max_retries': 5})
    def task_one(user_id, *args, **kwargs):
        # assumes you are using redis for the django cache, with LOCATION
        # set to the redis url
        lock_manager = Redlock([settings.CACHES['default']['LOCATION']])
        lock_name = f'task_one:{user_id}'
        # if redis itself errors out, lock() raises MultipleRedlockException,
        # which triggers the celery auto-retry configured above
        lock = lock_manager.lock(lock_name, 1000 * 60 * 60 * 2)  # ttl is in ms: 2 hours
        if not lock:
            # another task_one already holds the lock for this user
            return
        try:
            # the main body of what you want to do goes here
            pass
        finally:
            # unlock() takes the Lock object returned by lock()
            lock_manager.unlock(lock)
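
    With the lock held inside the task, the view can simply call task_one.delay(user.id) unconditionally; a duplicate task acquires nothing and exits (or retries) without doing the heavy work.

    If you ever need the more general semaphore (n > 1 concurrent tasks per user) mentioned above, a naive counting-semaphore sketch over redis-py could look like the following. The helper names, key, limit, and ttl are all illustrative, and this simple INCR/DECR scheme can leak slots if a worker dies between acquire and release, so treat it as a starting point rather than a production recipe:

    import redis

    def acquire_slot(client: redis.Redis, key: str, limit: int, ttl: int) -> bool:
        # naive distributed counting semaphore: bump the per-user counter,
        # then roll back if that pushed us over the limit
        count = client.incr(key)
        if count == 1:
            # the first holder sets the expiry so crashed workers
            # cannot hold slots forever
            client.expire(key, ttl)
        if count > limit:
            client.decr(key)
            return False
        return True

    def release_slot(client: redis.Redis, key: str) -> None:
        client.decr(key)

    Inside the task you would call acquire_slot(client, f'task_one:{user_id}', n, 60 * 60 * 2) before the main body and release_slot() in a finally block, mirroring the lock-based version above.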