Tags: python, django, django-rest-framework, rabbitmq, python-multithreading

Django model.save() doing inconsistent updates


I am using the Django ORM to communicate with a MySQL database inside the callback functions of my RabbitMQ consumers. These consumers run on separate threads, and each consumer has established its own connection to its queue.

Here is the code for two of my consumer callbacks:

TaskExecutorService

# imports
from pika.spec import Basic
from pika.channel import Channel
from pika import BasicProperties

import uuid

from jobs.models import Task

from exceptions import MasterConsumerServiceError as ServiceError

from .master_service import MasterConsumerSerivce


class TaskExecutorService(MasterConsumerSerivce):
  queue = 'master_tasks'

  @classmethod
  def callback(cls, ch: Channel, method: Basic.Deliver, properties: BasicProperties, message: dict):
    # get task
    task_id_str = message.get('task_id')
    task_id = uuid.UUID(task_id_str)
    task_qs = Task.objects.filter(pk=task_id)
    if not task_qs.exists():
      raise ServiceError(message=f'Task {task_id_str} does not exist')
    task = task_qs.first()

    # check if task is stopped
    if task.status == cls.Status.TASK_STOPPED:
      raise ServiceError(message=f'Task {task_id_str} is stopped')

    # send task to results queue
    publisher = cls.get_publisher(queue=cls.Queues.results_queue)
    published, error = publisher.publish(message=message | {'status': True, 'error': None})
    if not published:
      raise ServiceError(message=str(error))

    # update task status
    task.status = cls.Status.TASK_PROCESSING
    task.save()

    return

ResultHandlerService

# imports
from pika.spec import Basic
from pika.channel import Channel
from pika import BasicProperties

import uuid

from jobs.models import Task
from exceptions import MasterConsumerServiceError as ServiceError

from .master_service import MasterConsumerSerivce


class ResultHandlerService(MasterConsumerSerivce):
  queue = 'master_results'

  @classmethod
  def callback(cls, ch: Channel, method: Basic.Deliver, properties: BasicProperties, message: dict):
    # get task
    task_id_str = message.get('task_id')
    task_id = uuid.UUID(task_id_str)
    task_qs = Task.objects.filter(pk=task_id)
    if not task_qs.exists():
      raise ServiceError(message=f'Task {task_id_str} does not exist')
    task = task_qs.first()

    # get result data and status
    data = message.get('data')
    status = message.get('status')

    # if task is not successful
    if not status:
      # fail task
      task.status = cls.Status.TASK_FAILED
      task.save()

      # fail job
      task.job.status = cls.Status.JOB_FAILED
      task.job.save()

      return

    # update task status
    task.status = cls.Status.TASK_DONE
    task.save()

    # check if job is complete
    task_execution_order = task.process.execution_order
    next_task_qs = Task.objects.select_related('process').filter(job=task.job, process__execution_order=task_execution_order + 1)
    is_job_complete = not next_task_qs.exists()

    # check job is complete
    if is_job_complete:
      # publish results
      publisher = cls.get_publisher(queue=cls.Queues.output_queue)
      published, error = publisher.publish(message={'job_id': str(task.job.id), 'data': data})
      if not published:
        raise ServiceError(message=str(error))

      # update job status
      task.job.status = cls.Status.JOB_DONE
      task.job.save()

    # otherwise
    else:
      # publish next task
      next_task = next_task_qs.first()
      publisher = cls.get_publisher(queue=cls.Queues.tasks_queue)
      published, error = publisher.publish(message={'task_id': str(next_task.id), 'data': data})
      if not published:
        raise ServiceError(message=str(error))

      # update next task status
      next_task.status = cls.Status.TASK_QUEUED
      next_task.save()

    return

The problem is that wherever I am using:

task.status = cls.Status.TASK_ABC
task.save()

the resulting behavior is very erratic. Sometimes everything works fine and all the statuses are updated as expected, but more often the statuses are never updated, even though the process flow finishes as expected and my output queue gets populated with results. If I log the task status after calling task.save(), the logged status is what I expect to see, but the value inside the database is never updated.
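Roughly, this is what I mean:

task.status = cls.Status.TASK_PROCESSING
task.save()
print(task.status)   # logs TASK_PROCESSING, exactly as I expect

# but when I inspect the row in MySQL afterwards, the status column
# still holds the previous value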

I will gladly provide more code if required. Kindly help me fix this issue.


Solution

  • This is called a "race condition": you have two different threads reading and modifying the same object at the same time.

    At some point it is bound to happen that one of them is working with stale data.

    I.e., when thread B changes and saves the object, thread A's in-memory copy becomes stale. If thread A then saves its copy, it writes the stale (old) values back to the database, so the row ends up looking the way it did in the beginning.

    Threads can also execute in any order, which is why the behaviour looks so erratic.

    This happens because Django does not automatically refresh a model instance when the underlying row changes in the database; each thread keeps working with the values it loaded earlier.

    The fix for a race condition is locking.

    Now, I don't quite understand everything going on in your code, so I can't give you an exact drop-in fix, but you can solve this with select_for_update() inside an atomic transaction: lock the task row, re-read it, update it, and only release the lock when the transaction commits. A sketch of that pattern follows below.
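
    Here is a rough sketch of the pattern (the helper name and the update_fields choice are mine, adapted to your Task model, so treat it as illustrative rather than drop-in code):

    # minimal sketch of the locking pattern, not your actual service code
    from django.db import transaction

    from jobs.models import Task


    def update_task_status(task_id, new_status):
      # select_for_update() makes any other transaction that tries to lock
      # the same row wait until this transaction commits, so the
      # read-modify-write of the status happens as one unit.
      with transaction.atomic():
        task = Task.objects.select_for_update().get(pk=task_id)
        task.status = new_status
        # update_fields limits the UPDATE to the status column, so other
        # columns are not overwritten with stale in-memory values
        task.save(update_fields=['status'])

    Each callback would go through a helper like this (or an equivalent inline block) whenever it changes a status, instead of saving a Task instance it loaded earlier in the callback.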