javascript, csv, async-await, promise, fs

Parsing large CSV and streaming rows of promises


I'm getting a bit mixed up trying to stream a CSV, make an HTTP request for each row, and have everything execute and log to the console in the "proper" order. Ultimately, I think I'm not wrapping my promises correctly, or...?

const getUserByEmail = async (email) => {
  const encodedEmail = encodeURIComponent(email);

  try {
    const response = await http.get(`users?email=${encodedEmail}`);
    const userId = response.data.data[0] && response.data.data[0].id;

    return (userId ? userId : `${email} not found`);
  } catch (error) {
    console.error('get user error: ', error);
  }
};

const run = async () => {
  console.log('==> Reading csv ...');

  const promises = [];
  const readStream = fs.createReadStream('import-test.csv')
    .pipe(csv.parse({ headers: true }))
    .on('error', (error) => console.error('stream error: ', error))
    .on('data', (row) => {
      promises.push(getUserByEmail(row.email));
    })
    .on('end', rowCount => {
      console.log(`==> Parsed ${rowCount} rows from csv ...`);
    })

  await Promise.all(promises)
    .then(values => console.log(values))

  console.log('==> End of script')
};

run();

I expect the code above to take each row of the CSV, push each HTTP call (a promise) onto an array of promises, and then execute and log everything to the console in order.

This is my actual output:

==> Reading csv...
[]
==> End of script
==> Parsed 10 rows from csv ...

And this is what I'm expecting:

==> Reading csv...
==> Parsed 10 rows from csv ...
[
  QyDPkn3WZp,
  e75KzrqYxK,
  iqDXoEFMZy,
  PstouMRz3y,
  w188hLyeT6,
  g18oxMOy6l,
  8wjVJutFnh,
  fakeEmail@fakeDomain.com not found,
  QEHaG3cp7d,
  y8I4oX6aCe
]
==> End of script

The biggest issue for me is that anything logs after "==> End of script" at all, which tells me I don't have a strong grasp of when and why the earlier events log in the order they do.

Ultimately (and I haven't gotten there yet), I'd also like to throttle these requests to 100 per minute; otherwise this particular API will rate-limit me.

Thank you!


Solution

  • Everything from creating `readStream` down to `await Promise.all(promises)` runs synchronously, but the `data` events fire asynchronously, on later turns of the event loop. So `promises` is still an empty array when you call `Promise.all`, which resolves right away with `[]`: you are not waiting for the stream to end. You might want to put your logic in the `end` handler instead, like this:

    const run = async () => {
      console.log('==> Reading csv ...');
    
      const promises = [];
      fs.createReadStream('import-test.csv')
        .pipe(csv.parse({ headers: true }))
        .on('error', (error) => console.error('stream error: ', error))
        .on('data', (row) => {
          // Start one request per row; each promise settles later.
          promises.push(getUserByEmail(row.email));
        })
        .on('end', async (rowCount) => {
          // By the time 'end' fires, every 'data' handler has run,
          // so `promises` holds one entry per row.
          console.log(`==> Parsed ${rowCount} rows from csv ...`);
    
          const values = await Promise.all(promises);
          console.log(values);
    
          console.log('==> End of script');
        });
    }
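    If you would rather keep `run` itself awaitable (so that code after `await run()` also runs last), one option is to wait for the stream's `end` event with Node's built-in `events.once`. The sketch below is self-contained for illustration and assumes Node 12 or later: `Readable.from` stands in for the `fs.createReadStream(...).pipe(csv.parse(...))` pipeline, `fakeLookup` is a hypothetical stand-in for `getUserByEmail` that resolves after a short timer instead of making an HTTP request, and `runAwaitingEnd` is a hypothetical name. It collects the log lines in an array so the order is easy to check.

```javascript
const { Readable } = require('stream');
const { once } = require('events');

// Hypothetical stand-in for getUserByEmail: resolves after a short timer
// instead of making a real HTTP request.
const fakeLookup = (email) =>
  new Promise((resolve) => setTimeout(() => resolve(`id-for-${email}`), 10));

const runAwaitingEnd = async (rows) => {
  const log = ['==> Reading csv ...'];
  const promises = [];

  // Readable.from(rows) stands in for the CSV parser stream.
  const stream = Readable.from(rows);
  stream.on('data', (row) => promises.push(fakeLookup(row.email)));

  // once() resolves when 'end' fires (and rejects if 'error' fires first),
  // so every 'data' handler has already run at this point.
  await once(stream, 'end');
  log.push(`==> Parsed ${promises.length} rows from csv ...`);

  log.push(await Promise.all(promises));
  log.push('==> End of script');
  return log;
};

runAwaitingEnd([{ email: 'a@example.com' }, { email: 'b@example.com' }])
  .then((log) => log.forEach((line) => console.log(line)));
```

    `once` resolves with an array of the event's arguments, so with the parser stream from the question, `const [rowCount] = await once(parser, 'end')` should recover the same row count that the `end` handler receives.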
    

    An easier approach is to use an async iterator: the parsed stream implements `Symbol.asyncIterator`, so you can consume it with `for await...of`:

    const run = async () => {
      console.log('==> Reading csv ...');
    
      let rowCount = 0;
      const promises = [];
      const readStream = fs.createReadStream('import-test.csv')
        .pipe(csv.parse({ headers: true }));
    
      // The loop body runs once per parsed row, and the loop
      // only finishes after the stream has ended.
      for await (const row of readStream) {
        rowCount++;
        promises.push(getUserByEmail(row.email));
      }
    
      console.log(`==> Parsed ${rowCount} rows from csv ...`);
    
      await Promise.all(promises).then(console.log);
    
      console.log('==> End of script');
    }
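    Note that this version still starts every request almost at once: the loop only pushes promises, and nothing awaits them until the stream is exhausted. The self-contained sketch below makes that visible, assuming Node 12 or later. `Readable.from` again stands in for the CSV parser, and `makeTrackedLookup` and `collectThenAwait` are hypothetical names; the lookup is a stand-in for `getUserByEmail` that counts how many calls are pending at the same moment.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for getUserByEmail that records the highest number
// of calls pending at the same time.
const makeTrackedLookup = () => {
  let inFlight = 0;
  const stats = { peak: 0 };
  const lookup = (email) => {
    inFlight += 1;
    stats.peak = Math.max(stats.peak, inFlight);
    return new Promise((resolve) =>
      setTimeout(() => {
        inFlight -= 1;
        resolve(email);
      }, 20)
    );
  };
  return { lookup, stats };
};

// Same shape as the for await version above: push promises, await at the end.
const collectThenAwait = async (rows, lookup) => {
  const promises = [];
  for await (const row of Readable.from(rows)) {
    promises.push(lookup(row.email)); // started here, not awaited yet
  }
  return Promise.all(promises); // results keep row order
};

const { lookup, stats } = makeTrackedLookup();
const rows = ['a', 'b', 'c', 'd', 'e'].map((name) => ({ email: `${name}@example.com` }));
collectThenAwait(rows, lookup).then((emails) => {
  console.log(`${emails.length} results, peak in flight: ${stats.peak}`);
});
```

    The reported peak should equal the number of rows, because the whole loop runs on promise and `nextTick` callbacks before the first 20 ms timer can fire. With a real API and thousands of rows, that means thousands of simultaneous requests.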
    

    I would go further and limit the concurrency by awaiting each request before moving on to the next row, so only one request runs at a time:

    const run = async () => {
      console.log('==> Reading csv ...');
    
      const result = [];
      const readStream = fs.createReadStream('import-test.csv')
        .pipe(csv.parse({ headers: true }));
    
      for await (const row of readStream) {
        // Wait for this request to finish before taking the next row.
        result.push(await getUserByEmail(row.email));
      }
    
      console.log(result);
      console.log('==> End of script');
    }
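    The sequential loop is also a natural place to handle the 100-requests-per-minute limit from the question: 60000 ms / 100 = 600 ms, so waiting until at least 600 ms have passed since the previous request started keeps a one-at-a-time loop at or below that rate. This is a minimal sketch that assumes the API counts request starts; `runThrottled`, `sleep` and the `lookup` parameter are hypothetical names, and `rows` can be any iterable or async iterable, such as the parser stream.

```javascript
// Promise-based delay built on setTimeout.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Process rows one at a time, starting at most one lookup per intervalMs.
// With intervalMs = 600 that is at most 100 starts per 60000 ms.
const runThrottled = async (rows, lookup, intervalMs = 600) => {
  const results = [];
  let lastStart = -Infinity;

  for await (const row of rows) {
    // Re-check after sleeping, since a timer can fire slightly early.
    let wait = lastStart + intervalMs - Date.now();
    while (wait > 0) {
      await sleep(wait);
      wait = lastStart + intervalMs - Date.now();
    }
    lastStart = Date.now();
    results.push(await lookup(row.email));
  }
  return results;
};
```

    With the question's code this would be used as `console.log(await runThrottled(readStream, getUserByEmail))`. Because the wait is measured from the previous start, a slow response does not add another 600 ms on top of its own duration.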
    

    If you want to increase the concurrency of an async iterator, look at this post, but beware: the results could come back out of order with that method.
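    One way to raise the concurrency without losing order is to write each result to its row's index. The sketch below is a general worker-pool pattern, not necessarily the method from the linked post. It works on an array, so it assumes the rows have been collected first (for example with the `for await` loop above), and `mapWithConcurrency` and `worker` are hypothetical names.

```javascript
// Call worker(item, index) for every item with at most `limit` calls pending
// at once. Each result is written to its item's index, so the output order
// matches the input order even when calls finish out of order.
const mapWithConcurrency = async (items, limit, worker) => {
  const results = new Array(items.length);
  let nextIndex = 0;

  // Each lane keeps claiming the next unclaimed index until none are left.
  const runLane = async () => {
    while (nextIndex < items.length) {
      const index = nextIndex;
      nextIndex += 1;
      results[index] = await worker(items[index], index);
    }
  };

  const laneCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: laneCount }, () => runLane()));
  return results;
};
```

    With the question's code this might look like `await mapWithConcurrency(rows.map((row) => row.email), 5, getUserByEmail)`. Claiming an index is synchronous, so two lanes never take the same row, and the output array keeps the CSV's order.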