python, django, celery, celerybeat

Find periodic task's next execution time in Celery Beat


I am creating PeriodicTasks that give a user access to the next lesson, using this code:

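import json

# Assumption: PeriodicTask is the django-celery-beat model
from django_celery_beat.models import PeriodicTask

task = PeriodicTask.objects.create(
    interval=enrolment.course.interval,
    name=f"enrolment_id: {enrolment.id}",
    task="courses.tasks.next_program",
    args=json.dumps([enrolment.id]),
)
# Keep a reference to the task on the enrolment
enrolment.task = task
enrolment.save()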

How can I get the datetime at which the task will next run?


Solution

  • I used Celery's built-in schedule.remaining_estimate, which returns the time left until the next run as a timedelta:

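    # Assumes `from django.utils import timezone` at the top of the module
    def datetime_next_lesson(self):
        task = self.task
        if not task:  # just in case
            return None  # no task, so there is no next run to report
        # If the task hasn't run yet, last_run_at is null; fall back to
        # start_time, then to datetime_created.
        last_run_at = task.last_run_at or task.start_time or self.datetime_created
        # remaining_estimate returns a timedelta measured from the current time
        return timezone.now() + task.schedule.remaining_estimate(last_run_at)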
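
To see why adding the remaining time to the current time gives the next run, here is a minimal standalone sketch of the arithmetic for a fixed-interval schedule. It does not call Celery at all: interval_remaining and next_run_at are hypothetical helpers that only approximate what remaining_estimate does for a plain interval (last run plus interval, minus now), and the dates are made up for illustration.

```python
from datetime import datetime, timedelta, timezone


def interval_remaining(last_run_at, run_every, now):
    """Time left until the next run of a fixed-interval schedule.

    Hypothetical stand-in for the interval case; not Celery's own code.
    """
    return (last_run_at + run_every) - now


def next_run_at(last_run_at, run_every, now):
    """Estimated datetime of the next run: now plus the time remaining."""
    return now + interval_remaining(last_run_at, run_every, now)


# Made-up example values: a weekly task that last ran on 1 January
last_run = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
every = timedelta(days=7)
now = datetime(2024, 1, 3, 9, 30, tzinfo=timezone.utc)

print(interval_remaining(last_run, every, now))  # 5 days, 2:30:00
print(next_run_at(last_run, every, now))         # 2024-01-08 12:00:00+00:00
```

In this sketch the current time cancels out, so the estimate is simply the last run plus the interval. If the task is overdue, the remaining time is negative and the estimate lands in the past, which may be worth handling before showing it to a user.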