In my Nuxt app I have a Pinia store and am making an API call via the Nitro server that gets streaming data in response. When I try to update the Pinia store with the streaming data it does not update, i.e. I can call
const streamOutput = ref({ value: "", annotations: [] })
const setStreamOutput = (output) => {
console.log("setStreamOutput", output)
streamOutput.value = output
console.log("setStreamOutput", output)
}
and will see the console log but in my server logs, not in Browser and in my template and in the browser tool I still see the initial value for streamOutput.
export default defineEventHandler(async (event) => {
const config = useRuntimeConfig()
const body = await readBody(event)
const chatStore = useChatStore()
const readable = new Readable({
read() {} // No-op implementation for read method
})
const run = sendStream(
event,
await openai.beta.threads
.createAndRunStream({
assistant_id: *ID*,
thread: {
messages: [
{ role: "user", content: "Explain deep learning to a 5 year old." }
]
}
})
.on("messageCreated", async (text) => {
console.log("\n\n messageCreated", text)
})
.on("textDelta", async (textDelta, snapshot) => {
console.log("\n\n textDelta", snapshot)
await chatStore.setStreamOutput(snapshot)
})
.on("toolCallCreated", async (toolCall) => {
console.log("\n\n toolCallCreated", toolCall)
})
.on("toolCallDelta", async (toolCallDelta, snapshot) => {
console.log("\n\n toolCallDelta", snapshot)
if (toolCallDelta.type === "code_interpreter") {
if (toolCallDelta.code_interpreter.input) {
chatStore.setStreamOutput(
chatStore.streamOutput + toolCallDelta.code_interpreter.input
)
}
if (toolCallDelta.code_interpreter.outputs) {
chatStore.setStreamOutput(chatStore.streamOutput + "\noutput >\n")
toolCallDelta.code_interpreter.outputs.forEach((output) => {
if (output.type === "logs") {
chatStore.setStreamOutput(
chatStore.streamOutput + `\n${output.logs}\n`
)
}
})
}
}
})
.on("textDone", async (content, snapshot) => {
console.log("\n\n text Done")
})
)
return sendStream(event, readable)
})
For anyone running into this the solution has two parts:
For creating a Readable
import { Readable } from "stream"
import { sendStream } from "h3"
const readable = new Readable({
read() {} // No-op implementation for read method
})
await openai.beta.threads
.createAndRunStream({
...
})
.on("messageCreated", async (text) => {
readable.push(`{"messageCreated": ${JSON.stringify(text)}}\n`)
})
//Handle other events similarly
...
return sendStream(event, readable)
})
For updating your Pinia store to read the data stream (This is a bit tedious and I'm sure it can be optimized but I will forget about this answer in a bit and figure I should post before that)
const streamOutput = ref({ value: "", annotations: [] })
const setStreamOutput = (output) => {
streamOutput.value = output
}
const createRun = async () => {
try {
const response = await fetch("/api/run/create", {
method: "POST",
body: JSON.stringify({
threadId: thread.value.id
})
})
if (!response.ok) {
throw new Error("Network response was not ok")
}
const reader = response.body.getReader()
const decoder = new TextDecoder("utf-8")
let buffer = ""
let done = false
while (!done) {
const { value, done: readerDone } = await reader.read()
done = readerDone
if (value) {
buffer += decoder.decode(value, { stream: true })
// Process complete JSON objects in the buffer
let boundary
while ((boundary = buffer.indexOf("\n")) !== -1) {
const chunk = buffer.slice(0, boundary).trim()
buffer = buffer.slice(boundary + 1)
if (chunk) {
try {
const json = JSON.parse(chunk)
if (json.textDelta) {
// VVVVV Here you can hook into your store's function to use the data
setStreamOutput(json.textDelta.value)
} else if (json.messageCreated) {
... // Handle other streaming events