python, flask, openai-api, langchain

How can I stream a response from LangChain's OpenAI using Flask API?


I am using a Python Flask app for chat over data. In the console I get a streamed response directly from OpenAI, since I can enable streaming with the flag streaming=True.

The problem is that I can't "forward" or "show" that stream in my API call.

The code for processing OpenAI and the chain is:

import os

from langchain.chat_models import ChatOpenAI
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.vectorstores import Chroma

def askQuestion(self, collection_id, question):
    collection_name = "collection-" + str(collection_id)
    self.llm = ChatOpenAI(
        model_name=self.model_name,
        temperature=self.temperature,
        openai_api_key=os.environ.get('OPENAI_API_KEY'),
        streaming=True,
        callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]),
    )
    self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True,  output_key='answer')
    
    chroma_Vectorstore = Chroma(collection_name=collection_name, embedding_function=self.embeddingsOpenAi, client=self.chroma_client)


    self.chain = ConversationalRetrievalChain.from_llm(
        self.llm,
        chroma_Vectorstore.as_retriever(similarity_search_with_score=True),
        return_source_documents=True,
        verbose=VERBOSE,
        memory=self.memory,
    )
    

    result = self.chain({"question": question})
    
    res_dict = {
        "answer": result["answer"],
    }

    res_dict["source_documents"] = []

    for source in result["source_documents"]:
        res_dict["source_documents"].append({
            "page_content": source.page_content,
            "metadata":  source.metadata
        })

    return res_dict

and the API route code:

from flask import request, stream_with_context

@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
def ask_question(collection_id):
    question = request.form["question"]
    # response_generator = document_thread.askQuestion(collection_id, question)
    # return jsonify(response_generator)

    def stream(question):
        completion = document_thread.askQuestion(collection_id, question)
        for line in completion['answer']:
            yield line

    return app.response_class(stream_with_context(stream(question)))

I am testing my endpoint with curl, passing the -N flag so that I should receive a streamed response if one is available.
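
For reference, an equivalent streaming check from Python (a minimal sketch; it assumes the app runs locally on port 5000 and that a collection with id 1 exists):

import requests

# stream=True tells requests to yield the body as it arrives
# instead of buffering the whole response
resp = requests.post(
    "http://localhost:5000/collection/1/ask_question",
    data={"question": "What is this document about?"},
    stream=True,
)
for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="", flush=True)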

When I make the API call, the endpoint first waits for the data to be processed (I can see the streamed answer in my VS Code terminal), and only when it is finished do I get everything displayed in one go. Presumably this is because self.chain({"question": question}) only returns once the full answer has been generated, so my generator merely iterates over the finished string.


Solution

  • By using threading and a callback, we can get a streaming response from the Flask API.

    In the Flask API, you can create a queue and register tokens on it through LangChain's callback; putting a sentinel value (None below) on the queue when generation finishes lets the route know when to stop reading.

    from queue import Queue
    from langchain.callbacks.base import BaseCallbackHandler

    class StreamingHandler(BaseCallbackHandler):
        def __init__(self, queue: Queue):
            self.queue = queue

        def on_llm_new_token(self, token: str, **kwargs) -> None:
            self.queue.put(token)

        def on_llm_end(self, response, **kwargs) -> None:
            self.queue.put(None)  # sentinel: generation is finished

    You can then read tokens from the same queue in your Flask route.

    from queue import Queue
    from flask import Response, request, stream_with_context
    import threading

    @app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
    def stream_output(collection_id):
        question = request.form["question"]
        q = Queue()

        def generate(rq: Queue):
            # read tokens until the sentinel arrives, which keeps
            # the loop from running indefinitely
            while True:
                token = rq.get()
                if token is None:
                    break
                yield token

        callback_fn = StreamingHandler(q)

        # run the chain in a background thread so tokens can be
        # streamed to the client while the answer is still generated
        threading.Thread(
            target=document_thread.askQuestion,
            args=(collection_id, question, callback_fn),
        ).start()

        return Response(stream_with_context(generate(q)))
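
    If the model fails before on_llm_end fires, rq.get() would block forever. A defensive variant of generate (a sketch; the 30-second limit is an assumption, tune it to your model's latency) gives up after a timeout:

    from queue import Queue, Empty

    def generate(rq: Queue):
        while True:
            try:
                # give up if no token arrives within 30 seconds
                token = rq.get(timeout=30)
            except Empty:
                break
            if token is None:  # sentinel from on_llm_end
                break
            yield token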
    
    

    In your askQuestion, accept the callback and register it on LangChain's ChatOpenAI (the parameter is callbacks and takes a list):

    def askQuestion(self, collection_id, question, callback_fn):
        ...
        self.llm = ChatOpenAI(
            model_name=self.model_name,
            temperature=self.temperature,
            openai_api_key=os.environ.get('OPENAI_API_KEY'),
            streaming=True,
            callbacks=[callback_fn],
        )
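
    The queue/thread/generator pattern above is independent of LangChain; a self-contained sketch (fake tokens, no API key needed) that you can run to verify the streaming mechanics:

    import threading
    import time
    from queue import Queue

    def fake_llm(q: Queue):
        # stand-in for the chain: emit tokens, then the sentinel
        for token in ["Hello", " ", "world", "!"]:
            q.put(token)
            time.sleep(0.5)
        q.put(None)

    q = Queue()
    threading.Thread(target=fake_llm, args=(q,)).start()
    while True:
        token = q.get()
        if token is None:
            break
        print(token, end="", flush=True)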
    
