python, stream, fastapi, openai-api

How to forward OpenAI's streaming response using FastAPI in Python?


This is my code to retrieve a streaming response from OpenAI's Assistants API, which is event based (only the core part is shown):

from openai import OpenAI, AssistantEventHandler
from openai.types.beta.threads import Text, TextDelta

client = OpenAI(api_key=OPEN_AI_API_KEY)

class EventHandler(AssistantEventHandler):
    def on_text_delta(self, delta: TextDelta, snapshot: Text):
        # Called for every chunk of text as it arrives from the API
        print(delta.value)

with client.beta.threads.runs.stream(
    thread_id=thread_id,
    assistant_id=assistant_id,
    event_handler=EventHandler()
) as stream:
    # Blocks until the run has finished streaming
    stream.until_done()

The on_text_delta event fires as tokens arrive from the API. I want to forward this response through FastAPI instead of printing it to the console.

@app.get("/stream")
async def stream():
    return ...something...

I have tried returning the result as part of the HTTP body:

from fastapi.responses import StreamingResponse

...

@app.post("/stream")
async def stream():
    with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
        event_handler=EventHandler()
    ) as stream:
        stream.until_done()

    return StreamingResponse(EventHandler.generator_function(), media_type="text/plain")

I have created a generator_function inside the EventHandler class, but the problem is that execution doesn't reach the return statement until the stream is over.

I have also tried WebSockets, but the problem is still how my program's execution should flow: the stream doesn't let execution continue until the API response is complete.
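
One way I think this could work is to run the blocking stream in a background thread and have the event handler hand each delta to a queue that a generator drains for StreamingResponse. A rough sketch of that idea (QueueEventHandler, run_stream and token_generator are names I've made up for illustration, and client, app, thread_id and assistant_id are the same as in the snippets above):

import queue
import threading

from fastapi.responses import StreamingResponse
from openai import AssistantEventHandler
from openai.types.beta.threads import Text, TextDelta

# client, app, thread_id and assistant_id are defined as in the snippets above


class QueueEventHandler(AssistantEventHandler):
    """Instead of printing, push every text delta into a queue."""

    def __init__(self, output: queue.Queue):
        super().__init__()
        self.output = output

    def on_text_delta(self, delta: TextDelta, snapshot: Text):
        self.output.put(delta.value)


def run_stream(output: queue.Queue):
    # Runs the blocking stream to completion in a worker thread,
    # then signals the end of the stream with a None sentinel
    with client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant_id,
        event_handler=QueueEventHandler(output),
    ) as stream:
        stream.until_done()
    output.put(None)


@app.post("/stream")
def stream_endpoint():
    output: queue.Queue = queue.Queue()
    threading.Thread(target=run_stream, args=(output,), daemon=True).start()

    def token_generator():
        while True:
            token = output.get()
            if token is None:  # end of stream
                break
            yield token

    return StreamingResponse(token_generator(), media_type="text/plain")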


Solution

  • I found the solution!

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    from openai import OpenAI, AsyncOpenAI
    
    
    OPEN_AI_API_KEY = 'your_api_key'
    async_client = AsyncOpenAI(api_key=OPEN_AI_API_KEY)
    client = OpenAI(api_key=OPEN_AI_API_KEY)
    
    app = FastAPI()
    
    async def stream_assistant_response(assistant_id, thread_id):
        # Open the run as an async stream and yield every text delta
        # as a Server-Sent Events "data:" line as soon as it arrives
        async with async_client.beta.threads.runs.stream(
            assistant_id=assistant_id,
            thread_id=thread_id
        ) as stream:
            async for text in stream.text_deltas:
                yield f"data: {text}\n\n"
    
    
    @app.get("/message")
    async def add_message(assistant_id, thread_id, message):
        # make sure the thread exists before adding the message
        client.beta.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=message
        )
    
        return StreamingResponse(stream_assistant_response(assistant_id, thread_id), media_type="text/event-stream")
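
  • This works because the AsyncOpenAI stream exposes text_deltas as an async iterator, so StreamingResponse pulls tokens out of stream_assistant_response as they arrive instead of waiting for the run to finish. For a quick check from another script, the endpoint can be consumed like this (a sketch using httpx; the placeholder IDs and the localhost URL are assumptions to adapt):

    import asyncio

    import httpx


    async def main():
        params = {
            "assistant_id": "asst_...",   # your assistant id
            "thread_id": "thread_...",    # an existing thread id
            "message": "Hello!",
        }
        async with httpx.AsyncClient(timeout=None) as http:
            # Stream the SSE body and print each chunk as it arrives
            async with http.stream("GET", "http://localhost:8000/message", params=params) as response:
                async for chunk in response.aiter_text():
                    print(chunk, end="", flush=True)


    asyncio.run(main())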