I am creating PeriodicTasks that open a user's access to the next lesson, using this code:
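    import json

    # assuming PeriodicTask comes from django-celery-beat
    from django_celery_beat.models import PeriodicTask

    task = PeriodicTask.objects.create(
        interval=enrolment.course.interval,
        name=f"enrolment_id: {enrolment.id}",
        task="courses.tasks.next_program",
        args=json.dumps([enrolment.id]),
    )
    enrolment.task = task
    enrolment.save()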
How can I get the datetime at which the task will next run?
I used Celery's built-in schedule.remaining_estimate:
    from django.utils import timezone

    def datetime_next_lesson(self):
        task = self.task
        if not task:  # just in case
            return None  # no task scheduled, so there is no next run
        # last_run_at is null until the task has run once; fall back to
        # start_time, then to the enrolment's datetime_created
        last_run_at = task.last_run_at or task.start_time or self.datetime_created
        return timezone.now() + task.schedule.remaining_estimate(last_run_at)
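
To make the arithmetic concrete, here is a rough stdlib-only sketch of what I understand remaining_estimate to do for a plain interval schedule: it returns (last_run_at + interval) - now, so adding it back to the current time gives the next run. The two helper functions below are hypothetical stand-ins written for illustration, not Celery's actual code, and crontab schedules compute the remaining time differently.

```python
from datetime import datetime, timedelta, timezone


def remaining_estimate(last_run_at, run_every, now):
    """Time left until the next run of a fixed-interval schedule (stand-in)."""
    return (last_run_at + run_every) - now


def next_run_at(last_run_at, run_every, now):
    """Absolute time of the next run: current time plus the remaining time."""
    return now + remaining_estimate(last_run_at, run_every, now)


# Example values, made up for illustration
last_run = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
every = timedelta(days=7)
now = datetime(2024, 1, 3, 9, 30, tzinfo=timezone.utc)

print(remaining_estimate(last_run, every, now))  # 5 days, 2:30:00
print(next_run_at(last_run, every, now))  # 2024-01-08 12:00:00+00:00
```

Note that if the task is overdue, the remaining time in this sketch is negative, so the result lies in the past. You may want to handle that case before showing the value to a user.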