I have to pump lines from an input file, transform them, and write them to an output file.
Since the input file is huge, I stream it with HighlandJS.
The transformation step includes async queries against a MySQL DB (via node-mysql), and I can't figure out how to manage async queries in a stream. My various attempts give either errors or no output at all.
My latest attempt is this:
h(inputStream)
.split()
.through(JSONStream.parse())
.map(function (data) {
h.wrapCallback(pool.query(data, function (err, rows) {
return rows;
}));
})
.pipe(outputStream);
Any hints on how to do that?
Thanks.
Short answer: Your map transformation has to return something. Right now it returns nothing.
Long answer:
OK, I'm going to slightly simplify your logic for the purposes of this answer. Let's say we want this:
input -> map to rows -> output
The problem is that the mapping is async, as you've pointed out, and the map function has to return something. In this case, you just return a stream for each element in the input. So it looks like this:
// input -> map to a stream of streams of rows -> output
// (bind keeps `this` pointing at the pool when Highland calls query)
h(input).map(h.wrapCallback(pool.query.bind(pool))).pipe(output);
The last problem is actually getting rows instead of a stream of streams. You can do this by using the flatMap transformation, which will "flatten" the stream of streams into a "normal" stream.
// input -> map to a stream of rows -> output
h(input).flatMap(h.wrapCallback(pool.query.bind(pool))).pipe(output);