node.jssequelize.jsnode-streamsthrough2

Make sure promise resolved inside transformFunction


I am studying through2 and sequelize.

My codes:

  return Doc.createReadStream({
    where: { /*...*/ },
    include: [
      {
        /*...*/
      },
    ],
  })
  .pipe(through({ objectMode: true }, (doc, enc, cb) => {
    Comment.findOne(null, { where: { onId: doc.id } }).then((com) => { /* sequelize: findOne*/
      com.destroy(); /* sequelize instance destroy: http://docs.sequelizejs.com/manual/tutorial/instances.html#destroying-deleting-persistent-instances */
      cb();
    });
  }))
  .on('finish', () => {
    console.log('FINISHED');
  })
  .on('error', err => console.log('ERR', err));

I am trying to express my question clearly. Doc and Comment are sequelize Models. I want to use stream to read Doc instances from database one by one and delete comments on each Doc instance. Comment.findOne and com.destroy() will both return promises. I want to the promises resolved for each doc and then call cb(). But my above codes cannot work, before com be destroyed, the codes already finish running.

How to fix it? Thanks

I wrap the above piece of codes in mocha test, like

it('should be found by readstream', function _testStream(){
  /* wrap the first piece of codes here*/
});

But before stream finished reading, the test exist.


Solution

  • You can wait for another promise by returning the promise and using another .then.

    You may need to check for the com result being null as well, before running .destroy().

      .pipe(through({ objectMode: true }, (doc, enc, cb) => {
        Comment.findOne(null, { where: { onId: doc.id } })
          .then(com => com.destroy())
          .then(()=> cb())
          .catch(cb)
      }))
    

    Then when running the test in mocha, you need to wait for the asynchronous stream by adding done to the test function signature and calling done() on completion or error.

    it('should be found by readstream', function _testStream(done){
      ...
      .on('finish', () => done())
      .on('error', done)
    })