Tags: node.js, async-await, nodejs-stream

Node.js: async/await inside a createReadStream handler


I am reading a CSV file line by line and inserting/updating each row in MongoDB. For each row I expect the output in this order: 1. `console.log(row)`; 2. `console.log(cursor)`; 3. `console.log("stream")`.

Instead, I get all the `console.log(row)` lines first (`console.log(row); console.log(row); console.log(row); …`), and only after them the `console.log(cursor)` and `console.log("stream")` lines. What am I missing here?

const csv = require('csv-parser');
const fs = require('fs');

var mongodb = require("mongodb");

const client = mongodb.MongoClient;
const url = "mongodb://localhost:27017/";
// Assigned once the connection is open; insertRec() writes through it.
let collection;

// Connect, then start streaming the CSV. The error argument must be checked:
// when the connection fails the client argument cannot be used, and calling
// .db() on it would throw instead of reporting the real connection error.
client.connect(url, { useUnifiedTopology: true }, (err, connectedClient) => {
  if (err) {
    console.error('MongoDB connection failed:', err);
    process.exitCode = 1;
    return;
  }
  const db = connectedClient.db("UKCompanies");
  collection = db.collection("company");
  // startRead() is async; handle a rejection instead of leaving the promise floating.
  startRead().catch((readErr) => {
    console.error('CSV import failed:', readErr);
    process.exitCode = 1;
  });
});
// Holds the result of the most recent write made by insertRec().
let cursor = {};

/**
 * Upsert one parsed CSV row into the `company` collection and log the outcome.
 *
 * Uses `replaceOne` because `Collection.update` is deprecated in the 3.x
 * MongoDB Node driver and removed in 4.x. Passing a plain document (no `$`
 * operators) to `update` already meant "replace the matched document", so
 * `replaceOne` keeps the same semantics.
 *
 * @param {Object} row - one record emitted by the CSV parser.
 * @param {Object} [filter={ CompanyNumber: 23 }] - selects the document to
 *   replace or insert. NOTE(review): with the hard-coded default every row
 *   replaces the same document; callers probably want something like
 *   `{ CompanyNumber: row.CompanyNumber }` — confirm.
 * @returns {Promise<void>}
 */
async function insertRec(row, filter = { CompanyNumber: 23 }) {
  console.log(row);
  const result = await collection.replaceOne(filter, row, { upsert: true });
  // Also stored in the module-level `cursor` for backward compatibility.
  // Despite the name, this is a write result, not a query cursor.
  cursor = result;
  if (result) {
    console.log(result);
  } else {
    console.log('not exist');
  }
  console.log("stream");
}



// Stream ./data/inside/6.csv through csv-parser and pass each parsed row to
// insertRec().
// NOTE: the 'data' event does not wait for the promise returned by this async
// listener. Further rows keep arriving, and insertRec() keeps logging them,
// while earlier inserts are still awaiting MongoDB. To process rows one at a
// time, pause the stream or consume it with `for await`.
async function startRead() {
  fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      // This await only delays the rest of this listener, not the stream.
      await insertRec(row);
    })
    .on('end', () => {
      // Fires when parsing finishes, which can be before pending inserts complete.
      console.log('CSV file successfully processed');
    });
}

Solution

  • In your startRead() function, the `await insertRec()` inside the `'data'` handler does not stop more `data` events from flowing while insertRec() is running: the stream does not wait for the promise returned by an async listener. If you don't want the next `data` event to fire until insertRec() is done, you need to pause the stream and then resume it when the insert has finished.

    /**
     * Stream ./data/inside/6.csv through csv-parser and upsert the rows one at
     * a time. The parser is paused while insertRec() runs and resumed once it
     * settles, so the next 'data' event waits for the previous row.
     *
     * Errors are logged, and process.exitCode is set, instead of being left unhandled:
     * - read errors from the file stream, which .pipe() does not forward;
     * - parse errors from csv-parser;
     * - a rejected insertRec(), which would otherwise become an unhandled
     *   rejection because the 'data' event ignores the listener's promise.
     * A failed row is reported and the import continues with the next row.
     *
     * @returns {Promise<void>} settles immediately; it does not track the import.
     */
    async function startRead() {
      const stream = fs.createReadStream('./data/inside/6.csv')
        .on('error', (err) => {
          console.error('Could not read CSV file:', err);
          process.exitCode = 1;
        })
        .pipe(csv())
        .on('data', async (row) => {
          stream.pause();
          try {
            await insertRec(row);
          } catch (err) {
            console.error('insertRec() failed for row:', row, err);
            process.exitCode = 1;
          } finally {
            stream.resume();
          }
        })
        .on('error', (err) => {
          console.error('Could not parse CSV file:', err);
          process.exitCode = 1;
        })
        .on('end', () => {
          console.log('CSV file successfully processed');
        });
    }
    

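    FYI, you also need error handling in case insertRec() fails. The stream does not catch rejections from an async `data` listener, so an unhandled failure becomes an unhandled promise rejection.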