python django celery django-celery periodic-task

Django Celery periodic task: how to call a task method that is inside a class


I want to send periodic emails to a user after the user has registered on a platform. Sending a single email with Celery works fine; now I want Django Celery to send the email periodically. For now I have set the period to 15 seconds. The relevant code is below.

celery.py

    app.conf.beat_schedule = {
        'send_mail-every-day-at-4': {
            'task': 'apps.users.usecases.BaseUserUseCase().email_send_task()',
            'schedule': 15,
        }
    }

The task is a method on a class, located at apps.users.usecases.BaseUserUseCase.email_send_task. Here is the use case that sends the email:

 class BaseUserUseCase:
    ##other code is skipped 

    @shared_task
    def email_send_task(self):
        print("done")
        return ConfirmationEmail(context=self.context).send(to=[self.receipent])

How do I call this email_send_task method? Am I doing this right? It is not working, and any help would be appreciated.


Solution

  • To enable class-based tasks:

    1. Change your class to inherit from celery.Task.
    2. Remove the @shared_task decorator and rename email_send_task() to run().
    3. Call register_task() on your Celery app with an instance of your class. The return value is the callable task.
    4. Call the task returned in step 3 directly to enqueue tasks manually.
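
It may help to see why the original 'task' string cannot work. Celery treats that value as the name of a registered task and looks it up as-is; it never evaluates it as a Python expression. The plain-Python sketch below only imitates that lookup. It is not Celery's code, and `registry`, `register` and `run_by_name` are hypothetical names made up for the illustration.

```python
# Hypothetical stand-in for a task registry: exact names map to callables.
registry = {}


def register(name):
    """Store the decorated function under the given task name."""
    def decorator(func):
        registry[name] = func
        return func
    return decorator


@register("apps.users.usecases.BaseUserUseCase")
def send_email(context=None, recipient=None):
    return f"Send email with {context} to {recipient}"


def run_by_name(name, **kwargs):
    """Look the task up by its exact registered name, then call it."""
    if name not in registry:
        raise KeyError(f"unregistered task: {name!r}")
    return registry[name](**kwargs)


# The registered name resolves.
ok = run_by_name(
    "apps.users.usecases.BaseUserUseCase",
    context={"setting": "default"},
    recipient="default@email.com",
)

# A string that merely looks like a Python call is just another unknown name.
try:
    run_by_name("apps.users.usecases.BaseUserUseCase().email_send_task()")
    failed = False
except KeyError:
    failed = True
```

This is why the schedule entries below use the plain dotted name 'apps.users.usecases.BaseUserUseCase', which is the name the worker lists under [tasks].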

    File structure

    .
    ├── apps
    │   └── users
    │       └── usecases.py
    └── my_proj
        ├── celery.py
        └── settings.py
    

    celery.py

    import os
    
    from celery import Celery
    
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_proj.settings")  # Only if using Django. Otherwise remove this line.
    
    app = Celery("my_app")
    
    app.conf.update(
        imports=['apps.users.usecases'],
        beat_schedule={
            'send_mail-every-day-at-4': {
                'task': 'apps.users.usecases.BaseUserUseCase',
                'schedule': 5,
            },
        },
    )
    

    usecases.py

    from celery import Task
    
    from my_proj.celery import app
    
    
    class BaseUserUseCase(Task):
        def __init__(self, context, recipient):
            self.context = context
            self.recipient = recipient
    
        def run(self, context=None, recipient=None):  # Optional arguments context and recipient only if you need to explicitly change it for some calls
            target_context = context or self.context
            target_recipient = recipient or self.recipient
            print(f"Send email with {target_context} to {target_recipient}")
    
    
    BaseUserUseCaseTask = app.register_task(
        BaseUserUseCase(
            context={'setting': 'default'},
            recipient="default@email.com",
        )
    )
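
One subtlety in run() above: `context or self.context` falls back to the default whenever the argument is falsy, so an explicitly passed empty dict or empty string is silently replaced. If that distinction matters in your project, comparing against None is a possible alternative. The sketch below is plain Python with no Celery involved, and `pick` is a hypothetical helper name.

```python
def pick(override, default):
    """Return the override unless it was omitted (None)."""
    return default if override is None else override


default_context = {"setting": "default"}

# `or` discards any falsy override, including an intentional empty dict.
with_or = {} or default_context

# The None check keeps the empty dict and only falls back when nothing was passed.
kept = pick({}, default_context)
fallback = pick(None, default_context)

print(with_or)
print(kept)
print(fallback)
```

Inside run(), this would amount to replacing the two `or` expressions with calls like `pick(context, self.context)`, assuming None is never a meaningful value for either argument.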
    

    Logs (Producer)

    $ celery --app=my_proj beat --loglevel=INFO
    [2021-08-20 09:50:22,193: INFO/MainProcess] Scheduler: Sending due task send_mail-every-day-at-4 (apps.users.usecases.BaseUserUseCase)
    [2021-08-20 09:50:27,181: INFO/MainProcess] Scheduler: Sending due task send_mail-every-day-at-4 (apps.users.usecases.BaseUserUseCase)
    

    Logs (Consumer)

    $ celery --app=my_proj worker --queues=celery --loglevel=INFO
    
    [tasks]
      . apps.users.usecases.BaseUserUseCase
    
    [2021-08-20 09:50:22,206: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[3bce46e8-98c0-410e-a156-83e293ba6337] received
    [2021-08-20 09:50:22,207: WARNING/ForkPoolWorker-4] Send email with {'setting': 'default'} to default@email.com
    [2021-08-20 09:50:22,207: WARNING/ForkPoolWorker-4] 
    
    [2021-08-20 09:50:22,207: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[3bce46e8-98c0-410e-a156-83e293ba6337] succeeded in 0.0002498120002201176s: None
    [2021-08-20 09:50:27,183: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[e1d2de2a-3e7d-4253-9641-41fd328a17ce] received
    [2021-08-20 09:50:27,183: WARNING/ForkPoolWorker-4] Send email with {'setting': 'default'} to default@email.com
    [2021-08-20 09:50:27,184: WARNING/ForkPoolWorker-4] 
    
    [2021-08-20 09:50:27,184: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[e1d2de2a-3e7d-4253-9641-41fd328a17ce] succeeded in 0.0001804589992389083s: None
    

    To override the context and recipient for a scheduled task, pass them as kwargs in the schedule entry:

    celery.py

    ...
        beat_schedule={
            'send_mail-every-day-at-4': {
                'task': 'apps.users.usecases.BaseUserUseCase',
                'schedule': 5,
                'kwargs': {
                    'context': {'setting': 'custom'},
                    'recipient': "custom@email.com",
                },
            },
        },
    ...
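
The entry is named send_mail-every-day-at-4, but the numeric schedules shown here fire every few seconds, which is convenient for testing. If the eventual goal is a daily run, a crontab schedule is one way to express it. This is a configuration sketch only: it assumes "at 4" means 04:00 in whatever timezone the Celery app is configured with.

```python
from celery.schedules import crontab

beat_schedule = {
    'send_mail-every-day-at-4': {
        'task': 'apps.users.usecases.BaseUserUseCase',
        # Assumption: run once a day at 04:00 (app timezone).
        'schedule': crontab(hour=4, minute=0),
        'kwargs': {
            'context': {'setting': 'custom'},
            'recipient': "custom@email.com",
        },
    },
}
```

The dictionary would be passed to app.conf.update(beat_schedule=beat_schedule) in place of the inline one.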
    

    Logs (Consumer):

    [2021-08-20 09:54:36,530: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[6b69b79b-6764-4d83-ad24-9b0723dd8c79] received
    [2021-08-20 09:54:36,531: WARNING/ForkPoolWorker-4] Send email with {'setting': 'custom'} to custom@email.com
    [2021-08-20 09:54:36,531: WARNING/ForkPoolWorker-4] 
    
    [2021-08-20 09:54:36,532: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[6b69b79b-6764-4d83-ad24-9b0723dd8c79] succeeded in 0.0012146830003985087s: None
    [2021-08-20 09:54:41,498: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[a20d34e7-3214-4130-aba2-5544238096d0] received
    [2021-08-20 09:54:41,499: WARNING/ForkPoolWorker-4] Send email with {'setting': 'custom'} to custom@email.com
    [2021-08-20 09:54:41,499: WARNING/ForkPoolWorker-4] 
    
    [2021-08-20 09:54:41,500: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[a20d34e7-3214-4130-aba2-5544238096d0] succeeded in 0.00047696000001451466s: None
    

    If you intend to call the task manually, e.g. from one of your Django views:

    >>> from apps.users.usecases import BaseUserUseCaseTask
    >>> BaseUserUseCaseTask.apply_async()
    <AsyncResult: fd347270-59b8-4cec-8772-cf82a79e60df>
    >>> BaseUserUseCaseTask.apply_async(kwargs={'context': {'setting': 'manual'}, 'recipient': "manual@email.com"})
    <AsyncResult: 13d9df7e-e1c4-4f50-847f-72a413404c82>
    

    Logs (Consumer):

    [2021-08-20 09:57:19,324: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[fd347270-59b8-4cec-8772-cf82a79e60df] received
    [2021-08-20 09:57:19,324: WARNING/ForkPoolWorker-4] Send email with {'setting': 'default'} to default@email.com
    [2021-08-20 09:57:19,324: WARNING/ForkPoolWorker-4] 
    
    [2021-08-20 09:57:19,325: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[fd347270-59b8-4cec-8772-cf82a79e60df] succeeded in 0.00026283199986210093s: None
    [2021-08-20 12:35:29,238: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[13d9df7e-e1c4-4f50-847f-72a413404c82] received
    [2021-08-20 12:35:29,240: WARNING/ForkPoolWorker-4] Send email with {'setting': 'manual'} to manual@email.com
    [2021-08-20 12:35:29,240: WARNING/ForkPoolWorker-4] 
    
    [2021-08-20 12:35:29,240: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[13d9df7e-e1c4-4f50-847f-72a413404c82] succeeded in 0.000784056000156852s: None