node.js node-streams

Handling errors in a stream pipeline when duplicating the stream source


I have an input stream that needs to be processed by two different pipelines.

My problem: When an error occurs in the source, I have not found a way to properly catch / handle the error.

How can I catch the error thrown by the genSource generator inside the try/catch of my main function? Is there another/better way to pipe the same source into both pipelines than I'm doing now?

import { PassThrough, Readable } from "node:stream";
import { pipeline } from "node:stream/promises";

/**
 * Example source: yields `{ index }` objects for index 0 through 7, then
 * fails with `Error("Foobar")` when index 8 is reached, simulating a source
 * that breaks part-way through.
 */
async function* genSource() {
  let index = 0;
  while (index < 10) {
    // Simulated failure in the middle of the stream.
    if (index === 8) {
      throw new Error("Foobar");
    }
    yield { index };
    index += 1;
  }
}

/**
 * Resolves with `true` after roughly `ms` milliseconds.
 *
 * @param ms - Delay in milliseconds (defaults to 100).
 * @returns A promise that resolves to `true` once the timer fires.
 */
const sleep = (ms = 100): Promise<boolean> =>
  // Returning the promise directly; wrapping it in async/await adds nothing.
  new Promise((resolve) => {
    setTimeout(() => resolve(true), ms);
  });

/**
 * Example pipeline step (its logic is irrelevant to the question): re-yields
 * every upstream item unchanged after a 100 ms pause.
 *
 * Generic over the item type because the step never inspects the items; the
 * stream in this example carries `{ index }` objects rather than numbers.
 *
 * @param asyncIterable - Upstream items to forward.
 */
async function* processing1<T>(asyncIterable: AsyncIterable<T>) {
  for await (const item of asyncIterable) {
    await sleep(100);
    yield item;
  }
}

/**
 * Example pipeline step (its logic is irrelevant to the question): re-yields
 * every upstream item unchanged after a 110 ms pause.
 *
 * Generic over the item type because the step never inspects the items; the
 * stream in this example carries `{ index }` objects rather than numbers.
 *
 * @param asyncIterable - Upstream items to forward.
 */
async function* processing2<T>(asyncIterable: AsyncIterable<T>) {
  for await (const item of asyncIterable) {
    await sleep(110);
    yield item;
  }
}

// Last-resort hook for the demo: if this ever logs, an error escaped every
// try/catch and pipeline in the program.
const reportUncaughtException = (e: Error): void => {
  console.log("Uncaught exception — should never be called!", e);
};

process.on("uncaughtException", reportUncaughtException);

/**
 * Demo entry point: builds one source stream from `genSource()`, pipes it into
 * two object-mode PassThrough streams, runs a separate pipeline on each and
 * awaits both. Anything that reaches the catch block is logged and rethrown.
 *
 * As written, an error raised by the source does NOT reach that catch block.
 */
async function main() {
  try {
    const source = Readable.from(genSource()).on("error", (error) => {
      // This listener is invoked by the stream when it emits "error", not from
      // within main's try block, so rethrowing here bypasses the catch below.
      throw error;
      // this is not caught in the try / catch of the main function, but is unhandled
    });

    // I'm doing this to use the same source for both pipelines
    // NOTE(review): nothing here forwards the source's "error" event to the
    // PassThrough clones, which is presumably why the pipelines below never
    // reject with the source error — confirm against the Node.js stream docs.
    const pipe1Clone = source.pipe(new PassThrough({ objectMode: true }));
    const pipe2Clone = source.pipe(new PassThrough({ objectMode: true }));

    // Each pipeline only observes its own clone, not the original source.
    const pipe1 = pipeline(pipe1Clone, processing1);
    const pipe2 = pipeline(pipe2Clone, processing2);

    // Rejects as soon as either pipeline rejects.
    await Promise.all([pipe1, pipe2]);

    console.info("Pipelines succeeded.");
  } catch (e: unknown) {
    // Log, then rethrow so the promise returned by main() rejects as well.
    console.error(e);
    throw e;
  }
}

// main() logs and rethrows on failure. Handle the rejection here so it does not
// surface as an unhandled rejection (which would land in the
// "uncaughtException" hook), and report the failure through the exit code.
main().catch(() => {
  process.exitCode = 1;
});



Solution

  • I found a solution. When piping the original source stream into the two PassThrough intermediaries, it needs to be set up so that errors from the source are propagated to them.

    /**
     * Pipes `readable` into `transform` and returns `transform`, additionally
     * forwarding every error of `readable` to it, so that a consumer of the
     * copy (e.g. a pipeline) fails when the source fails.
     *
     * @param readable - Source stream to copy from.
     * @param transform - Destination that receives both the data and the
     *   errors; defaults to a fresh object-mode `PassThrough`.
     * @returns The `transform` stream.
     */
    export const makeErrorSafeCopy = (
      readable: Readable,
      transform: PassThrough = new PassThrough({ objectMode: true })
    ): PassThrough => {
      readable.on("error", (error: Error) => {
        // destroy(error) emits "error" on the copy and also tears it down,
        // instead of emitting on a stream that otherwise stays open.
        transform.destroy(error);
      });
      readable.pipe(transform);
      return transform;
    };
    
    // Single source stream that both clones below read from.
    const source = Readable.from(genSource());
    
    // Errors that occur in the source stream above are now propagated to the clones, so they can be caught in the pipelines that consume them.
    const pipe1Clone = makeErrorSafeCopy(source);
    const pipe2Clone = makeErrorSafeCopy(source);