I'm trying to read many files and combine their contents into a single file. Here is my current implementation. I want to rewrite it using .pipe(), but every attempt ends up overwriting the output file on each iteration. Any help is appreciated.
async joinSlices(path) {
const wrs = new WriteStream('./bigJSON.json');
const files = fs.readdirSync(path);
for (let file of files) {
console.log(file + ' start');
await new Promise((res, rej) => {
const rs = new ReadStream(path + file, 'utf-8');
rs.on('data', (chunk) => { wrs.write(chunk); });
rs.on('end', (data) => { console.log(file + ' end'); });
rs.on('close', (data) => { res(); });
});
}
wrs.destroy();
console.log('write stream end');
}
The loop works if I declare both streams inside its body and resolve the promise when writing finishes, but then the output file is overwritten on every iteration.
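The overwriting has two causes:
- Every `new WriteStream(...)` / `fs.createWriteStream(...)` opens the file with the default `'w'` flag, which truncates it.
- `pipe()` ends the destination as soon as the source ends.

To use `.pipe()`, create the write stream once, outside the loop, and pipe each file with `rs.pipe(wrs, { end: false })`.

That said, the Node.js-specific Stream API is a bit of a pain to work with. Luckily, readable streams also implement the standardized async [iteration protocols](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols), which browsers implement too. That means you can consume them with `for await...of`, as follows: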
import fs from 'node:fs'
const inDir = 'in/'
const outFile = fs.createWriteStream('./out.json')
outFile.write('[')
let first = true
// NOTE(review): readdirSync does not promise any particular order; sort the
// names if the order of the slices matters.
for (const fileName of fs.readdirSync(inDir)) {
  if (first) {
    first = false
  } else {
    outFile.write(',')
  }
  const file = fs.createReadStream(inDir + fileName, { encoding: 'utf-8' })
  for await (const str of file) {
    // write() returns false once the internal buffer is full. Wait for 'drain'
    // before reading more, so large inputs are not buffered entirely in memory.
    if (!outFile.write(str)) {
      await new Promise(resolve => outFile.once('drain', resolve))
    }
  }
}
// end() writes the closing bracket, flushes everything, then closes the file.
outFile.end(']')
Or, if you prefer a more functional style, with `pipeline`:
import fs from 'node:fs'
import { pipeline } from 'node:stream/promises'
const inDir = 'in/'
const outFile = fs.createWriteStream('./out.json')
// Each file's stream is wrapped together with its leading ',' (none for the
// first file), so separators appear only *between files*. Joining the
// flattened chunk stream instead would insert a ',' between every read chunk
// (every ~64 KiB), i.e. inside large files, corrupting the JSON.
// Streams are created lazily, one file at a time, as the iteration reaches them.
const iter =
  concatIterable(
    '[',
    flatMapIterable(fs.readdirSync(inDir), (fileName, i) =>
      concatIterable(
        i === 0 ? '' : ',',
        fs.createReadStream(inDir + fileName, { encoding: 'utf-8' })
      )
    ),
    ']',
  )
pipeline(iter, outFile).catch(console.error)
// Until https://github.com/tc39/proposal-async-iterator-helpers/ lands, we
// define our own helpers for (async) iterables, analogous to the Array methods.
/**
 * Yields every item of each argument in turn, fully draining one source
 * before moving to the next. Sources may be sync or async iterables
 * (including strings, which yield their characters).
 */
async function * concatIterable (...sources) {
  for (let idx = 0; idx < sources.length; idx += 1) {
    const source = sources[idx]
    yield * source
  }
}
/**
 * Lazy counterpart of Array.prototype.flatMap. For each item of `iter` (sync
 * or async), calls `callback(item, index)` and yields everything from the
 * iterable it returns, in order.
 */
async function * flatMapIterable (iter, callback) {
  let index = 0
  for await (const item of iter) {
    const inner = callback(item, index)
    index += 1
    yield * inner
  }
}
/**
 * Yields the items of `iter` (sync or async) with `sep` inserted between
 * consecutive items. This is like Array.prototype.join, but lazy, and it
 * yields items as-is instead of converting them to strings.
 */
async function * joinIterable (iter, sep) {
  let started = false
  for await (const item of iter) {
    if (started) {
      yield sep
    }
    started = true
    yield item
  }
}