python django cron celery periodic-task

How to test celery periodic_task in Django?


I have a simple periodic task:

from celery.decorators import periodic_task
from celery.task.schedules import crontab
from .models import Subscription

@periodic_task(run_every=crontab(minute=0, hour=0))
def deactivate_subscriptions():
    for subscription in Subscription.objects.filter(is_expired=True):
        print(subscription)
        subscription.is_active = False
        subscription.can_activate = False
        subscription.save()

And I want to cover it with tests.

I found information about how to test simple tasks, such as those decorated with @shared_task, but I can't find an example of testing a @periodic_task anywhere.


Solution

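  • Use the task's apply() method. It runs the task synchronously in the current process and returns an EagerResult, so the test needs neither a worker nor the beat scheduler. See the Celery documentation for apply().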