
Problem with periodic tasks in Django with Celery


My periodic Celery task is not working. I want to update my database every night depending on the date. Here is my ptasks.py file in the application directory:

'''
import datetime
import celery
from celery.task.schedules import crontab
from celery.decorators import periodic_task
from django.utils import timezone
@periodic_task(run_every=crontab(hour=0, minute=0))
def every_night():
    tasks=Task.objects.all()
    form=TaskForm()
    if form.deadline<timezone.now() and form.staus=="ONGOING":
        form.status="PENDING"
        form.save()
'''

I am using AMQP as the broker in settings.py:

'''
CELERY_BROKER_URL = 'amqp://guest:guest@localhost'
CELERYBEAT_SCHEDULER='djcelery.schedulers.DatabaseScheduler'
'''

Here is my models.py:

'''
from django.db import models
import datetime
import pytz
from django.utils import timezone
# Create your models here.
class Task(models.Model):
    title=models.CharField(max_length=30)
    complete=models.BooleanField(default=False)
    created=models.DateTimeField(default=timezone.now())
    comment=models.CharField(max_length=500, default="")
    Tag1=models.CharField(max_length=10, default="Tag1")
    deadline=models.DateTimeField(default=timezone.now())
    status=models.CharField(max_length=15,default="ONGOING")
    def __str__(self):
        return self.title
'''

Here is my forms.py:

'''
from django import forms
from django.forms import ModelForm

from .models import *

class TaskForm(forms.ModelForm):
    class Meta:
        model=Task
        fields='__all__'

'''

Solution

  • Solution 1

    To update the database records the way you describe, you could do something like this:

    <your_app>/models.py

    from django.db import models
    from django.utils import timezone
    
    
    class Task(models.Model):
    
        STATUS_PENDING = "pending"
        STATUS_ONGOING = "ongoing"
    
        STATUS_CHOICES = (
            (STATUS_ONGOING, "ongoing"),
            (STATUS_PENDING, "pending"),
        )
    
        status = models.CharField(
            max_length=15, default=STATUS_ONGOING, choices=STATUS_CHOICES
        )
        title = models.CharField(max_length=30)
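        # Callable default (no parentheses), so the deadline stays editable;
        # auto_now_add=True would pin it to the creation time.
        deadline = models.DateTimeField(default=timezone.now)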
        complete = models.BooleanField(default=False)
    
        def __str__(self):
            return self.title
    

    <your_app>/tasks.py

    from django.utils import timezone
    from celery.task.schedules import crontab
    from celery.decorators import periodic_task
    
    from .models import Task
    
    
    @periodic_task(run_every=crontab(hour=0, minute=0))
    def every_night():
    
        qs = Task.objects.filter(deadline__lt=timezone.now(), status=Task.STATUS_ONGOING)
    
        # Update whole queryset at once
        qs.update(status=Task.STATUS_PENDING)
    
        # Alternatively update one by one (e.g. if you need signals to be fired)
        # for task in qs:
        #     task.status = Task.STATUS_PENDING
        #     task.save()
    
    

    Make sure Celery beat is running, since beat is what triggers periodic tasks. The -B flag starts it embedded in the worker process:

    celery -A app worker -B 
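
The `periodic_task` decorator used above belongs to older Celery releases, which matches the `djcelery` setup in the question. To my knowledge it is no longer available in Celery 5. On newer versions, a rough equivalent is to declare a regular task and register it in the beat schedule. The sketch below is only an assumption about your layout: a project package named `app` (taken from the command above), a hypothetical `app/celery.py` module, and the placeholder app label `your_app`.

```python
# app/celery.py  (hypothetical path; "app" is the project name from the command above)
import os

from celery import Celery
from celery.schedules import crontab

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "app.settings")

app = Celery("app")
# Read CELERY_* settings (e.g. CELERY_BROKER_URL) from Django's settings.py
app.config_from_object("django.conf:settings", namespace="CELERY")
# Look for a tasks.py module in every installed Django app
app.autodiscover_tasks()

app.conf.beat_schedule = {
    "mark-overdue-tasks-every-night": {
        # Dotted path to the task; assumes every_night is decorated with
        # @shared_task in your_app/tasks.py instead of @periodic_task
        "task": "your_app.tasks.every_night",
        "schedule": crontab(hour=0, minute=0),  # every day at midnight
    },
}
```

With this in place the body of `every_night` stays the same. Only the decorator changes to `@shared_task`, imported from `celery`.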
    

    Solution 2

    However, for the situation described, I honestly don't see the point of storing information in a separate database field when it can easily be derived from each record anyway. This adds redundancy to the database and seems unnecessary.

    Why not just use a Manager to easily get the desired model instances?

    from django.db import models
    from django.utils import timezone
    
    
    class TaskManager(models.Manager):
        def pending(self):
            return (
                self.get_queryset()
                .filter(deadline__lt=timezone.now())
                .exclude(complete=True)
            )
    
    
    class Task(models.Model):
    
        title = models.CharField(max_length=30)
        # Callable default keeps the deadline editable (auto_now_add would not)
        deadline = models.DateTimeField(default=timezone.now)
        complete = models.BooleanField(default=False)
    
        objects = TaskManager()
    
        def __str__(self):
            return self.title
    
    

    Then get the "pending" queryset with Task.objects.pending().
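
The idea behind Solution 2, deriving the state from `deadline` and `complete` instead of storing it, can be illustrated without Django. The sketch below is plain Python. `TaskRecord` and `derived_status` are hypothetical names used only for illustration, and the extra "complete" state is my own assumption. The "pending" rule mirrors the `pending()` manager method above (deadline passed, not complete).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class TaskRecord:
    """Plain-Python stand-in for the Task model (illustration only)."""
    title: str
    deadline: datetime
    complete: bool = False


def derived_status(task, now=None):
    """Compute the status on demand instead of reading a stored column."""
    now = now or datetime.now(timezone.utc)
    if task.complete:
        return "complete"
    # Same rule as TaskManager.pending(): deadline passed and not complete
    return "pending" if task.deadline < now else "ongoing"


now = datetime(2024, 1, 2, tzinfo=timezone.utc)
overdue = TaskRecord("write report", deadline=now - timedelta(days=1))
upcoming = TaskRecord("book venue", deadline=now + timedelta(days=1))
done = TaskRecord("send invoice", deadline=now - timedelta(days=1), complete=True)

statuses = [derived_status(t, now) for t in (overdue, upcoming, done)]
print(statuses)  # ['pending', 'ongoing', 'complete']
```

Because the status is computed when it is read, it can never go stale, so no nightly job is needed to keep it in sync. On the real model the same logic could live in a property.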