javascript, node.js, stream, node-streams

NodeJS Streams - End a read stream in a pipeline but still process the chunks already read


I have the following problem:

I am using NodeJS streams with pipeline to read a big file, apply some transforms, and then write it to a writable stream. The tricky part is that I want to be able to stop reading the file if a specific condition is met, but still finish processing the chunks that have already been read.

In the following example I am reading a text file. My transform stream (secondStream) converts the text to uppercase and sends it to the next stream. But if it finds a specific text, it should stop reading from the text file, which I believe means that the read stream should stop reading chunks.

I tried several solutions, and not gonna lie, I am a little confused here. So far I have gotten the following code to work. However, calling firstStream.destroy() makes the pipeline throw an error:

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close

I was able to 'avoid' this error by catching and ignoring it on the pipeline, but to be honest this doesn't sound safe or correct to me.

  const { Transform, Writable, Readable } = require("node:stream");
  const { pipeline } = require("node:stream/promises");
  const fs = require("node:fs");

  let shouldStop = false;
  const firstStream = fs.createReadStream("./lg.txt");

  const secondStream = new Transform({
    transform(chunk, encoding, callback) {
      const foundText = chunk.toString().search("CHAPTER 9") !== -1;

      if (foundText) {
        shouldStop = true;
      }

      const transformed = chunk.toString().toUpperCase();
      callback(null, transformed);
    },
  });

  const lastStream = process.stdout;

  firstStream.on("data", () => {
    if (shouldStop) {
      console.log("should pause");
      firstStream.destroy();
    }
  });

  await pipeline(firstStream, secondStream, lastStream).catch(
    (err) => undefined
  ); // Feels wrong to me

Is there any better way to do it? Am I missing something?

Thank you in advance, friends!


Solution

  • In your transform stream, you could just "eat" or "skip" any data that comes after you've found the target text. That way, you can keep all the other pipeline() logic intact. Rather than terminating immediately, it will just read to the end of the input stream, but will skip all data after the target text. This allows the streams to complete normally.

    const secondStream = new Transform({
        transform(chunk, encoding, callback) {
            if (shouldStop) {
                // eat any remaining data
                callback(null, "");
            } else {
                const text = chunk.toString();
                const foundText = text.search("CHAPTER 9") !== -1;
                if (foundText) {
                    // set flag to eat remaining data
                    shouldStop = true;
                }
                callback(null, text.toUpperCase());
            }
        },
    });
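
    With that approach, the pipeline should complete on its own, so it can be awaited without the catch-and-ignore workaround, for example:

    // the transform eats the remaining data, so every stream ends normally
    // and the promise resolves without needing a .catch() to swallow errors
    await pipeline(firstStream, secondStream, lastStream);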
    

    The pipeline() function also accepts an AbortSignal, which is the supported means of aborting the pipeline while still cleaning everything up appropriately. When you abort, pipeline() will end with a rejected promise, but you can check whether the rejection was caused by your abort and, if so, retrieve your abort message.

    In your code, that can be implemented like this:

    const { Transform, Writable, Readable } = require("node:stream");
    const { pipeline } = require("node:stream/promises");
    const fs = require("node:fs");
    
    const firstStream = fs.createReadStream("./lg.txt");
    
    const ac = new AbortController();
    const signal = ac.signal;
    
    const secondStream = new Transform({
        transform(chunk, encoding, callback) {
            const text = chunk.toString();
            const foundText = text.search("CHAPTER 9") !== -1;
    
            callback(null, text.toUpperCase());
            if (foundText) {
                ac.abort(new Error("reading terminated, match found"));
            }
    
        },
    });
    
    const lastStream = process.stdout;
    
    pipeline(firstStream, secondStream, lastStream, { signal }).then(() => {
        console.log("\nall done without match");
    }).catch((err) => {
        if (err.code === "ABORT_ERR") {
            console.log(`\n${signal.reason.message}`);
        } else {
            console.log(err);
        }
    });
    

    Note: On another topic, your code is vulnerable to the search string falling across a chunk boundary and thus not being detected. The usual way to avoid that is to preserve the last N characters of each chunk and prepend them to the next chunk before running your match search, where N is the length of your search string minus 1. This ensures you won't miss a search string that spans two chunks. You will also have to adjust your output so the prepended text isn't emitted twice. Since that wasn't the crux of your question here, I didn't add that logic, but it is necessary for reliable matching; a rough sketch is shown below.
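
    A minimal sketch of that carry-over idea, built on the first ("eat the rest") variant above and assuming a fixed search string, could look like this (shouldStop is the same flag used earlier):

    const SEARCH = "CHAPTER 9";   // assumed search string
    let carry = "";               // tail of the previous chunk

    const secondStream = new Transform({
        transform(chunk, encoding, callback) {
            if (shouldStop) {
                // eat any remaining data once the match has been found
                callback(null, "");
                return;
            }
            const text = chunk.toString();
            // search carry + text so a match spanning two chunks is detected
            if ((carry + text).includes(SEARCH)) {
                shouldStop = true;
            }
            // keep the last (SEARCH.length - 1) characters for the next chunk
            carry = text.slice(-(SEARCH.length - 1));
            // output only the current chunk so the carried text isn't duplicated
            callback(null, text.toUpperCase());
        },
    });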