I have a simple periodic task:
from celery.decorators import periodic_task
from celery.task.schedules import crontab

from .models import Subscription


@periodic_task(run_every=crontab(minute=0, hour=0))
def deactivate_subscriptions():
    for subscription in Subscription.objects.filter(is_expired=True):
        print(subscription)
        subscription.is_active = False
        subscription.can_activate = False
        subscription.save()
And I want to cover it with tests.
I found information about how to test simple tasks, such as those decorated with @shared_task, but I can't find an example of testing a @periodic_task anywhere.
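Use the task's apply() method, which runs the task synchronously in the current process instead of sending it to a worker. See the Celery documentation for apply().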