Tags: node.js · amazon-s3 · promise · aws-sdk-js · node-streams

Backpressure not draining when streaming to S3 with node


I am using the aws-sdk-js-v3 library to stream multiple data sets to the same object in s3 using the same stream.

import dotenv from 'dotenv'
import {S3} from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage';
import {PassThrough} from 'stream';
import {randomBytes} from 'crypto'
import { env } from 'process';

// NOTE(review): questioner's original version. `upload` here is an Upload
// instance whose .done() was never called earlier, so (per the accepted
// answer below) the SDK never started consuming the PassThrough — this line
// is the first time the upload is kicked off, which is too late to drain
// the writes made before it.
export async function finishS3Stream(upload,stream){
  stream.end();
  await upload.done();
}

/**
 * Write one chunk to a stream while honoring backpressure.
 * `write()` returns false once the internal buffer reaches the
 * highWaterMark; in that case we wait for the next 'drain' event
 * before resolving so the caller can safely continue writing.
 * @param {import('stream').Writable} stream - destination stream
 * @param {*} data - chunk to write
 * @returns {Promise<void>} resolves when the stream can accept more data
 */
export async function writeToStream(stream, data) {
  const accepted = stream.write(data);
  if (accepted) return;
  console.log("drain needed")
  await new Promise((resolve) => stream.once('drain', resolve));
}

// NOTE(review): questioner's original version — this is the root cause of
// the reported hang. The Upload object is constructed but .done() is never
// called here, so (per the accepted answer) the SDK has not started reading
// from `stream`, and nothing will ever consume the buffered writes.
export function createS3Stream(key,bucket) {
  const client = new S3()

  const stream = new PassThrough();

  stream.on('error', (e) => console.log(e))

  // Upload is only configured here; nothing begins consuming `stream` yet.
  const upload = new Upload({
    params: {
      Bucket: bucket,
      Key: key,
      Body: stream,
    },
    client,
  });

  return {
    stream,
    upload,
  };
}

// Demonstration of the problem: because createS3Stream never starts the
// upload (no .done() call), nothing consumes the PassThrough. The first
// 5 MiB write exceeds the stream's highWaterMark, writeToStream waits for
// a 'drain' event that never fires (nothing is reading), and — per the
// reported output — the process then exits with code 0 before data2 is
// ever written.
async function main(){
  dotenv.config()
  const bucket = env.BUCKET
  const key = env.KEY

  console.log("creating stream")
  const {stream,upload} = createS3Stream(key,bucket)

  // 5242880 bytes = 5 MiB of random data per chunk.
  const data1 = randomBytes(5242880)
  console.log('writing data1 to stream')
  await writeToStream(stream,data1)

  const data2 = randomBytes(5242880)
  console.log('writing data2 to stream')
  await writeToStream(stream,data2)

  console.log('closing stream')
  await finishS3Stream(upload,stream)
}

main()

For some reason, though, the main function doesn't wait for the stream to be drained once it hits the highWaterMark, and the application exits with an exit code of 0.

Output

creating stream
writing data1 to stream
drain needed

How can I get the application to wait for the stream to be drained? Why is it not waiting for the promise to resolve, and why is it exiting before writing the next dataset?


Solution

  • For those who are struggling with the AWS documentation when writing multiple files or multiple datasets to the same stream, here is the solution. You need to start the upload (call `done()`) as soon as the stream is created, so that the S3 client begins reading from the stream. Then, when you close the stream, you need to await the promise returned by `done()`.

    /**
     * Close out a streamed upload: end the writable side, then block until
     * the in-flight S3 upload promise settles.
     * @param {Promise} upload - promise returned by Upload#done()
     * @param {import('stream').Writable} stream - the stream being uploaded
     * @returns {Promise<void>}
     */
    export async function finishS3Stream(upload, stream) {
      // Signal that no more data is coming so the upload can flush and finish.
      stream.end();
      await upload;
    }
    
    /**
     * Backpressure-aware write: resolves immediately while the stream's
     * internal buffer is below the highWaterMark, otherwise waits for the
     * next 'drain' event before resolving.
     * @param {import('stream').Writable} stream - destination stream
     * @param {*} data - chunk to write
     * @returns {Promise<void>} resolves when it is safe to write again
     */
    export async function writeToStream(stream, data) {
      return new Promise((resolve) => {
        const accepted = stream.write(data);
        if (accepted) {
          resolve();
          return;
        }
        console.log("drain needed")
        stream.once('drain', resolve);
      });
    }
    
    /**
     * Create a PassThrough stream wired to an S3 upload that is already
     * running. Starting the upload immediately (via .done()) is what makes
     * the SDK consume the stream, so backpressure drains as data is written.
     * @param {string} key - destination object key
     * @param {string} bucket - destination bucket name
     * @returns {{stream: PassThrough, upload: Promise}} the writable stream
     *   and the promise that settles when the upload completes.
     */
    export function createS3Stream(key, bucket) {
      const s3Client = new S3();
      const passThrough = new PassThrough();

      passThrough.on('error', (e) => console.log(e));

      // Call done() right away so the SDK starts reading the stream and
      // return the resulting promise for the caller to await when finishing.
      const uploadPromise = new Upload({
        params: {
          Bucket: bucket,
          Key: key,
          Body: passThrough,
        },
        client: s3Client,
      }).done();

      return {
        stream: passThrough,
        upload: uploadPromise,
      };
    }
    
    /**
     * Entry point: stream two 5 MiB random datasets into a single S3 object
     * using the backpressure-aware helpers above. Reads BUCKET and KEY from
     * the environment (loaded via dotenv).
     */
    async function main(){
      dotenv.config()
      const bucket = env.BUCKET
      const key = env.KEY

      console.log("creating stream")
      const {stream,upload} = createS3Stream(key,bucket)

      // 5242880 bytes = 5 MiB of random data per chunk.
      const data1 = randomBytes(5242880)
      console.log('writing data1 to stream')
      await writeToStream(stream,data1)

      const data2 = randomBytes(5242880)
      console.log('writing data2 to stream')
      await writeToStream(stream,data2)

      console.log('closing stream')
      await finishS3Stream(upload,stream)
    }

    // Don't leave main() as a floating promise: an upload failure would
    // otherwise surface as an unhandled rejection. Log it and fail the
    // process explicitly instead.
    main().catch((err) => {
      console.error(err)
      process.exitCode = 1
    })