This is my code to retrieve a streamed response from OpenAI's model, which is event-based. (I have shown only the core part.)
# Synchronous OpenAI client; OPEN_AI_API_KEY must be defined before this line runs (not shown in this snippet).
client = OpenAI(api_key=OPEN_AI_API_KEY)
class EventHandler(AssistantEventHandler):
    """Assistant stream handler that echoes every text delta to stdout."""

    def on_text_delta(self, delta: TextDelta, snapshot: Text):
        """Print the text fragment carried by ``delta``.

        ``snapshot`` is accepted to match the callback signature but is not
        used here.
        """
        fragment = delta.value
        print(fragment)
# Build the handler and the run stream first, then enter the stream and block
# until the run has finished.
handler = EventHandler()
run_stream = client.beta.threads.runs.stream(
    thread_id=thread_id,
    assistant_id=assistant_id,
    event_handler=handler,
)
with run_stream as stream:
    stream.until_done()
The `on_text_delta` event fires as tokens arrive from the API. I want to forward this response through FastAPI instead of printing it to the screen.
# Desired endpoint: it should forward the assistant's streamed text to the HTTP client.
@app.get("/stream")
async def stream():
    # Placeholder from the question: what to return here is the open problem.
    return ...something...
I have tried returning the result as part of the HTTP body:
from fastapi.responses import StreamingResponse
...
@app.post("/stream")
async def stream():
    """First attempt: run the assistant stream, then return a StreamingResponse.

    NOTE: ``until_done()`` does not return until the run has finished, so the
    ``return`` below is reached only after the whole response has already been
    produced. This is the problem the question describes.
    """
    handler = EventHandler()
    with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
        event_handler=handler,
    ) as run_stream:
        run_stream.until_done()

    response = StreamingResponse(
        EventHandler.generator_function(),
        media_type="text/plain",
    )
    return response
I have created `generator_function` inside the `EventHandler` class, but the problem is that execution doesn't reach the `return` statement until the stream is over.
I have also tried WebSockets, but the problem remains how my program's execution should flow: the stream doesn't let execution continue until the API response is complete.
I found the solution!
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI, AsyncOpenAI
# Placeholder credential: replace with a real key (ideally loaded from the
# environment rather than committed to source).
OPEN_AI_API_KEY = "you_api_key"

# One blocking client and one asyncio-based client, both using the same key.
client = OpenAI(api_key=OPEN_AI_API_KEY)
async_client = AsyncOpenAI(api_key=OPEN_AI_API_KEY)

# FastAPI application instance that the route decorators below attach to.
app = FastAPI()
def _sse_event(text):
    """Frame ``text`` as a single Server-Sent Events message.

    An SSE message ends at the first blank line, so a payload that itself
    contains line breaks must be sent as one ``data:`` field per line; the
    client joins those fields back together with ``\\n``. Text without any
    line break produces exactly ``data: <text>\\n\\n``.
    """
    # SSE treats CRLF, CR and LF all as line terminators; normalise to LF so
    # that every line break in the payload becomes its own ``data:`` field.
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    return "".join(f"data: {line}\n" for line in normalized.split("\n")) + "\n"


async def stream_assistant_response(assistant_id, thread_id):
    """Yield the assistant's text deltas as Server-Sent Events messages.

    Starts a streamed run of ``assistant_id`` on ``thread_id`` through the
    module-level ``async_client`` and yields one SSE-framed string per text
    delta as it arrives.

    Args:
        assistant_id: ID of the assistant that should run.
        thread_id: ID of the thread to run the assistant on.

    Yields:
        str: one SSE message (``data: ...`` lines followed by a blank line).
    """
    async with async_client.beta.threads.runs.stream(
        assistant_id=assistant_id,
        thread_id=thread_id,
    ) as stream:
        async for text in stream.text_deltas:
            # Deltas can contain newlines (e.g. paragraph breaks); framing
            # them naively would terminate the event early and lose them.
            yield _sse_event(text)
@app.get("/message")
async def add_message(assistant_id, thread_id, message):
    """Append a user message to a thread and stream the assistant's reply.

    Args:
        assistant_id: ID of the assistant that should answer.
        thread_id: ID of the thread the message is added to.
        message: Text of the user message.

    Returns:
        StreamingResponse: a ``text/event-stream`` response fed by
        ``stream_assistant_response``.
    """
    # NOTE: make sure the thread identified by thread_id already exists.
    # Use the async client here: a synchronous call inside an ``async def``
    # endpoint would block the event loop for the whole HTTP round trip.
    await async_client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=message,
    )
    return StreamingResponse(
        stream_assistant_response(assistant_id, thread_id),
        media_type="text/event-stream",
    )