Tags: vue.js, nuxt.js, openai-api, pinia, nitro

How do I pass streaming data from a Nitro API to a Pinia store so I can use it in a Nuxt component?


In my Nuxt app I have a Pinia store and am making an API call via the Nitro server that gets streaming data in response. When I try to update the Pinia store with the streaming data it does not update, i.e. I can call

// Reactive holder for the streamed output.
const streamOutput = ref({ value: "", annotations: [] })

/**
 * Overwrite the stored stream output, logging the payload once before and
 * once after the assignment.
 * @param {*} output - value assigned to streamOutput.value
 */
const setStreamOutput = (output) => {
    const trace = () => console.log("setStreamOutput", output)
    trace()
    streamOutput.value = output
    trace()
}

and I do see the console log, but only in my server logs, not in the browser. In my template and in the browser dev tools I still see the initial value for streamOutput.

/**
 * Nitro event handler from the question: starts an OpenAI assistant run as a
 * stream and tries to forward updates by calling the Pinia store from the
 * stream's event callbacks. As posted, nothing is ever pushed into `readable`,
 * so the stream returned at the end carries no data.
 */
export default defineEventHandler(async (event) => {
    // config and body are read here but not used anywhere else in this handler.
    const config = useRuntimeConfig()
    const body = await readBody(event)
    // NOTE(review): this store is obtained while handling the request in Nitro;
    // the question reports that its updates show up in the server logs only and
    // never reach the store seen by the browser.
    const chatStore = useChatStore()
    const readable = new Readable({
        read() {} // No-op implementation for read method
    })

    // First sendStream call: passes the OpenAI stream object itself as the
    // stream to send. `run` is not used afterwards.
    const run = sendStream(
        event,
        await openai.beta.threads
            .createAndRunStream({
                // *ID* is a redacted placeholder in the post, not valid JavaScript.
                assistant_id: *ID*,
                thread: {
                    messages: [
                        { role: "user", content: "Explain deep learning to a 5 year old." }
                    ]
                }
            })
            .on("messageCreated", async (text) => {
                console.log("\n\n messageCreated", text)
            })
            .on("textDelta", async (textDelta, snapshot) => {
                console.log("\n\n textDelta", snapshot)
                // The snapshot is handed to the store as-is; the delta is ignored.
                await chatStore.setStreamOutput(snapshot)
            })
            .on("toolCallCreated", async (toolCall) => {
                console.log("\n\n toolCallCreated", toolCall)
            })
            .on("toolCallDelta", async (toolCallDelta, snapshot) => {
                console.log("\n\n toolCallDelta", snapshot)
                // Only code_interpreter deltas are forwarded to the store.
                // NOTE(review): streamOutput is initialised as an object in the
                // store snippet, so the `+` concatenations below presumably
                // stringify it — confirm the intended shape.
                if (toolCallDelta.type === "code_interpreter") {
                    if (toolCallDelta.code_interpreter.input) {
                        chatStore.setStreamOutput(
                            chatStore.streamOutput + toolCallDelta.code_interpreter.input
                        )
                    }
                    if (toolCallDelta.code_interpreter.outputs) {
                        chatStore.setStreamOutput(chatStore.streamOutput + "\noutput >\n")
                        toolCallDelta.code_interpreter.outputs.forEach((output) => {
                            // Outputs whose type is not "logs" are skipped.
                            if (output.type === "logs") {
                                chatStore.setStreamOutput(
                                    chatStore.streamOutput + `\n${output.logs}\n`
                                )
                            }
                        })
                    }
                }
            })
            .on("textDone", async (content, snapshot) => {
                console.log("\n\n text Done")
            })
    )

    // Second sendStream call on the same event, this time with the Readable
    // that no callback above ever pushes into.
    return sendStream(event, readable)
})

Solution

  • For anyone running into this, the solution has two parts:

    1. Creating and returning a Readable
    2. Updating your Pinia store to read the data stream

    For creating a Readable

    import { Readable } from "stream"
    import { sendStream } from "h3"
    
    const readable = new Readable({
            read() {} // No-op implementation for read method
        })
    await openai.beta.threads
            .createAndRunStream({
                ...
            })
            .on("messageCreated", async (text) => {
                readable.push(`{"messageCreated": ${JSON.stringify(text)}}\n`)
            })
    //Handle other events similarly
    ...
    return sendStream(event, readable)
    })
    

    For updating your Pinia store to read the data stream (This is a bit tedious and I'm sure it can be optimized but I will forget about this answer in a bit and figure I should post before that)

    // Reactive holder for the streamed output.
    const streamOutput = ref({ value: "", annotations: [] })

    /**
     * Store the latest streamed output.
     * @param {*} output - value assigned to streamOutput.value
     */
    function setStreamOutput(output) {
        streamOutput.value = output
    }
    
    const createRun = async () => {
        try {
            const response = await fetch("/api/run/create", {
                method: "POST",
                body: JSON.stringify({
                    threadId: thread.value.id
                })
            })
    
            if (!response.ok) {
                throw new Error("Network response was not ok")
            }
    
            const reader = response.body.getReader()
            const decoder = new TextDecoder("utf-8")
    
            let buffer = ""
    
            let done = false
            while (!done) {
                const { value, done: readerDone } = await reader.read()
                done = readerDone
                if (value) {
                    buffer += decoder.decode(value, { stream: true })
    
                    // Process complete JSON objects in the buffer
                    let boundary
                    while ((boundary = buffer.indexOf("\n")) !== -1) {
                        const chunk = buffer.slice(0, boundary).trim()
                        buffer = buffer.slice(boundary + 1)
    
                        if (chunk) {
                            try {
                                const json = JSON.parse(chunk)
                                if (json.textDelta) {
                                    // VVVVV Here you can hook into your store's function to use the data
                                    setStreamOutput(json.textDelta.value)
                                } else if (json.messageCreated) {
                                    ... // Handle other streaming events