My problem is that the beat scheduler doesn't store entries in the tasks and workers tables. I use Django and Celery. In my database (MySQL) I have added a periodic task named "Estimate Region" with an interval of 120 seconds.
This is how I start my worker:
python manage.py celery worker -n worker.node1 -B --loglevel=info &
After starting the worker, I can see in the terminal that it is running and that the scheduler picks up the periodic task from the database and executes it.
This is how my task is defined:
@celery.task(name='fv.tasks.estimateRegion',
             ignore_result=True,
             max_retries=3)
def estimateRegion(region):
The terminal shows this:
WARNING ModelEntry: Estimate Region fv.tasks.estimateRegion(*['ASIA'], **{}) {<freq: 2.00 minutes>}
[2013-05-23 10:48:19,166: WARNING/MainProcess] <ModelEntry: Estimate Region fv.tasks.estimateRegion(*['ASIA'], **{}) {<freq: 2.00 minutes>}>
INFO Calculating estimators for exchange:Bombay Stock Exchange
The task "Estimate Region" creates a results.csv file, so I can see that the worker and the beat scheduler work. But after that there are no database entries under tasks or workers in my Django admin panel.
Here are my Celery settings in settings.py:
CELERY_DISABLE_RATE_LIMITS = True
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_IMPORTS = ('fv.tasks',)
CELERY_RESULT_PERSISTENT = True
# amqp settings
BROKER_URL = 'amqp://fv:password@localhost'
#BROKER_URL = 'amqp://fv:password@192.168.99.31'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_TASK_RESULT_EXPIRES = 18000
CELERY_ROUTES = (fv.routers.TaskRouter(), )
_estimatorExchange = Exchange('estimator')
CELERY_QUEUES = (
Queue('celery', Exchange('celery'), routing_key='celery'),
Queue('estimator', _estimatorExchange, routing_key='estimator'),
)
# beat scheduler settings
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
# development settings
CELERY_RESULT_PERSISTENT = False
CELERY_DEFAULT_DELIVERY_MODE = 'transient'
I hope that someone can help me :)
Have you started celerycam?
python manage.py celerycam
It takes a snapshot of the current state of the tasks (every second by default) and writes it to the database, which is where the tasks and workers entries in the admin come from.
You can read more about it in the Celery documentation.
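One more thing to check: as far as I know, celerycam only records what the workers report as events, so the worker has to send events as well. You can pass -E when starting the worker, or enable it in settings.py. A minimal sketch, assuming Celery 3.x with django-celery as in the question:

```python
# settings.py (sketch; assumes Celery 3.x with django-celery)

# Make workers emit monitoring events, which should have the same effect
# as passing -E on the worker command line. Without events, the snapshot
# camera has nothing to write to the tasks and workers tables.
CELERY_SEND_EVENTS = True
```

Restart the worker after changing this, and keep celerycam running in a separate process.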