I have an input stream that needs to be processed by two different pipelines.
My problem: when an error occurs in the source, I have not found a way to catch and handle it properly.
How can I catch the error thrown by the `genSource` generator inside the `try`/`catch` of my `main` function?
Is there a better way to pipe the same source into both pipelines than what I'm doing now?
import { PassThrough, Readable } from "node:stream";
import { pipeline } from "node:stream/promises";
/**
 * Demo source for the pipelines: yields `{ index }` for the indices 0 through
 * 7, then throws `Error("Foobar")` once the counter reaches 8, so the indices
 * 8 and 9 are never produced.
 */
async function* genSource() {
  let index = 0;
  while (index < 10) {
    if (index === 8) {
      throw new Error("Foobar");
    }
    yield { index };
    index += 1;
  }
}
/**
 * Resolves with `true` once a timer of `ms` milliseconds (100 by default) has
 * fired.
 */
const sleep = async (ms = 100) => {
  const timerFired = new Promise((resolve) => {
    setTimeout(() => resolve(true), ms);
  });
  return await timerFired;
};
/**
 * Example transform that mimics a pipeline step: re-yields every item of
 * `asyncIterable` unchanged and in order, waiting 100 ms before each one.
 *
 * Generic over the item type so it accepts whatever the upstream produces
 * (the demo source yields `{ index }` objects, not bare numbers).
 *
 * @param asyncIterable Upstream items to pass through.
 * @returns An async generator yielding the same items.
 */
async function* processing1<T>(asyncIterable: AsyncIterable<T>) {
  for await (const item of asyncIterable) {
    await sleep(100);
    yield item;
  }
}
/**
 * Example transform that mimics a pipeline step: re-yields every item of
 * `asyncIterable` unchanged and in order, waiting 110 ms before each one.
 *
 * Generic over the item type so it accepts whatever the upstream produces
 * (the demo source yields `{ index }` objects, not bare numbers).
 *
 * @param asyncIterable Upstream items to pass through.
 * @returns An async generator yielding the same items.
 */
async function* processing2<T>(asyncIterable: AsyncIterable<T>) {
  for await (const item of asyncIterable) {
    await sleep(110);
    yield item;
  }
}
// Last-resort hook for the demo: logs any exception that escapes the
// pipelines' own error handling. Seeing this message means an error was not
// caught where it was supposed to be.
process.on("uncaughtException", (uncaught) => {
  console.log("Uncaught exception — should never be called!", uncaught);
});
/**
 * Fans one source stream out into two independent processing pipelines and
 * waits for both of them to finish.
 *
 * `readable.pipe()` does not forward errors from the source to its
 * destination, so each clone is explicitly destroyed with the source's error.
 * That makes both `pipeline()` promises reject, which lets the `try`/`catch`
 * below observe the failure instead of it surfacing as an uncaught exception.
 *
 * @throws Re-throws, after logging it, the error that made a pipeline fail.
 */
async function main(): Promise<void> {
  try {
    const source = Readable.from(genSource());

    // Creates one object-mode branch of `source` that fails when `source` fails.
    const branch = (): PassThrough => {
      const clone = new PassThrough({ objectMode: true });
      // Throwing from inside an "error" listener would escape every
      // surrounding try/catch; destroying the clone routes the error into the
      // pipeline promise that consumes it.
      source.once("error", (error) => clone.destroy(error));
      return source.pipe(clone);
    };

    const pipe1Clone = branch();
    const pipe2Clone = branch();
    await Promise.all([
      pipeline(pipe1Clone, processing1),
      pipeline(pipe2Clone, processing2),
    ]);
    console.info("Pipelines succeeded.");
  } catch (e: unknown) {
    console.error(e);
    throw e;
  }
}
// main() logs the failure itself before re-throwing, so the rejection only
// needs to be consumed here (avoiding an unhandled rejection) and turned into
// a failing exit code.
main().catch(() => {
  process.exitCode = 1;
});
I found a solution. When piping the original source stream into the two `PassThrough` intermediaries, the piping has to be set up so that errors are propagated, because `readable.pipe()` on its own does not forward the source's errors to its destination.
/**
 * Pipes `readable` into `transform` and additionally forwards any error of
 * `readable` to it, which `pipe()` alone does not do.
 *
 * The error is forwarded with `destroy(error)` rather than a hand-made
 * `emit("error")`, so the copy is properly torn down (it emits "error"
 * followed by "close") and consumers such as `pipeline()` or `for await`
 * fail with the source's error.
 *
 * @param readable  Source stream to copy from.
 * @param transform Destination of the copy; defaults to a fresh object-mode
 *                  `PassThrough`.
 * @returns `transform`, now fed by `readable`.
 */
export const makeErrorSafeCopy = (
  readable: Readable,
  transform: PassThrough = new PassThrough({ objectMode: true }),
): PassThrough => {
  // The listener stays attached for the lifetime of `readable`; destroying an
  // already-destroyed copy is a no-op, so late source errors are harmless.
  readable.on("error", (error: Error) => {
    transform.destroy(error);
  });
  return readable.pipe(transform);
};
// Single upstream shared by both pipelines.
const source = Readable.from(genSource());
// Each clone is created through makeErrorSafeCopy, so an error raised in
// `source` reaches the clone and can be caught by the pipeline consuming it.
const [pipe1Clone, pipe2Clone] = [
  makeErrorSafeCopy(source),
  makeErrorSafeCopy(source),
] as const;