node.jsreactive-programmingrxjs

What is the "reactive" way to read file line-by-line


I'm learning reactive programming using RxJS and encounter a case when I need to read a file line-by-line. Actually I solved it using a solution likes:

https://gist.github.com/yvele/447555b1c5060952a279

It works, but I need to use some normal JS code to transform the stream of Buffers to stream of lines. (use "readline" module in example above)

I wonder if there are other ways to transform an Observable of Buffer to Observable of line, using RxJS operators, likes example below.

var Rx = require('rx');
var fs = require('fs');
var lines = Rx.Observable
  .fromEvent(rl, 'data') // emits buffers overtime
  // some transforms ...
  .subscribe(
    (line) => console.log(line), // emit string line by line
    err => console.log("Error: %s", err),
    () => console.log("Completed")
  );

Solution

  • You can probably achieve something pretty close to what you want with scan and concatMap.

    Something like:

    bufferSource
      .concat(Rx.Observable.of("\n")) // parens was missing // to make sure we don't miss the last line!
      .scan(({ buffer }, b) => {
        const splitted = buffer.concat(b).split("\n");
        const rest = splitted.pop();
        return { buffer: rest, items: splitted };
      }, { buffer: "", items: [] })
      // Each item here is a pair { buffer: string, items: string[] }
      // such that buffer contains the remaining input text that has no newline
      // and items contains the lines that have been produced by the last buffer
      .concatMap(({ items }) => items)
      // we flatten this into a sequence of items (strings)
      .subscribe(
        item => console.log(item),
        err => console.log(err),
        () => console.log("Done with this buffer source"),
      );