Tags: node.js, promise, node-streams

Node.js streams behaviour: pipeline vs promise


I'm implementing some code that takes an image, transforms it into two formats (PNG and JPEG) using the sharp library, and returns both streams to be uploaded to an S3 bucket later.

I've come up with two different solutions, one using a Promise and the other using stream.pipeline. However, for some reason the pipeline version runs much slower than the promise one.

Here's the code to reproduce the behaviour (running on Node 14):

const sharp = require('sharp')
const fs = require('fs')
const util = require('util')
const stream = require('stream')
const pipeline = util.promisify(stream.pipeline);

console.time('resize')

const resizeJobPipeline = async (readableStream) => {
  const sharpStream = sharp({
    failOnError: false
  }).resize({width: 800, height: 800, fit: 'inside'})

  // using PassThrough here because the final code will have to pass these streams to the s3 upload
  const memoryPng = new stream.PassThrough()
  const memoryJpg = new stream.PassThrough()

  // Have to await each pipeline separately;
  // if we wrap them in a Promise.all, the images don't get fully processed and come out corrupted
  await pipeline(readableStream, sharpStream.clone().png(), memoryPng)
  await pipeline(readableStream, sharpStream.clone().jpeg(), memoryJpg)

  return [memoryPng, memoryJpg]
}


const resizeJobPromise = async (readableStream) => {
  const sharpStream = sharp({
    failOnError: false
  }).resize({width: 800, height: 800, fit: 'inside'})

  const promises = []
  promises.push(sharpStream.clone().png().pipe(new stream.PassThrough()))
  promises.push(sharpStream.clone().jpeg().pipe(new stream.PassThrough()))
  readableStream.pipe(sharpStream)

  return await Promise.all(promises)
}

const readStream = fs.createReadStream('big_img.jpg')

// resizeJobPromise(readStream).then(res => {
//   res[0].pipe(fs.createWriteStream('resized.png'))
//   res[1].pipe(fs.createWriteStream('resized.jpg'))
//   console.timeEnd('resize')

// }).catch(err => {
//   console.log(err)
// })

resizeJobPipeline(readStream).then(res => {
  res[0].pipe(fs.createWriteStream('resized.png'))
  res[1].pipe(fs.createWriteStream('resized.jpg'))
  console.timeEnd('resize')

}).catch(err => {
  console.log(err)
})

If I run the resizeJobPipeline version with an image of about 20 MB, I get an average execution time of ~500 ms.

However, if I comment that version out and run the resizeJobPromise version with the same image, I get an average time of only ~7 ms!

By awaiting the two pipelines sequentially I would expect maybe double the time, but not a 100x difference.

I've read that the pipeline version is safer to use, since it automatically handles errors on the readable and closes the writable stream, preventing memory leaks; in the promise version I'd have to handle those errors manually.
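To illustrate what I mean by manual handling: as far as I understand, with plain .pipe() I'd have to wire up something like this myself (a rough sketch):

// .pipe() does not forward errors, so each stream has to be
// cleaned up by hand (pipeline does all of this automatically)
const onError = (err) => {
  readableStream.destroy()
  sharpStream.destroy()
  console.error(err)
}
readableStream.on('error', onError)
sharpStream.on('error', onError)
readableStream.pipe(sharpStream)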

Is there something wrong I'm doing in the promise version? What could be happening behind the scenes for it to appear so much faster?


Solution

  • Is there something wrong I'm doing in the promise version?

    Yes: you're not measuring the execution time of the streams at all. Notice that

    promises.push(sharpStream.clone().png().pipe(new stream.PassThrough()))
    promises.push(sharpStream.clone().jpeg().pipe(new stream.PassThrough()))
    

    just pushes the stream objects into an array; passing those to Promise.all will not wait for the streams to finish, it immediately fulfills with the stream objects themselves. You may as well omit the promise machinery from this function.
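    Promise.all only awaits thenables, and a Node stream is not a thenable. A minimal sketch demonstrating this (the names are just for illustration):

    const stream = require('stream')

    const a = new stream.PassThrough()
    const b = new stream.PassThrough()

    // fulfills immediately with the two PassThrough objects;
    // not a single byte has flowed through either stream yet
    Promise.all([a, b]).then(res => console.log('resolved with', res.length, 'streams'))

    So the ~7ms you measured is roughly the time it takes to construct the stream objects, not the time it takes to process the image.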

    What you ought to do instead is pipeline the streams into the file/S3 write streams:

    const sharp = require('sharp')
    const fs = require('fs')
    const util = require('util')
    const stream = require('stream')
    const pipeline = util.promisify(stream.pipeline)
    
    function resizeJob() {
      const sharpStream = sharp({
        failOnError: false
      }).resize({width: 800, height: 800, fit: 'inside'})
    
      const source = fs.createReadStream('big_img.jpg')
      // using write streams here; the final code will do an s3 upload instead
      const pngTarget = fs.createWriteStream('resized.png')
      const jpgTarget = fs.createWriteStream('resized.jpg')
    
      const promises = [
        pipeline(source, sharpStream), // pipe the source into sharp exactly once, not once per clone!
        pipeline(sharpStream.clone().png(), pngTarget),
        pipeline(sharpStream.clone().jpeg(), jpgTarget),
      ]
      return Promise.all(promises)
    }
    
    console.time('resize')
    resizeJob().catch(err => {
      console.log(err)
    }).then(() => {
      console.timeEnd('resize')
    })
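    And since your real target is S3 rather than files, the same shape works with PassThrough streams feeding the uploads. Here's a sketch assuming the v2 aws-sdk client ('my-bucket' and the object keys are placeholders):

    const sharp = require('sharp')
    const util = require('util')
    const stream = require('stream')
    const AWS = require('aws-sdk')
    const pipeline = util.promisify(stream.pipeline)
    const s3 = new AWS.S3()

    function resizeAndUpload(source) {
      const sharpStream = sharp({
        failOnError: false
      }).resize({width: 800, height: 800, fit: 'inside'})

      const pngPass = new stream.PassThrough()
      const jpgPass = new stream.PassThrough()

      return Promise.all([
        pipeline(source, sharpStream),
        pipeline(sharpStream.clone().png(), pngPass),
        pipeline(sharpStream.clone().jpeg(), jpgPass),
        // s3.upload consumes each PassThrough as it fills
        s3.upload({Bucket: 'my-bucket', Key: 'resized.png', Body: pngPass}).promise(),
        s3.upload({Bucket: 'my-bucket', Key: 'resized.jpg', Body: jpgPass}).promise(),
      ])
    }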