Tags: javascript, node.js, next.js, fetch-api

How to convert ReadStream into ReadableStream in NodeJS?


This question is the exact reverse of converting a ReadableStream into a ReadStream.

With the advent of non-Node.js runtimes such as Deno or the "Edge runtime" in Next.js, it can be useful to convert a Node.js-specific ReadStream into a generic ReadableStream.

This is useful, for instance, to send files from a Next.js route handler; see this discussion on the Next.js GitHub.
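
Roughly, the goal looks like this (just a sketch: the route path, the file name, and the nodeStreamToReadableStream helper are placeholders for what I'm trying to write):

    // app/api/download/route.js (placeholder path)
    import fs from "fs";

    export async function GET() {
      // fs.createReadStream returns a Node.js stream.Readable (a ReadStream)...
      const nodeStream = fs.createReadStream("./archive.zip");
      // ...but Response expects a web ReadableStream, hence the conversion this question is about
      const webStream = nodeStreamToReadableStream(nodeStream); // <-- the missing piece
      return new Response(webStream);
    }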

I've drafted a piece of code like so:

    const downloadStream = fs.createReadStream(zipFilePath);
    const readStream = new ReadableStream({
      start(controller) {
        return pump();
        function pump() {
          return downloadStream.read().then(({ done, value }) => {
            // When no more data needs to be consumed, close the stream
            if (done) {
              controller.close();
              return;
            }
            // Enqueue the next data chunk into our target stream
            controller.enqueue(value);
            return pump();
          });
        }
      },
    });

I am in the process of testing it.

Edit: the problem with this first draft is that the stream.Readable read() method doesn't return a promise, as mentioned by @Mahesh in the comments.
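
For reference, stream.Readable's read() is synchronous: it returns whatever is currently buffered (a Buffer by default) or null when nothing is available yet, and it is normally paired with the 'readable' event rather than awaited, roughly like this:

    const downloadStream = fs.createReadStream(zipFilePath);

    downloadStream.on("readable", () => {
      // read() returns a chunk, or null once the internal buffer is drained
      let chunk;
      while ((chunk = downloadStream.read()) !== null) {
        console.log(`received ${chunk.length} bytes`);
      }
    });

    downloadStream.on("end", () => {
      console.log("no more data");
    });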

Here is a second try:

    const downloadStream = fs.createReadStream(zipFilePath);
    const readStream = new ReadableStream({
      start(controller) {
        return pump();
        function pump() {
          const buf = downloadStream.read() as Buffer
          if (buf === null) {
            controller.close();
            return;
          }
          controller.enqueue(buf.toString());
          return pump();
        }
      },
    });

It gives me a null buffer immediately despite the file weighing 344 bytes. When I call isPaused(), the stream doesn't seem to be paused. Calling pause() doesn't fix my issue, nor does passing an explicit size of 1 byte to read().

I also get a weird error from Next.js:

- error Error: aborted
    at connResetException (node:internal/errors:711:14)
    at Socket.socketCloseListener (node:_http_client:454:19)
    at Socket.emit (node:events:525:35)
    at TCP.<anonymous> (node:net:313:12) {
  code: 'ECONNRESET'
}

Are there simpler solutions, syntax-wise?


Solution

  • I managed to find a working syntax, but some details are still unclear to me.

    1. We want to be able to read a file using an imperative syntax, instead of relying on the traditional "data" event.
    /**
     * From https://github.com/MattMorgis/async-stream-generator
     */
    async function* nodeStreamToIterator(stream) {
        for await (const chunk of stream) {
            yield chunk;
        }
    }
    

    I am not familiar with generators, so I am not sure of all the implications, but at least this syntax lets us consume the stream in a loop.
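
    For instance, a minimal sketch of consuming it in a loop (inside an async context; the file name is only an example):

    const nodeStream = fs.createReadStream("./.gitignore")
    for await (const chunk of nodeStreamToIterator(nodeStream)) {
        // each chunk is a Buffer by default (or a string if setEncoding() was called)
        console.log(`chunk of ${chunk.length} bytes`)
    }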

    2. Now we want to convert the iterator into a web platform ReadableStream.
    /**
     * Taken from Next.js doc
     * https://nextjs.org/docs/app/building-your-application/routing/router-handlers#streaming
     * Itself taken from mozilla doc
     * https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#convert_async_iterator_to_stream
     * @param {*} iterator 
     * @returns {ReadableStream}
     */
    function iteratorToStream(iterator) {
        return new ReadableStream({
            async pull(controller) {
                const { value, done } = await iterator.next()
    
                if (done) {
                    controller.close()
                } else {
                    controller.enqueue(new Uint8Array(value))
                }
            },
        })
    }
    

    Notice the "Uint8Array": this doesn't seem to be needed in all scenarios, but encoding may be required on some platforms; I needed this conversion in Next.js. See this discussion on the Next.js GitHub.
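
    If the source stream can also yield strings (for example after calling setEncoding()), a slightly more defensive version could normalize the chunk before enqueueing it; a sketch of such a helper (not needed in every setup):

    // Normalize a chunk (Buffer or string) to a Uint8Array before enqueueing it
    function chunkToUint8Array(chunk) {
        return typeof chunk === "string"
            ? new TextEncoder().encode(chunk)
            : new Uint8Array(chunk)
    }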

    Finally, we can use this stream in a Response just to see how it works:

    // highWaterMark affects the chunk size, here I use a small size to simulate many chunks
    const nodeStream = fs.createReadStream("./.gitignore", { highWaterMark: 8 })
    const iterator = nodeStreamToIterator(nodeStream)
    const webStream = iteratorToStream(iterator)
    
    const res = new Response(webStream)
    const blob = await res.blob()
    console.log(await blob.text())
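
    And to tie this back to the original use case, here is a sketch of a Next.js route handler serving a file with these helpers (the route path and file name are assumptions):

    // app/api/download/route.js (assumed path)
    import fs from "fs"

    export async function GET() {
        const nodeStream = fs.createReadStream("./.gitignore")
        const webStream = iteratorToStream(nodeStreamToIterator(nodeStream))
        // Response accepts a web ReadableStream as its body
        return new Response(webStream)
    }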