I have a FastAPI app from which I want to call a Celery task. I cannot import the task, because the two live in separate code bases, so I have to call it by name.
In tasks.py:
import os
from typing import Any, Dict

from celery import Celery

imagery = Celery(
    "imagery", broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL")
)
...
@imagery.task(bind=True, name="filter")
def filter_task(self, **kwargs) -> Dict[str, Any]:
    print("running task")
The Celery worker is started with this command:
celery worker -A worker.imagery -P threads --loglevel=INFO --queues=imagery
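Worth flagging for later: --queues=imagery pins this worker to the imagery queue only, so anything published to Celery's default celery queue is never consumed. One way to route by task name on the sending side is a task_routes entry; a minimal sketch, where producer_app stands in for whatever Celery instance does the sending and is not from the original post:

import os

from celery import Celery

producer_app = Celery(broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL"))
# Any task named "filter" sent through this app is published to the "imagery"
# queue, matching the worker's --queues=imagery flag.
producer_app.conf.task_routes = {"filter": {"queue": "imagery"}}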
Now, in my FastAPI code base, I want to run the filter task. My understanding is that I have to use the send_task() function.
In app.py I have:
import os

from celery import Celery, states
from fastapi import FastAPI
from starlette.responses import JSONResponse, PlainTextResponse

from app import models

app = FastAPI()
tasks = Celery(broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL"))

@app.post("/filter", status_code=201)
async def upload_images(data: models.FilterProductsModel):
    """
    TODO: use a celery task(s) to query the database and upload the results to S3
    """
    data = ["ok", "un test"]
    result = tasks.send_task("workers.imagery.filter", args=list(data))
    return PlainTextResponse(f"here is the id: {result.id}")
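For checking on the task afterwards, a small sketch, assuming the same tasks app and result backend; the placeholder id stands in for whatever result.id returned above:

from celery.result import AsyncResult

# Look the task up by id on the same app/backend and read its state.
res = AsyncResult("the-task-id-from-before", app=tasks)
print(res.state)  # e.g. PENDING, STARTED, SUCCESS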
After calling the /filter endpoint, I don't see any task being picked up by the worker.
So I tried different names in send_task().
How come my task never gets picked up by the worker, and nothing shows in the log? Is my task name wrong?
Edit: The worker process runs in Docker. Here is the full path of the file on its disk:
/workers/worker.py
So if I follow the import scheme, the name of the task would be workers.worker.filter, but this does not work; nothing gets printed in the Docker logs. Is a print() supposed to appear in the STDOUT of the Celery CLI?
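One way to check what name the worker actually registered, instead of guessing, is to ask it. A minimal sketch from the producing side, assuming the broker env vars match the worker's (probe is a hypothetical name, not from the original post). Note that the decorator's name="filter" overrides Celery's default module-path naming, so "filter" is what should show up:

import os

from celery import Celery

probe = Celery(broker=os.getenv("BROKER_URL"), backend=os.getenv("REDIS_URL"))
# Returns a mapping of worker hostname -> list of registered task names;
# "filter" should appear because the decorator set name="filter" explicitly.
print(probe.control.inspect().registered())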
OP Here.
This is the solution I used.
from celery import signature

task = signature("filter", kwargs=data.dict(), queue="imagery")
res = task.delay()
As mentioned by @DejanLekic, I had to specify the queue.
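The equivalent with send_task(), since that was the original approach; the queue argument is the part that was missing:

# "filter" is the explicit name from the decorator; queue="imagery" matches
# the worker's --queues flag, so the message lands where it is being consumed.
result = tasks.send_task("filter", kwargs=data.dict(), queue="imagery")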