javascript, node.js, async-await, stream, node.js-fs

How to process each row/buffer synchronously when reading a file asynchronously using a stream


As you can see, I have a JS script that takes a .csv file and calls an async function for every row (4 different functions, called iteratively).

The problem is that I need to wait for the function in the i-th iteration to finish before I proceed to the (i+1)-th iteration.

// NOTE(question code): this script illustrates the problem being asked about.
const csv = require('csv-parser');
const fs = require('fs');

// Row counter used to rotate between the four org handlers (i % 4).
var i=1;

fs.createReadStream('table.csv')
  .pipe(csv())
  .on('data', (row) => {
      // The orgXcreatePatient calls below are async but are NOT awaited:
      // the stream keeps emitting 'data' events while earlier handlers are
      // still in flight, so rows are processed concurrently, not one by one.
      switch(i%4){
          case 1: org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 2: org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 3: org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          case 0: org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
      }
    i++;
  })
  .on('end', () => {
    // Fires once all rows have been EMITTED — handlers started above may
    // still be running at this point, which is the problem being described.
    console.log('CSV file successfully processed');
  });

  // Handler stubs quoted from the question. The `...` bodies are elided in
  // the original post (not valid JavaScript as-is); each presumably performs
  // an async write of one patient record to a different org — TODO confirm.
  async function org1createPatient(patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
    ...
  }

  async function org2createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
    ...
  }

  async function org3createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
   ...
  }

  async function org4createPatient( patientId, FirstName, LastName, Age, Sex, ChestPainType, RestingBP, Cholesterol, FastingBS, RestingECG, MaxHR, ExerciseAngina, Oldpeak, ST_Slope, HeartDisease) {
   ...
  }

How can I get what I want? Hope my question is clear enough!


Solution

  • The readStream you are using here is asynchronous, meaning .on(event, callback) will trigger every time a new piece of data is read, independently of any previously triggered callback. In other words, the execution of the callback function does not slow this process down: the callback will be run in parallel, every time the event is received.

    This means that in case callback was to execute a piece of code that is asynchronous, you may very well end up in a situation where multiple instances of this function may still be running by the time the next read event is received.

    Note: this holds true for any event, including the 'end' event.

    If you were to use async/await on callback, it would only make the internal logic of this function synchronous. It would still not impact the rate at which your data is read.

    To process the rows strictly one at a time, you will want to both use async/await on callback (to make it internally synchronous) and have callback manually pause and resume the read operation happening in parallel.

    const csv = require('csv-parser');
    const fs = require('fs');

    let i = 1;

    const stream = fs.createReadStream('table.csv').pipe(csv());

    stream.on('data', async (row) => {
       // Pause the stream so no further 'data' events fire while this row
       // is being processed — this is what serializes the rows.
       stream.pause();

       try {
          // Rotate through the four org handlers, one per row.
          switch (i % 4) {
             case 1: await org1createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
             case 2: await org2createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
             case 3: await org3createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
             case 0: await org4createPatient(row.patientId, row.FirstName, row.LastName, row.Age, row.Sex, row.ChestPainType, row.RestingBP, row.Cholesterol, row.FastingBS, row.RestingECG, row.MaxHR, row.ExerciseAngina, row.Oldpeak, row.ST_Slope, row.HeartDisease); break;
          }
          i++;
       } catch (err) {
          // Without this catch, a rejected orgXcreatePatient would leave the
          // stream paused forever (silent hang) and the rejection unhandled.
          // Destroy the stream so the failure surfaces via the 'error' event.
          stream.destroy(err);
          return;
       }

       // Row fully processed — let the next 'data' event through.
       stream.resume();
    });

    stream.on('error', (err) => {
       console.error('CSV processing failed:', err);
    });

    stream.on('end', () => {
      // The stream stays paused while each row is awaited, so 'end' only
      // fires after the last row's handler has completed successfully.
      console.log('CSV file successfully processed');
    });