I am using django ORM to communicate with MySQL database inside the callback functions of my RabbitMQ consumers. These consumers are running on a separate threads and each consumer has established its own connection to its queue.
Here is the code for two of my consumer callbacks:
TasksExecutorService
# imports
from pika.spec import Basic
from pika.channel import Channel
from pika import BasicProperties
import uuid
from jobs.models import Task
from exceptions import MasterConsumerServiceError as ServiceError
from .master_service import MasterConsumerSerivce
class TaskExecutorService(MasterConsumerSerivce):
queue = 'master_tasks'
@classmethod
def callback(cls, ch: Channel, method: Basic.Deliver, properties: BasicProperties, message: dict):
# get task
task_id_str = message.get('task_id')
task_id = uuid.UUID(task_id_str)
task_qs = Task.objects.filter(pk=task_id)
if not task_qs.exists():
raise ServiceError(message=f'Task {task_id_str} does not exist')
task = task_qs.first()
# check if task is stopped
if task.status == cls.Status.TASK_STOPPED:
raise ServiceError(message=f'Task {task_id_str} is stopped')
# send task to results queue
publisher = cls.get_publisher(queue=cls.Queues.results_queue)
published, error = publisher.publish(message=message | {'status': True, 'error': None})
if not published:
raise ServiceError(message=str(error))
# update task status
task.status = cls.Status.TASK_PROCESSING
task.save()
return
ResultsHandlerService
# imports
from pika.spec import Basic
from pika.channel import Channel
from pika import BasicProperties
import uuid
from jobs.models import Task
from exceptions import MasterConsumerServiceError as ServiceError
from .master_service import MasterConsumerSerivce
class ResultHandlerService(MasterConsumerSerivce):
queue = 'master_results'
@classmethod
def callback(cls, ch: Channel, method: Basic.Deliver, properties: BasicProperties, message: dict):
# get task
task_id_str = message.get('task_id')
task_id = uuid.UUID(task_id_str)
task_qs = Task.objects.filter(pk=task_id)
if not task_qs.exists():
raise ServiceError(message=f'Task {task_id_str} does not exist')
task = task_qs.first()
# get result data and status
data = message.get('data')
status = message.get('status')
# if task is not successful
if not status:
# fail task
task.status = cls.Status.TASK_FAILED
task.save()
# fail job
task.job.status = cls.Status.JOB_FAILED
task.job.save()
return
# update task status
task.status = cls.Status.TASK_DONE
task.save()
# check if job is complete
task_execution_order = task.process.execution_order
next_task_qs = Task.objects.select_related('process').filter(job=task.job, process__execution_order=task_execution_order + 1)
is_job_complete = not next_task_qs.exists()
# check job is complete
if is_job_complete:
# publish reults
publisher = cls.get_publisher(queue=cls.Queues.output_queue)
published, error = publisher.publish(message={'job_id': str(task.job.id), 'data': data})
if not published:
raise ServiceError(message=str(error))
# update job status
task.job.status = cls.Status.JOB_DONE
task.job.save()
# otherwise
else:
# publish next task
next_task = next_task_qs.first()
publisher = cls.get_publisher(queue=cls.Queues.tasks_queue)
published, error = publisher.publish(message={'task_id': str(next_task.id), 'data': data})
if not published:
raise ServiceError(message=str(error))
# update next task status
next_task.status = cls.Status.TASK_QUEUED
next_task.save()
return
The problem is that wherever I am using:
task.status = cls.Status.TASK_ABC
task.save()
the resulting behavior is very erratic. Sometimes it all works fine and all the statuses are updated as expected, but most often the statuses are never updated even if the process flow finishes as expected with my output queue getting populated with results. If I log the task status after performing task.save()
, the logged status is also what I expect to see but the value inside the database is never updated.
I will gladly provide more code if required. Kindly help me fix this issue.
This is called a "Race condition". You've got two different threads modifying the same object at the same time.
At some point in time, it is bound to happen that they both will have stale data.
I.E., when the Thread B changes and saves the object, Thread A's data becomes stale. And at this point if Thread A saves the object, then it will save the stale data (old values) to the database. So the data goes back to how it was in the beginning.
Also, threads may execute in any order. That's why you're seeing this erratic behaviour.
This happens because Django doesn't automatically update the model instance object when the data changes in the database. That's why the data gets stale.
Solution to race condition is using locks.
Now, I don't quite understand all the things going on in your code so I can't give you a solution as code example. But you can fix your issue by using select_for_update
and atomic transactions.