
Celery + Redis tasks in different files


When I run Celery from the command line, I can see only the tasks defined in the same file as the Celery object, not those defined in other files.

The structure of the project is the following:

celery_test
    celery_tasks
        __init__.py
        celery_app.py
        async
            __init__.py
            tasks.py
        marker
            __init__.py
            tasks.py

The content of the files is as follows:

celery_app.py

from __future__ import absolute_import
from celery import Celery

celery_application = Celery('celery_test', backend='redis://localhost', broker='redis://localhost')

@celery_application.task
def test_celery():
    print(4)

Each of the tasks.py files contains something like this:

async/tasks.py

from __future__ import absolute_import
import time

from celery_tasks.celery_app import celery_application


@celery_application.task
def async_test():
    print('Start async_test')
    time.sleep(3)
    print('Finish async_test')

When I run Celery as follows:

celery --app=celery_tasks.celery_app:celery_application worker -l debug

I get the following:

 -------------- celery@LAPTOP-HCR4G00Q v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-10-10.0.16299
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         celery_test:0x6ff3f28
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . celery_tasks.celery_app.test_celery

That is, only the task defined in the same file as the application is listed.

Any suggestions on how to solve this? I really need to split the tasks by topic, because there are too many to keep in a single file.


Solution

  • I spent a lot of time writing the question and then solved it myself, so I am sharing the solution because there is not much information about this (or at least I could not find any).

    After defining the Celery object, I had tried to autodiscover the tasks, but it did not work. My last attempt was to change the app names and force detection, as follows:

    celery_application.autodiscover_tasks(['celery_tasks.async', 'celery_tasks.marker'], force=True)

    Then, from celery_test/, run:

    celery --app=celery_tasks.celery_app:celery_application worker -l info

    That solved my problem. I hope this helps you.
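
For anyone wondering why forcing discovery makes the extra tasks show up, here is a rough, stdlib-only illustration (Python 3). It is not Celery's implementation, just a toy model of the general idea: a task decorator only registers a function when the module containing it is actually imported, so a tasks.py that nothing imports never reaches the registry. All names here (toy_tasks, toy_app, registry, task, discover) are made up for the example, and the assumption is that discovery boils down to importing `<package>.tasks` for each listed package.

```python
import importlib
import os
import sys
import tempfile

# Toy "app" module (hypothetical): a registry dict plus a decorator that fills it.
APP_SRC = '''
registry = {}

def task(func):
    # Registration happens when the decorated module is imported.
    registry[func.__module__ + "." + func.__name__] = func
    return func

@task
def test_celery():
    return 4
'''

# Toy tasks module living in a separate sub-package (hypothetical).
TASKS_SRC = '''
from toy_tasks.toy_app import task

@task
def marker_test():
    return "marked"
'''


def build_package(root):
    """Write toy_tasks/toy_app.py and toy_tasks/marker/tasks.py under root."""
    pkg = os.path.join(root, "toy_tasks")
    sub = os.path.join(pkg, "marker")
    os.makedirs(sub)
    files = [
        (os.path.join(pkg, "__init__.py"), ""),
        (os.path.join(pkg, "toy_app.py"), APP_SRC),
        (os.path.join(sub, "__init__.py"), ""),
        (os.path.join(sub, "tasks.py"), TASKS_SRC),
    ]
    for path, src in files:
        with open(path, "w") as handle:
            handle.write(src)


def discover(packages, related_name="tasks"):
    """Import <package>.<related_name> for each package so its decorators run."""
    for package in packages:
        importlib.import_module(package + "." + related_name)


root = tempfile.mkdtemp()
build_package(root)
sys.path.insert(0, root)
importlib.invalidate_caches()

app = importlib.import_module("toy_tasks.toy_app")
before = sorted(app.registry)   # only the task defined next to the registry
discover(["toy_tasks.marker"])  # explicit import of the sub-package's tasks module
after = sorted(app.registry)    # now includes toy_tasks.marker.tasks.marker_test

print(before)
print(after)
```

In this toy model, `before` holds only the task defined alongside the registry, which mirrors the worker output in the question, and `after` also holds the task from the sub-package once its tasks module has been imported.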