javascript asynchronous stream node-mysql highland.js

Perform node-mysql queries in a highland stream


I have to pump lines from an input file, transform them and put them in an output file.

Since the input file is huge, I stream it with HighlandJS.

The transformation step includes async queries against a MySQL database (via node-mysql), and I can't figure out how to manage async queries in a stream. My attempts so far either throw errors or produce nothing.

My latest attempt is this:

h(inputStream)
.split()
.through(JSONStream.parse())
.map(function (data) {
    h.wrapCallback(pool.query(data, function (err, rows) {
        return rows;
    }));
})
.pipe(outputStream);

Any hints on how to do that?

Thanks.


Solution

  • Short answer: Your map transformation has to return something. Right now it returns nothing.

    Long answer:

    OK, I'm going to simplify your logic slightly for the purposes of this answer. Let's say we want this:

    input -> map to rows -> output
    

    The problem is that the mapping is async, as you've pointed out, and the map function still has to return something. In this case, you can return a stream for each element in the input; h.wrapCallback turns a Node-style callback function into one that returns a Highland stream. So it looks like this:

    // input -> map to a stream of streams of rows -> output
    // bind keeps `this` pointing at the pool when query is passed by reference
    h(input).map(h.wrapCallback(pool.query.bind(pool))).pipe(output);
    

    The last problem is getting the rows themselves rather than a stream of streams. You can do this with the flatMap transformation, which "flattens" the stream of streams into a "normal" stream.

    // input -> map to a stream of rows -> output
    h(input).flatMap(h.wrapCallback(pool.query.bind(pool))).pipe(output);