I read a .gz file using sax and zlib and then update my DB, but it seems that some of the file's content is skipped.
This is my code:
// variables to demonstrate the weird behavior
let recordsNum = 1;
let recordsNum2 = 1;
let numOfFailed = 0;
async function boo(...args){
const saxStream = sax.createStream(true); // strict mode
let currentElement = null;
let currentObject = null;
saxStream.on("opentag", (node) => {
if (node.name === "Item") {
currentObject = {};
} else {
currentElement = keyTransforms[node.name] ;
}
});
saxStream.on("closetag", async (nodeName) => {
if (nodeName === "Item") {
// process the current object
recordsNum++;
if (arrOfRecoreds.length <= 100 && currentObject) {
arrOfRecoreds.push(currentObject);
recordsNum2++;
}
if (!currentObject) {
numOfFailed++;
}
if (arrOfRecoreds.length === 100) {
await insertBatch(arrOfRecoreds);
arrOfRecoreds = [];
}
// reset the current object
currentObject = null;
} else {
currentElement = null;
}
});
}
const readStream = fs
.createReadStream("./temp/" + fileName)
.pipe(zlib.createGunzip())
.pipe(saxStream);
await new Promise((resolve, reject) => {
readStream.on("end", async () => {
if (arrOfRecoreds.length > 0) {
await insertBatch(arrOfRecoreds);
resolve();
}
});
readStream.on("error", reject);
});
return new Promise((res) => {
pool.end();
console.log("Finished processing file.");
res({
statusCode: 201,
message: "update db",
});
});
where the insertBatch function just updates the DB according to the records:
const client = await pool.connect();
try {
await client.query("BEGIN");
await doSomthing(recoreds)
await client.query("COMMIT");
}
catch(e){
await client.query("ROLLBACK");
}finally{
client.release()
}
The code runs without errors, but it contains some bugs I can't understand:
First bug: when invoking boo:
boo("str", "filename.rar").then(() => {
console.log("num of recordes ", recordsNum);
console.log("num of recordes2 ", recordsNum2);
console.log("num of failed ", numOfFailed);
});
The logs are: num of records 100, num of recordes2 45, num of failed 6.
These bugs make the script problematic because it does not update all the records in my Postgres DB.
You have not declared arrOfRecoreds in your code.
Your closetag handler is asynchronous, but sax does not wait for asynchronous handlers to finish. This means that after you issue the database operation command insertBatch(arrOfRecoreds), sax will continue parsing. Another closetag event will be emitted and new entries pushed into arrOfRecoreds before the database operation finishes and arrOfRecoreds = [] is executed. At that point in time, everything that was pushed into it while the database operation was ongoing is lost.
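The effect can be reproduced without sax or a database. The following is a minimal sketch, not your actual code: a plain EventEmitter stands in for the sax stream, and fakeInsertBatch and runLossyParser are hypothetical names invented for this illustration. It also assumes all events are emitted in one synchronous burst, which is a simplification of how a real stream delivers chunks.

```javascript
const { EventEmitter } = require("node:events");

// Hypothetical stand-in for insertBatch: it copies the rows it was handed
// and "stores" them after a short delay, like a slow database call.
function fakeInsertBatch(batch, stored) {
  const rows = [...batch]; // snapshot of the batch at call time
  return new Promise((resolve) => {
    setTimeout(() => {
      stored.push(...rows);
      resolve();
    }, 10);
  });
}

async function runLossyParser(total, batchSize) {
  const parser = new EventEmitter(); // stand-in for the sax stream
  const stored = [];
  let batch = [];

  parser.on("closetag", async (record) => {
    batch.push(record);
    if (batch.length === batchSize) {
      // emit() does not wait for this await, so more events arrive meanwhile
      await fakeInsertBatch(batch, stored);
      // discards everything that was pushed while the insert was pending
      batch = [];
    }
  });

  // emit() calls listeners synchronously and ignores the promises they return
  for (let i = 0; i < total; i++) {
    parser.emit("closetag", i);
  }

  // give the pending insert and the reset time to run
  await new Promise((resolve) => setTimeout(resolve, 50));
  return { stored: stored.length, leftover: batch.length };
}
```

Calling runLossyParser(250, 100) resolves to { stored: 100, leftover: 0 }: only the first 100 records are stored, and the other 150 are thrown away by the late reset of the array.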
This can be avoided if you don't await the database operation and reset arrOfRecoreds immediately after triggering it:
if (arrOfRecoreds.length === 100) {
dbOperations.push(insertBatch(arrOfRecoreds));
arrOfRecoreds = [];
}
Here, dbOperations is an array that you declare as var dbOperations = []; in the same place where you declare arrOfRecoreds. It contains one promise per insertBatch operation, and you must wait for all these promises to become fulfilled before you can close your database connection at the end of the loop:
await Promise.all(dbOperations);
pool.end();
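Put together, the pattern might look like the sketch below. It is an illustration under assumptions rather than a drop-in replacement: an EventEmitter again stands in for the sax stream, and fakeInsertBatch and runFixedParser are hypothetical names. The final partial batch is flushed once parsing is done, which in your code corresponds to the "end" handler.

```javascript
const { EventEmitter } = require("node:events");

// Hypothetical stand-in for insertBatch: stores the rows after a short delay.
function fakeInsertBatch(batch, stored) {
  return new Promise((resolve) => {
    setTimeout(() => {
      stored.push(...batch);
      resolve();
    }, 10);
  });
}

async function runFixedParser(total, batchSize) {
  const parser = new EventEmitter(); // stand-in for the sax stream
  const stored = [];
  const dbOperations = []; // one promise per triggered insert
  let batch = [];

  // The handler is synchronous: it triggers the insert without awaiting it.
  parser.on("closetag", (record) => {
    batch.push(record);
    if (batch.length === batchSize) {
      dbOperations.push(fakeInsertBatch(batch, stored));
      batch = []; // reset immediately; the insert keeps the old array
    }
  });

  for (let i = 0; i < total; i++) {
    parser.emit("closetag", i);
  }

  // Flush the final partial batch, as the "end" handler would.
  if (batch.length > 0) {
    dbOperations.push(fakeInsertBatch(batch, stored));
  }

  // Only after every insert has finished is it safe to close the pool.
  await Promise.all(dbOperations);
  return stored.length;
}
```

With the same input as before, runFixedParser(250, 100) resolves to 250, so no record is dropped. One consequence of this design is that all inserts are started without waiting for earlier ones, so for very large files many database operations may be in flight at once.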