I'm trying to process a few hundred json.gz files using worker threads. At some point I get a JS heap limit error caused by three big files (about 3 GB each unzipped).
I tried to find a way to process each file's objects one by one, but all I've managed to get is all of a file's objects at once.
Here is the worker code at the moment:
const fs = require('fs')
const zlib = require('zlib')
const JSONStream = require('JSONStream')
const es = require('event-stream')
const { parentPort } = require('worker_threads')

for (let gzFile of zippedFiles) {
  const gunzip = zlib.createGunzip()
  const parser = JSONStream.parse('offers.*')
  const readStream = fs.createReadStream(gzFile)
  readStream.pipe(gunzip).pipe(parser)
    .pipe(es.map((offers, callback) => { // offers contains the whole array of the current file's objects
      offers.forEach(rawProduct => {
        let processedProduct = getProcessedProduct(rawProduct)
        parentPort.postMessage({ processedProduct })
      })
      callback()
    })
    .on('error', (e) => {
      console.trace(`Error while reading file`, e)
    })
    .on('end', () => {
      idxCount++
      if (idxCount === lastIdx) {
        parentPort.postMessage({ completed: true })
      }
    })
  )
}
JSON structure:
{
  "offers": {
    "offer": [
      {}, // => the objects I want to get one by one
      {},
      {}
    ]
  }
}
How can I avoid the JS heap limit error? Thanks!
Nidhim David's suggestion is exactly what I was looking for.
Here is the working code:
const fs = require('fs');
const zlib = require('zlib');
const { chain } = require('stream-chain');
const { parser } = require('stream-json');
const { pick } = require('stream-json/filters/Pick');
const { streamArray } = require('stream-json/streamers/StreamArray');
const { parentPort } = require('worker_threads');

for (let gzFile of zippedFiles) {
  const pipeline = chain([
    fs.createReadStream(gzFile),
    zlib.createGunzip(),
    parser(),
    pick({ filter: 'offers.offer' }), // select the array of objects
    streamArray(),                    // emit each array element as a separate data event
  ]);
  pipeline.on('data', ({ key, value }) => {
    // objects arrive one by one and are processed individually
    const rawProduct = value;
    const processedProduct = getProcessedProduct(rawProduct);
    parentPort.postMessage({ processedProduct });
  });
  pipeline.on('end', () => {
    idxCount++;
    if (idxCount === lastIdx) {
      debug(`last zipped file, sending complete message`);
      parentPort.postMessage({ completed: true });
    }
  });
}
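For completeness, here is a minimal sketch of what the main-thread side could look like (the worker file name and the saveProduct helper are assumptions for illustration, not part of the original code): it spawns the worker, handles each processedProduct message as it streams in, and reacts to the completed flag once the last file is done.

const { Worker } = require('worker_threads');

// hypothetical file name for the worker shown above
const worker = new Worker('./offers-worker.js');

worker.on('message', (msg) => {
  if (msg.processedProduct) {
    // handle each product as it arrives, e.g. write it to a DB or a file
    saveProduct(msg.processedProduct); // saveProduct is a placeholder
  } else if (msg.completed) {
    // the worker has finished the last zipped file
    console.log('all files processed');
  }
});

worker.on('error', (e) => console.trace('Worker error', e));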