Tags: javascript, ecmascript-6, reactive-programming, highland.js

Do something before consuming a stream, using highland.js


I'm trying to write a writable stream that takes a stream of objects and inserts them into a MongoDB database. Before consuming the stream of objects, I first need to wait for the database connection to be established. I seem to be doing something wrong, though, because the program never gets to the insertion part.

// ./mongowriter.js
import _ from 'highland';
import mongodb from 'mongodb';

let mongo = mongodb.MongoClient,
    connectToDb = _.wrapCallback(mongo.connect);

export default url => _.pipeline(s => {
  return connectToDb(url).flatMap(db => {
    console.log('Connection established!');
    return s.flatMap(x => /* insert x into db */);
  });
});

....

// Usage in other file
import _ from 'highland';
import mongoWriter from './mongowriter.js';

let objStream = _([/* json objects */]);

objStream.pipe(mongoWriter(url)); // url: the MongoDB connection string

The program just quits without "Connection established!" ever being written to the console.

What am I missing? Is there some kind of idiom I should be following?


Solution

  • By reading the source and experimenting, I figured out how to perform a single asynchronous task and then continue processing the stream. Basically, you use flatMap to replace the single value produced by the asynchronous task with the stream you actually want to process.

    Another quirk I didn't expect, and which was throwing me off, was that _.pipeline won't do anything unless the stream is fully consumed inside the callback. Highland streams are lazy, so simply putting in a map to log stuff (which was how I tried to debug it) doesn't help. Instead, you need to make sure there is an each or done at the end. Below is a minimal example:

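    import _ from 'highland';

    // promiseReturningFunction stands for any function that returns a Promise.
    export default () => _.pipeline(stream => {
      return _(promiseReturningFunction())       // one-value stream from the Promise
        .tap(() => process.stdout.write('.'))    // runs once, when the Promise resolves
        .flatMap(() => stream)                   // swap that value for the incoming stream
        .each(() => process.stdout.write('-'));  // consume it: one '-' per object
    });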
    
    // Will produce something like the following when called with a non-empty stream.
    // Note the lone '.' in the beginning.
    // => .-------------------
    

    Basically, a '.' is written once the async function has finished, and a '-' for every object in the stream.

    Hopefully, this saves someone some time. It took me embarrassingly long to figure this out. ^^
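For comparison, here is a dependency-free sketch of the same idea using Node's built-in async generators instead of Highland. It is an analogy, not Highland's API: `fakeConnect`, `afterSetup` and `run` are hypothetical names, and the timer in `fakeConnect` only stands in for an asynchronous setup step such as `MongoClient.connect`. It illustrates both points from the answer: the setup runs once before any item is processed, and nothing runs at all until something consumes the stream.

```javascript
// A plain-Node analogy (not Highland): await one async setup step,
// then process every item of a source, lazily.

// Hypothetical stand-in for an async setup such as opening a DB connection.
// After a short delay it records '.' in `log` and resolves to a fake "db".
function fakeConnect(log) {
  return new Promise(resolve => {
    setTimeout(() => {
      log.push('.');
      resolve({ name: 'fake-db' });
    }, 10);
  });
}

// Roughly what connectToDb(url).flatMap(db => source) expresses in Highland:
// wait for the setup once, then yield each source item together with its result.
// Like a Highland stream, the body does not start until someone pulls from it.
async function* afterSetup(setup, source) {
  const db = await setup();
  for (const x of source) {
    yield { db, x };
  }
}

// Builds the stream and, only if `consume` is true, drains it
// (the for-await loop plays the role of Highland's .each).
async function run(items, consume) {
  const log = [];
  let setupCalls = 0;
  const stream = afterSetup(() => {
    setupCalls += 1;
    return fakeConnect(log);
  }, items);

  if (consume) {
    for await (const item of stream) {
      // item.db is the fake connection, item.x the original object
      log.push('-');
    }
  }

  // Wait a little so a setup timer, had one been started, would have fired.
  await new Promise(resolve => setTimeout(resolve, 20));
  return { setupCalls, output: log.join('') };
}

run([1, 2, 3], false).then(r => console.log(r)); // { setupCalls: 0, output: '' }
run([1, 2, 3], true).then(r => console.log(r));  // { setupCalls: 1, output: '.---' }
```

Calling `afterSetup(...)` only creates the generator, much like building a Highland chain with `map` or `tap` only describes work. The non-consuming run therefore never calls the setup at all, while the consuming run produces the same `.---` shape as the Highland example.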