I'm trying to learn how to use the Fast API library. I'm trying to accept an array in the post using frozenset as the docs states, but it doesn't seem to work.
import logging
from fastapi import FastAPI, BackgroundTasks
from worker.celery_app import celery_app
log = logging.getLogger(__name__)
app = FastAPI()
def celery_on_message(body):
"""
Logs the initiation of the endpoint
"""
log.warn(body)
def background_on_message(task):
"""
logs the function when it is added to queue
"""
log.warn(task.get(on_message=celery_on_message, propagate=False))
@app.get("/")
async def root(stocks: frozenset, background_task: BackgroundTasks):
"""
:param stocks: stocks to be analyzed
:param background_task: initiate the tasks queue
:type background_task: starlette.background.BackgroundTasks
:return:
"""
task_name = None
# set correct task name based on the way you run the example
log.log(level=1, msg=f'{stocks}')
task_name = "app.app.worker.celery_worker.compute_stock_indicators"
task = celery_app.send_task(task_name, args=[stocks])
background_task.add_task(background_on_message, task)
return {"message": "Stocks analyzed"}
When I'm using the swagger docs to send a request:
curl -X GET "http://127.0.0.1:8000/?stocks=wix&stocks=amzn" -H "accept: application/json"
the response is:
{
"detail": [
{
"loc": [
"query",
"stocks"
],
"msg": "value is not a valid frozenset",
"type": "type_error.frozenset"
}
]
}
I think there is a small mistake
You are trying to send a POST request but you only have a GET endpoint
@app.post("/")
this should fix the issue, but also, when using Type to annotate, use typing
from typing import FrozenSet
@app.post("/")
async def root(stocks: FrozenSet, background_task: BackgroundTasks):
Also receiving a Body in GET request is now supported by World Wide Web(W3), but it's also highly discouraged. See W3 Publication about it. OpenAPI includes the information of body in the request, and it 'll be supported by OpenAPI in the next releases, but it's not supported yet but SwaggerUI.