node.js, node-streams

nodejs pipeline streaming asynchronously


I'm fairly new to nodejs streams so please bear with me.

I am using the Node stream pipeline method to make copies of a stream (specifically an image fetched in Node that comes back as a readable stream). When I use the callback version of the pipeline API, it works just fine (as in, I see the pipeline success message being logged), but I need a promise-based version because I am processing (making copies of) more than one image at a time.

When I wrap the callback version in util.promisify, it just hangs: the promise seems to stay pending forever, and it doesn't even throw an error that I can see, which is the most frustrating part. Am I missing something here? I'm open to suggestions for a solution, or even just insight into how to find out what the problem is, since I don't get back any error message that I can try to debug.

Here is the code / what I've tried:

import util from 'util';
import { pipeline, PassThrough } from 'stream';

// callback version
// confirmed that this works:
// the images get copied into the streams array and I get a bunch of 'Pipeline succeeded.' console logs
const streams = allKeys.map(() =>
  pipeline(response.body, new PassThrough(), (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  })
);

// promisified version
// this does not work: I never get any console logs and the promise just hangs forever
const pipelineAsync = util.promisify(pipeline);

const streams = await Promise.all(allKeys.map(async () => {
  try {
    const stream = await pipelineAsync(response.body, new PassThrough());
    console.log('Pipeline succeeded.');
    return stream;
  } catch (e) {
    console.log('error!', e);
    return null;
  }
}));

Also, in case this is helpful, a few notes:

  1. I am using PassThrough here because these streams also have to remain readable so they can be uploaded to S3 (not included in the code snippet above)

  2. I'm on Node 16, so I have also tried the pipeline from stream/promises (roughly the sketch shown after this list) and I get the same hanging result

  3. I also tried using the callback pipeline API and wrapping it manually in a promise, like so:

import { pipeline, PassThrough } from 'stream';

const streams = await Promise.all(allKeys.map(async () => {
  console.log('inside the map loop!');
  return new Promise((resolve, reject) => {
    console.log('we are inside the promise!');
    return pipeline(response.body, new PassThrough(), (err) => {
      if (err) {
        console.error('Pipeline failed.', err);
        return reject(err);
      } else {
        console.log('Pipeline succeeded.');
        return resolve(undefined);
      }
    });
  });
}));

and I still get the same hanging behavior (I see the 'inside the map loop' and 'we are inside the promise' console logs, but never the pipeline failed or succeeded ones).
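
For reference, the stream/promises attempt mentioned in note 2 looked roughly like this (a sketch of the same shape, not the exact code), and it hangs in the same way:

import { pipeline } from 'stream/promises';
import { PassThrough } from 'stream';

const streams = await Promise.all(allKeys.map(async () => {
  // same shape as the promisified version above; still hangs on the await
  const stream = await pipeline(response.body, new PassThrough());
  console.log('Pipeline succeeded.');
  return stream;
}));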


Solution

  • To "pipe" one readable stream into multiple writable streams, call write on each writable in the readable stream's data event handler, and do the same with end in its end event handler:

    // number of writable streams whose buffers are currently full
    var blocked = 0;
    function resume() {
      // resume reading only once every blocked writable has drained
      if (--blocked === 0) readable.resume();
    }
    writable1.on("drain", resume);
    writable2.on("drain", resume);
    readable.on("data", function(chunk) {
      // write() returns false when the writable's internal buffer is full
      if (!writable1.write(chunk)) blocked++;
      if (!writable2.write(chunk)) blocked++;
      if (blocked > 0) readable.pause();
    })
    .on("end", function() {
      writable1.end();
      writable2.end();
    });
    

    Since the data event handler is synchronous but writing is asynchronous, the handler could keep pushing chunks even while a writable stream is still busy consuming the previous ones, so that writable's internal buffer would keep growing. To avoid that, the readable stream is paused whenever one writable stream's write method returns false (meaning it does not want further chunks for now), and it is resumed only after all blocked writable streams are unblocked again and have emitted the drain event.

    Error handling must also be considered. If one writable stream fails, should the others continue? In any case, if the readable stream fails, all writable streams are destroyed:

    readable.on("error", function(err) {
      writable1.destroy(err);
      writable2.destroy(err);
    });
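
    Putting the pieces above together for the original use case (fanning one fetched image body out into several PassThrough copies that can then be uploaded to S3), a minimal sketch might look like the following; the teeReadable name and the two-copy call at the end are illustrative, not part of any library:

    import { PassThrough } from "stream";

    // Sketch: fan one readable out into `count` PassThrough copies,
    // using the pause/resume backpressure and the error handling shown above.
    function teeReadable(readable, count) {
      const copies = Array.from({ length: count }, () => new PassThrough());
      var blocked = 0;

      function resume() {
        if (--blocked === 0) readable.resume();
      }
      copies.forEach(function(copy) { copy.on("drain", resume); });

      readable.on("data", function(chunk) {
        copies.forEach(function(copy) {
          if (!copy.write(chunk)) blocked++;
        });
        if (blocked > 0) readable.pause();
      })
      .on("end", function() {
        copies.forEach(function(copy) { copy.end(); });
      })
      .on("error", function(err) {
        copies.forEach(function(copy) { copy.destroy(err); });
      });

      return copies;
    }

    // e.g. const [uploadCopy, processCopy] = teeReadable(response.body, 2);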