Tags: node.js, stream, pipe, node.js-stream, through2

Node - Abstracting Pipe Steps into a Function


I'm familiar with Node streams, but I'm struggling with best practices for abstracting code that I reuse a lot into a single pipe step.

Here's a stripped down version of what I'm writing today:

inputStream
.pipe(csv.parse({columns: true}))
.pipe(csv.transform(function(row) { return transform(row); }))
.pipe(csv.stringify({header: true}))
.pipe(outputStream);

The actual work happens in transform(). The only things that really change are inputStream, transform(), and outputStream. Like I said, this is a stripped down version of what I actually use. I have a lot of error handling and logging on each pipe step, which is ultimately why I'm trying to abstract the code.

What I'm looking to write is a single pipe step, like so:

inputStream
.pipe(csvFunction(transform))
.pipe(outputStream);

What I'm struggling to understand is how to turn those pipe steps into a single function that accepts a stream and returns a stream. I've looked at libraries like through2, but I'm not sure how that gets me where I'm trying to go.


Solution

  • Here's what I ended up going with. I used the through2 library and the streaming API of the csv library to create the pipe function I was looking for.

    var csv = require('csv'),
        through = require('through2');

    module.exports = function(transformFunc) {
        var parser = csv.parse({columns: true, relax_column_count: true}),
            transformer = csv.transform(function(row) {
                return transformFunc(row);
            }),
            stringifier = csv.stringify({header: true});

        return through(function(chunk, enc, cb) {
            var stream = this;

            // Forward each stage's output to the next stage by hand.
            parser.on('data', function(data) {
                transformer.write(data);
            });

            transformer.on('data', function(data) {
                stringifier.write(data);
            });

            stringifier.on('data', function(data) {
                stream.push(data);
            });

            parser.write(chunk);

            // Detach the listeners so they don't accumulate across chunks.
            parser.removeAllListeners('data');
            transformer.removeAllListeners('data');
            stringifier.removeAllListeners('data');
            cb();
        });
    };
    

    It's worth noting the part towards the end where I remove the event listeners; this was due to running into memory errors after creating too many event listeners. I initially tried solving this problem by listening to events with once, but that prevented subsequent chunks from being read and passed on to the next pipe step.

    Let me know if anyone has feedback or additional ideas.