Tags: node.js, nodejs-stream, fast-csv

How to dynamically split a Readable stream into multiple streams depending on a count limit


I have a Readable stream which provides objects to another Transform stream (a CsvFormatterStream<I, O> from the @fast-csv/format npm package). The CSV stream is then piped to an fs Writable stream to write a CSV file. Please see the example at the bottom of the post.

I'm looking for a solution to split the objects from the Readable stream into chunks of size n, so that it produces CSV files with at most n data rows each.

For example, with 32 objects and n = 10:

It should produce 4 CSV files: 3 CSV files with 10 data rows each and 1 CSV file with 2 data rows.
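
Assuming the output files are simply named test_1.csv … test_4.csv (the names are only for illustration), the desired result would look like this:

test_1.csv  ->  objects 0–9
test_2.csv  ->  objects 10–19
test_3.csv  ->  objects 20–29
test_4.csv  ->  objects 30–31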

How can I achieve this?


UPD. I would like to have a custom stream with a configurable n which receives objects from another stream and produces y readable streams, where y is the number of chunks of max size n.

I've already implemented it and will post it later, once SO allows me to do so.
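
Roughly, this is the usage shape I have in mind (the SplitterStream name and its constructor argument below are just placeholders, not an existing API):

const splitter = new SplitterStream(10); // n = 10 rows per chunk
readable.pipe(splitter);

// each emitted stream should hold at most n objects
for await (const chunkStream of splitter) {
  // pipe chunkStream into its own CSV formatter and file stream here
}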


Example:

Prerequisites:

const { Readable } = require('node:stream');
const {format} = require('@fast-csv/format');
const fs = require('fs');
const path = require('path');
const { pipeline } = require('node:stream/promises');

(async () => {
  try {
    const readable = new Readable({ objectMode: true });
    const csvStream = format({ headers: true });
    const fileStream = fs.createWriteStream(path.join(process.cwd(), 'test.csv'), {
      flags: 'w',
    });

    const objects = createObjects(32);
    objects.forEach(obj => readable.push(obj));
    readable.push(null);

    await pipeline(readable, csvStream, fileStream);
    console.log('Success!');
  } catch (err) {
    console.error(err);
  }
})();

function createObjects(n) {
  const objects = [];

  for (let i = 0; i < n; i++) {
    const obj = createObject(i);
    objects.push(obj);
  }

  return objects;
}

function createObject(i) {
  return {
    id: i,
    name: `Obj #${i}`,
  };
}

Solution

  • The splitting can be done by implementing a Transform stream that accumulates rows and emits them as new readable streams, one per chunk. We can then iterate over the resulting streams using an asynchronous iterator.

    SplitterStream class:

    class SplitterStream extends Transform {
      constructor(limit, opts) {
        super({readableObjectMode: true, writableObjectMode: true});
        this.limit = limit;
        this.opts = opts;
        this.pushedCount = 0;
        this.currStream = new PassThrough(this.opts);
      }
    
      _transform(row, _encoding, callback) {
        this.currStream.write(row);
        this.pushedCount++;
    
        if (this.pushedCount === this.limit) {
          this.currStream.end();
          this.push(this.currStream);
          this.currStream = new PassThrough(this.opts);
          this.pushedCount = 0;
        }
        callback();
      }
    
      _flush(callback) {
        // If the last rows were not pushed yet - push them before the end
        if (this.pushedCount > 0) {
          // End the last chunk stream so its consumer can reach the end of data
          this.currStream.end();
          this.push(this.currStream);
          this.currStream = null;
          this.pushedCount = 0;
        }
        callback();
      }
    }
    

    Example of usage. With 32 objects and a limit of 10, it produces 4 files as expected:

    const { Readable, Transform, PassThrough, Writable } = require('node:stream');
    const {format} = require('@fast-csv/format');
    const fs = require('fs');
    const path = require('path');
    const { pipeline } = require('node:stream/promises');
    
    class SplitterStream extends Transform {
      constructor(limit, opts) {
        super({readableObjectMode: true, writableObjectMode: true});
        this.limit = limit;
        this.opts = opts;
        this.pushedCount = 0;
        this.currStream = new PassThrough(this.opts);
      }
    
      _transform(row, _encoding, callback) {
        this.currStream.write(row);
        this.pushedCount++;
    
        if (this.pushedCount === this.limit) {
          this.currStream.end();
          this.push(this.currStream);
          this.currStream = new PassThrough(this.opts);
          this.pushedCount = 0;
        }
        callback();
      }
    
      _flush(callback) {
        // If the last rows were not pushed yet - push them before the end
        if (this.pushedCount > 0) {
          // End the last chunk stream so its consumer can reach the end of data
          this.currStream.end();
          this.push(this.currStream);
          this.currStream = null;
          this.pushedCount = 0;
        }
        callback();
      }
    }
    
    (async () => {
      try {
        // Create and fill readable
        const readable = new Readable({ objectMode: true });
        const splitterStream = new SplitterStream(10, {readableObjectMode: true, writableObjectMode: true});
    
        const objects = createObjects(32);
        objects.forEach(obj => readable.push(obj));
        readable.push(null);
    
        readable.pipe(splitterStream);
    
        let i = 1;
        for await (const _readable of splitterStream) {
          const csvStream = format({ headers: true });
          const fileStream = fs.createWriteStream(path.join(process.cwd(), `test_${i++}.csv`), {
            flags: 'w',
          });
    
          await pipeline(_readable, csvStream, fileStream);
        }
        
        console.log('Success!');
      } catch (err) {
        console.error(err);
      }
    })();
    
    function createObjects(n) {
      const objects = [];
    
      for (let i = 0; i < n; i++) {
        const obj = createObject(i);
        objects.push(obj);
      }
    
      return objects;
    }
    
    function createObject(i) {
      return {
        id: i,
        name: `Obj #${i}`,
      };
    }
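
  • To sanity-check the result, you can count the data rows in each produced file. A minimal check, assuming the default fast-csv output (rows separated by '\n', header on the first line) and the test_N.csv file names used above:

    const fs = require('fs');
    const path = require('path');

    for (let i = 1; i <= 4; i++) {
      const file = path.join(process.cwd(), `test_${i}.csv`);
      // Drop empty lines, then subtract the header line
      const lines = fs.readFileSync(file, 'utf8').split('\n').filter(Boolean);
      console.log(`test_${i}.csv: ${lines.length - 1} data rows`);
    }

    With 32 objects and a limit of 10 this should print 10, 10, 10 and 2 data rows.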