This question is the exact reverse of converting a ReadableStream into a ReadStream.
- ReadStream is a structure used in Node.js
- ReadableStream is a structure brought by the web platform

With the advent of non-Node.js runtimes such as Deno or the "Edge runtime" in Next.js, it can be useful to convert a Node.js specific ReadStream into a generic ReadableStream.
This is for instance useful to send files from a Next.js route handler, see this discussion on Next.js GitHub.
I've drafted a piece of code like so:
const downloadStream = fs.createReadStream(zipFilePath);
const readStream = new ReadableStream({
  start(controller) {
    return pump();
    function pump() {
      return downloadStream.read().then(({ done, value }) => {
        // When no more data needs to be consumed, close the stream
        if (done) {
          controller.close();
          return;
        }
        // Enqueue the next data chunk into our target stream
        controller.enqueue(value);
        return pump();
      });
    }
  },
});
I am in the process of testing it.
Edit: the problem with this first draft is that the stream.Readable read() method doesn't return a promise, as mentioned by @Mahesh in the comments.
Here is a second try:
const downloadStream = fs.createReadStream(zipFilePath);
const readStream = new ReadableStream({
  start(controller) {
    return pump();
    function pump() {
      const buf = downloadStream.read() as Buffer
      if (buf === null) {
        controller.close();
        return;
      }
      controller.enqueue(buf.toString());
      return pump();
    }
  },
});
It gives me a null buffer immediately, despite the file weighing 344 bytes. When I call isPaused(), the stream doesn't seem to be paused. Calling pause() doesn't fix my issue, and neither does passing an explicit size of 1 byte to read().
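One likely explanation for the immediate null: read() is synchronous and only returns data that is already sitting in the stream's internal buffer. Right after fs.createReadStream() the file has not even been opened yet, so the buffer is empty. The sketch below illustrates this and shows an event-based alternative. It assumes Node.js 18+ (global ReadableStream); readImmediately and nodeReadableToWeb are hypothetical helper names, not part of the original post, and this simple version does not handle backpressure.

```javascript
const fs = require("node:fs");

// read() only returns what is already buffered. Called synchronously
// right after createReadStream(), nothing has been read from disk yet.
function readImmediately(filePath) {
  const stream = fs.createReadStream(filePath);
  const chunk = stream.read(); // null: the internal buffer is still empty
  stream.destroy();
  return chunk;
}

// Hypothetical helper: forward chunks as the Node.js stream emits them,
// instead of polling read(). No backpressure handling in this sketch.
function nodeReadableToWeb(nodeStream) {
  return new ReadableStream({
    start(controller) {
      nodeStream.on("data", (chunk) => controller.enqueue(new Uint8Array(chunk)));
      nodeStream.on("end", () => controller.close());
      nodeStream.on("error", (err) => controller.error(err));
    },
    cancel() {
      // The consumer gave up: release the file descriptor
      nodeStream.destroy();
    },
  });
}
```

Closing the controller only on the "end" event, rather than on the first null, avoids ending the web stream before any data has arrived.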
I also get a weird error from Next.js:
- error Error: aborted
    at connResetException (node:internal/errors:711:14)
    at Socket.socketCloseListener (node:_http_client:454:19)
    at Socket.emit (node:events:525:35)
    at TCP.<anonymous> (node:net:313:12) {
  code: 'ECONNRESET'
}
Are there simpler solutions, syntax-wise?
I managed to find a working syntax, but it still lacks some details.
/**
 * From https://github.com/MattMorgis/async-stream-generator
 */
async function* nodeStreamToIterator(stream) {
  for await (const chunk of stream) {
    yield chunk;
  }
}
I am not familiar with generators so I am not sure of:
- ReadableStream?
- what for await means here?

But at least this syntax lets us consume the stream in loops.
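On the for await question, a rough sketch may help. Node.js Readable streams are async iterables, and for await repeatedly awaits the iterator's next() until it reports done. The two collectors below (hypothetical names, written only for illustration) should behave the same way for a well-behaved stream; the manual version spells out approximately what the loop does, minus the cleanup that for await performs when the loop exits early.

```javascript
const { Readable } = require("node:stream");

// Same generator as above, repeated so this snippet is self-contained
async function* nodeStreamToIterator(stream) {
  for await (const chunk of stream) {
    yield chunk;
  }
}

// Consume with the loop syntax
async function collectWithForAwait(iterable) {
  const chunks = [];
  for await (const chunk of iterable) {
    chunks.push(chunk);
  }
  return chunks;
}

// Approximately what the loop does: await next() until done is true
async function collectManually(iterator) {
  const chunks = [];
  while (true) {
    const { value, done } = await iterator.next();
    if (done) break;
    chunks.push(value);
  }
  return chunks;
}
```

For example, `await collectManually(nodeStreamToIterator(Readable.from(["a", "b"])))` collects the two chunks in order. Since the stream is already async iterable, the generator wrapper is a thin layer whose main benefit is exposing a plain next() method.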
ReadStream.

/**
 * Taken from Next.js doc
 * https://nextjs.org/docs/app/building-your-application/routing/router-handlers#streaming
 * Itself taken from mozilla doc
 * https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream#convert_async_iterator_to_stream
 * @param {*} iterator
 * @returns {ReadableStream}
 */
function iteratorToStream(iterator) {
  return new ReadableStream({
    async pull(controller) {
      const { value, done } = await iterator.next()
      if (done) {
        controller.close()
      } else {
        controller.enqueue(new Uint8Array(value))
      }
    },
  })
}
Notice the "Uint8Array": this conversion doesn't seem to be needed in all scenarios, but encoding may be required on some platforms; I needed it in Next.js. See this discussion on Next.js GitHub.
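To make the conversion step more concrete, here is a minimal sketch of a chunk normalizer. A Node.js Buffer is already a subclass of Uint8Array, so the copy mainly matters for consumers that expect a plain Uint8Array. If the ReadStream was created with an encoding option, chunks arrive as strings and need encoding first. The name toPlainUint8Array is hypothetical and not part of the Next.js or MDN examples.

```javascript
const encoder = new TextEncoder();

// Hypothetical helper: always hand the web stream a plain Uint8Array
function toPlainUint8Array(chunk) {
  if (typeof chunk === "string") {
    // String chunks (stream created with an `encoding` option): encode as UTF-8
    return encoder.encode(chunk);
  }
  // Buffer or other typed array: copy the bytes into a plain Uint8Array
  return new Uint8Array(chunk);
}
```

Under that assumption, `controller.enqueue(toPlainUint8Array(value))` could replace the direct `new Uint8Array(value)` call when string chunks are possible.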
Finally, we can use this stream in a Response just to see how it works:
import fs from "node:fs"

// highWaterMark affects the chunk size, here I use a small size to simulate many chunks
const nodeStream = fs.createReadStream("./.gitignore", { highWaterMark: 8 })
const iterator = nodeStreamToIterator(nodeStream)
const webStream = iteratorToStream(iterator)
// Wrap the web stream in a Response and read it back as text
const res = new Response(webStream)
const blob = await res.blob()
console.log(await blob.text())
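As for a simpler syntax, Node.js itself ships a built-in conversion, stream.Readable.toWeb(), available since Node.js 17 and marked experimental in some versions. The sketch below assumes a runtime where it exists and where Response is a global (Node.js 18+). The wrapper name fileToWebStream is hypothetical, and whether the resulting stream behaves well inside a Next.js route handler is untested here.

```javascript
const fs = require("node:fs");
const { Readable } = require("node:stream");

// Hypothetical wrapper: let Node.js do the ReadStream -> ReadableStream conversion
function fileToWebStream(filePath) {
  const nodeStream = fs.createReadStream(filePath);
  return Readable.toWeb(nodeStream);
}

// Example consumer: read a whole file back through a web Response
async function readFileThroughResponse(filePath) {
  const res = new Response(fileToWebStream(filePath));
  return res.text();
}
```

This keeps the generator and iteratorToStream helpers as a fallback for runtimes or versions where Readable.toWeb() is unavailable.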