I have set up FastAPI with llama.cpp and LangChain. Now I want to enable streaming in the FastAPI responses. Streaming works with llama.cpp in my terminal, but I wasn't able to implement it with a FastAPI response.
Most tutorials focus on enabling streaming with an OpenAI model, but I am using a local LLM (a quantized Mistral) with llama.cpp. I think I have to modify the callback handler, but no tutorial worked. Here is my code:
import copy
from functools import lru_cache

from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from langchain_community.llms import LlamaCpp
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
model_path = "../modelle/mixtral-8x7b-instruct-v0.1.Q5_K_M.gguf"

prompt = """
<s> [INST] In the following you are given a task. Complete it based on the user input.
### Here is the task: ###
{typescript_string}
### Here is the user input: ###
{input}
Answer: [/INST]
"""
def model_response_prompt():
    return PromptTemplate(template=prompt, input_variables=['input', 'typescript_string'])
def build_llm(model_path, callback=None):
    callback_manager = CallbackManager([StreamingStdOutCallbackHandler()])
    # callback_manager = CallbackManager(callback)
    n_gpu_layers = 1  # Metal: setting this to 1 is enough (tried with several values)
    n_batch = 512  # or 1024; should be between 1 and n_ctx, consider the amount of RAM of your Apple Silicon chip
    llm = LlamaCpp(
        max_tokens=1000,
        n_threads=6,
        model_path=model_path,
        temperature=0.8,
        f16_kv=True,
        n_ctx=28000,
        n_gpu_layers=n_gpu_layers,
        n_batch=n_batch,
        callback_manager=callback_manager,
        verbose=True,
        top_p=0.75,
        top_k=40,
        repeat_penalty=1.1,
        streaming=True,
        model_kwargs={
            'mirostat': 2,
        },
    )
    return llm
# caching the LLM
@lru_cache(maxsize=100)
def get_cached_llm():
    chat = build_llm(model_path)
    return chat

chat = get_cached_llm()
app = FastAPI(
    title="Inference API for Mistral and Mixtral",
    description="A simple API that uses Mistral or Mixtral",
    version="1.0",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def bullet_point_model():
    llm = build_llm(model_path=model_path)
    llm_chain = LLMChain(
        llm=llm,
        prompt=model_response_prompt(),
        verbose=True,
    )
    return llm_chain

@app.get('/model_response')
async def model(question: str, prompt: str):
    model = bullet_point_model()
    res = model({"typescript_string": prompt, "input": question})
    result = copy.deepcopy(res)
    return result
In an example notebook, I am calling the FastAPI endpoint like this:
import subprocess
import urllib.parse
import shlex

query = input("Insert your bullet points here: ")
task = input("Insert the task here: ")

# Safely URL-encode the query strings
encodedquery = urllib.parse.quote(query)
encodedtask = urllib.parse.quote(task)

# Build the curl command
command = f"curl -X 'GET' 'http://127.0.0.1:8000/model_response?question={encodedquery}&prompt={encodedtask}' -H 'accept: application/json'"
print(command)

args = shlex.split(command)
process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
print(stdout)
So with this code, getting responses from the API works. But I only see streaming in my terminal (I think this is because of the StreamingStdOutCallbackHandler). Only after the streaming in the terminal is complete do I get my FastAPI response.
What do I have to change so that I can stream token by token with FastAPI and a local llama.cpp model?
I was doing the same and hit a similar issue: FastAPI was not streaming the response even though I was using the StreamingResponse API. Eventually I got the code below to work. There are three important parts:
1. Make sure to use StreamingResponse to wrap an Iterator.
2. Make sure the Iterator sends a newline character \n in each streamed chunk.
3. Make sure to use streaming APIs to connect to your LLMs. For example, the _client.chat function in my example uses httpx to connect to the LLM's REST API. If you use the requests package, it won't work because it doesn't support streaming.
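To make point 3 concrete, here is a rough sketch of what such a streaming _client could look like with httpx. The class name, base URL, and ollama-style /api/chat endpoint are placeholder assumptions rather than my actual code; the endpoint that wraps it follows right after.

import json
from typing import Iterator

import httpx

class StreamingLLMClient:
    """Hypothetical stand-in for self._client; talks to an ollama-style /api/chat endpoint."""

    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url

    def chat(self, **params) -> Iterator[dict]:
        # httpx.stream keeps the HTTP connection open, so each
        # newline-delimited JSON line can be yielded as soon as the LLM produces it.
        def line_iterator():
            with httpx.stream(
                "POST", f"{self.base_url}/api/chat", json=params, timeout=None
            ) as response:
                for line in response.iter_lines():
                    if line:
                        yield json.loads(line)

        return line_iterator()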
async def chat(self, request: Request):
    """
    Generate a chat response using the requested model.
    """
    # Pass the request body JSON on to the parameters of _client.chat.
    # The request body follows the ollama API's chat request format for now.
    params = await request.json()
    self.logger.debug("Request data: %s", params)

    chat_response = self._client.chat(**params)

    # Always return as a stream when the client gives us an iterator
    if isinstance(chat_response, Iterator):
        def generate_response():
            for response in chat_response:
                # Each chunk is one JSON object terminated by a newline (NDJSON)
                yield json.dumps(response) + "\n"
        return StreamingResponse(generate_response(), media_type="application/x-ndjson")
    elif chat_response is not None:
        return json.dumps(chat_response)
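Applied to your LangChain + llama.cpp setup, a minimal sketch of the same pattern could look like the endpoint below. It assumes your LangChain version's LlamaCpp exposes the stream() method (which yields the generated text chunk by chunk) and reuses get_cached_llm and model_response_prompt from your code; treat it as a starting point rather than a drop-in solution.

from fastapi.responses import StreamingResponse

@app.get('/model_response_stream')
async def model_stream(question: str, prompt: str):
    llm = get_cached_llm()
    full_prompt = model_response_prompt().format(
        typescript_string=prompt, input=question
    )

    def token_generator():
        # llm.stream() yields tokens as llama.cpp produces them;
        # the StreamingStdOutCallbackHandler is not needed for this.
        for chunk in llm.stream(full_prompt):
            yield chunk

    return StreamingResponse(token_generator(), media_type="text/plain")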
You can find a detailed explanation here: