javascript node.js async-iterator

How to handle error from fs readline.Interface async iterator


Based on the example of processLineByLine() I noticed that we cannot catch the error if the given filename does not exist. In that case the program finishes with something like:

UnhandledPromiseRejectionWarning: Error: ENOENT: no such file or directory

So the simplest approach I found to raise a catchable error was to make two modifications to the processLineByLine() function:

  1. turn it into an async generator (async function*)
  2. await a file-existence check: await access(filename, fs.constants.F_OK)

Finally I had to convert the readline.Interface instance to an async generator. I do not particularly like this last part. The resulting lines() function looks like this:

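// Assumed imports (not shown in the original post)
import fs from 'fs'
import readline from 'readline'

const { access } = fs.promises

export async function* lines(filename) {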
    await access(filename, fs.constants.F_OK)
    const lines = readline.createInterface({
        input: fs.createReadStream(filename),
        crlfDelay: Infinity
    })
    for await (const l of lines) {
        yield l
    }
}

Question: Is there a better approach to make lines() either return an async iterator or throw an error if the filename does not exist?

Bug report: following @jfriend00's observations, I have opened a bug issue on nodejs: https://github.com/nodejs/node/issues/30831


Solution

  • Hmm, this is a tricky one. Even detecting whether the file exists as a pre-flight doesn't guarantee that you can successfully open it (it could be locked or have permission issues) and detecting if it exists before opening is a classic race condition in server development (small window, but still a race condition).

    I'm still thinking there must be a better way to get an error out of a fs.createReadStream(), but the only way I could find was to wrap it in a promise that only resolves when the file is successfully open. That lets you get the error from opening the file and propagate it back to the caller of your async function. Here's what that would look like:

    const fs = require('fs');
    const readline = require('readline');
    
    function createReadStreamSafe(filename, options) {
        return new Promise((resolve, reject) => {
            const fileStream = fs.createReadStream(filename, options);
            fileStream.on('error', reject).on('open', () => {
                resolve(fileStream);
            });
    
        });
    }
    
    async function processLineByLine(f) {
      const fileStream = await createReadStreamSafe(f);
    
      const rl = readline.createInterface({
        input: fileStream,
        crlfDelay: Infinity
      });
    
      for await (const line of rl) {
        // Each line in input.txt will be successively available here as `line`.
        console.log(`Line from file: ${line}`);
      }
    }
    
    processLineByLine("nofile").catch(err => {
        console.log("caught error");
    });
    

    This makes the promise that processLineByLine() returns reject, so you can handle the error there, which is what I think you were asking for. If I misunderstood what you were asking for, then please clarify.

    FYI, this seems to me to be a bug in readline.createInterface() because it seems like it should reject on the first iteration of for await (const line of rl), but that doesn't appear to be what happens.

    So, as a consequence of that, even this work-around won't detect read errors on the stream after it's opened. That really needs to be fixed internal to createInterface(). I agree both a file open error or a read error should show up as a reject on for await (const line of rl).


    Another work-around for the file open issue would be to pre-open the file using await fs.promises.open(...) and pass the fd to fs.createReadStream and then you would see the error on the open yourself.
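A minimal sketch of that pre-open work-around might look like the following. The generator name linesFromFile is hypothetical, and the choice of autoClose: false (so that the FileHandle, not the stream, owns the descriptor) is an assumption of this sketch rather than something taken from the original answer. Like the first work-around, it only surfaces open errors; it does not fix read errors that happen later on the stream.

```javascript
const fs = require('fs');
const readline = require('readline');

// Hypothetical helper: open the file ourselves so that an open failure
// (ENOENT, EACCES, ...) rejects here, where the caller can catch it.
async function* linesFromFile(filename) {
    const handle = await fs.promises.open(filename, 'r');
    try {
        const rl = readline.createInterface({
            // The path argument is ignored when an fd is supplied.
            // autoClose: false leaves closing the descriptor to the handle below.
            input: fs.createReadStream(null, { fd: handle.fd, autoClose: false }),
            crlfDelay: Infinity
        });
        for await (const line of rl) {
            yield line;
        }
    } finally {
        await handle.close();
    }
}
```

Because the open happens inside the generator, the error shows up on the first iteration of for await (const line of linesFromFile(...)), so a surrounding try/catch or a .catch() on the enclosing async function sees it.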


    A Different Solution - Wrapping the readLine iterator to add error handling

    Warning: this ends up looking like a bit of a hack, but it's a really interesting learning project because I ended up having to wrap the readline asyncIterator with my own in order to reject when I detected an error on the readStream (the error handling that the readline library is missing).

    I set out on a mission to figure out how to write a processLineByLine() function that would return an asyncIterator that would properly reject on stream errors (even though the readline code has bugs in this regard) while still using the readline library internally.

    The goal was to be able to write code like this:

    for await (let line of processLineByLine("somefile1.txt")) {
         console.log(line);
     }
    

    that properly handles errors on the readStream used internally, whether the file doesn't exist, exists but can't be opened or even encounters a read error later while reading. Since I'm not changing/fixing the readline interface code internally, I had to install my own error listener on the readStream and when I see an error there, I need to cause any pending or future promises from the readline interface to reject.

    Here's what I ended up with:

    // This is an experiment to wrap the lines asyncIterator with our own iterator
    // so we can reject when there's been an error on the readStream.  It's really
    // ugly, but does work.
    
    const fs = require('fs');
    const readline = require('readline');
    
    function processLineByLine(filename, options = {}) {
        const fileStream = fs.createReadStream(filename, options);
        let latchedError = null;
        let kill = new Set();
    
        fileStream.on('error', (err) => {
            latchedError = err;
            // any open promises waiting on this stream, need to get rejected now
            for (let fn of kill) {
                fn(err);
            }
        });
    
        const lines = readline.createInterface({
            input: fileStream,
            crlfDelay: Infinity
        });
    
        // create our own little asyncIterator that wraps the lines asyncIterator
        //   so we can reject when we need to
        function asyncIterator() {
            const linesIterator = lines[Symbol.asyncIterator]();
            return {
                next: function() {
                    if (latchedError) {
                        return Promise.reject(latchedError);
                    } else {
                        return new Promise((resolve, reject) => {
                            // save reject handlers in higher scope so they can be called 
                            // from the stream error handler
                            kill.add(reject);
    
                            let p = linesIterator.next();
    
                            // have our higher level promise track the iterator promise
                            // except when we reject it from the outside upon stream error
                            p.then((data => {
                                // since we're resolving now, let's remove our reject
                                // handler from the kill storage.  This will allow this scope
                                // to be properly garbage collected
                                kill.delete(reject);
                                resolve(data);
                            }), reject);
                        });
                    }
                }
            }
        }
    
        var asyncIterable = {
            [Symbol.asyncIterator]: asyncIterator
        };
    
        return asyncIterable;
    }
    
    async function runIt() {
        for await (let line of processLineByLine("xfile1.txt")) {
             console.log(line);
         }
     }
    
    runIt().then(() => {
        console.log("done");
    }).catch(err => {
        console.log("final Error", err);
    });
    

    Some explanation on how this works...

    Our own error monitoring on the stream

    First, you can see this:

        fileStream.on('error', (err) => {
            latchedError = err;
            // any open promises waiting on this stream, need to get rejected now
            for (let fn of kill) {
                fn(err);
            }
        });
    

    This is our own error monitoring on the readStream to make up for the missing error handling inside of readline. Anytime we see an error, we save it in a higher scoped variable for potential later use and, if there are any pending promises registered from readline for this stream, we "kill" them (which rejects them; you will see later how that works).

    No special handling for file open errors

    Part of the goal here was to get rid of the special handling in the previous solution for file open errors. We want ANY error on the readStream to trigger a rejection of the asyncIterable, so this is a much more general purpose mechanism. The file open error gets caught in this error handling just the same way any other read error would.

    Our own asyncIterable and asyncIterator

    Calling readline.createInterface() returns an asyncIterable. It's basically the same as a regular iterable in that you call a special property on it to get an asyncIterator. That asyncIterator has a .next() method on it just like a regular iterator, except that when asyncIterator.next() is called, it returns a promise that resolves to an object instead of returning the object directly.

    So, that's how for await (let line of lines) works. It first calls lines[Symbol.asyncIterator]() to get an asyncIterator. Then, on that asyncIterator that it gets back, it repeatedly does await asyncIterator.next() waiting on the promise that asyncIterator.next() returns.
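To make that loop concrete, here is a small sketch that drives an async iterable by hand, the way for await does. The countdown iterable and the collect() helper are hypothetical names made up for this illustration, and the sketch is simplified: a real for await also calls the iterator's return() method, if it has one, when the loop exits early.

```javascript
// Hypothetical async iterable that yields 3, 2, 1.
const countdown = {
    [Symbol.asyncIterator]() {
        let n = 3;
        return {
            // next() returns a promise for the usual {value, done} object
            next() {
                return Promise.resolve(
                    n > 0 ? { value: n--, done: false } : { value: undefined, done: true }
                );
            }
        };
    }
};

// Roughly what `for await (const v of iterable)` does under the hood.
async function collect(iterable) {
    const out = [];
    const iterator = iterable[Symbol.asyncIterator]();
    while (true) {
        const { value, done } = await iterator.next();
        if (done) break;
        out.push(value);
    }
    return out;
}
```

If the promise returned by next() rejects, the await inside the loop throws, which is how an error is supposed to surface in for await.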

    Now, readline.createInterface() already returns such an asyncIterable. But, it doesn't work quite right. When the readStream gets an error, it doesn't reject the promise returned by .next() on each iteration. In fact, that promise never gets rejected or resolved. So, things get stalled. In my test app, the app would just exit because the readStream was done (after the error) and there was no longer anything keeping the app from exiting, even though a promise was still pending.

    So, I needed a way to force the promise that readlineIterator.next() had previously returned, and that for await (...) was currently awaiting, to be rejected. Well, a promise doesn't provide an outward interface for rejecting it, and we don't have access to the internals of the readline implementation where it could be rejected.

    My solution was to wrap the readlineIterator with my own as a sort of proxy. Then, when my own error detector sees an error and there are promise(s) outstanding from readline, I can use my proxy/wrapper to force a rejection on those outstanding promise(s). This will cause the for await (...) to see the reject and get a proper error. And, it works.

    It took me a while to learn enough about how asyncIterators work to be able to wrap one. I owe a lot of thanks to this Asynchronous Iterators in JavaScript article, which provided some very helpful code examples for constructing your own asyncIterable and asyncIterator. This is actually where the real learning came about in this exercise and where others might learn by understanding how this works in the above code.

    Forcing a wrapped promise to reject

    The "ugliness" in this code comes in forcing a promise to reject from outside the usual scope of the reject handler for that promise. This is done by storing the reject handler in a higher level scope where the error handler for the readStream can call it to trigger that promise to reject. There may be a more elegant way to code this, but this works.
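Stripped of the readline details, the technique can be sketched in isolation like this. The names pending, trackable() and failAll() are hypothetical and exist only for this illustration; the point is just that the reject function escapes the executor into an outer Set, so code elsewhere can reject the promise later.

```javascript
// Reject functions of promises that have not settled yet.
const pending = new Set();

// Wrap an inner promise so it can also be rejected from the outside.
function trackable(inner) {
    return new Promise((resolve, reject) => {
        pending.add(reject); // expose reject to the outer scope
        inner.then((value) => {
            pending.delete(reject); // settled normally, stop tracking it
            resolve(value);
        }, (err) => {
            pending.delete(reject);
            reject(err);
        });
    });
}

// Called from some external error source (a stream 'error' event, say).
function failAll(err) {
    for (const reject of pending) {
        reject(err);
    }
    pending.clear();
}
```

Rejecting a promise that has already settled is a no-op, so calling a stale reject function is harmless; removing it from the Set mainly keeps the closure from being retained.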

    Making our own asyncIterable

    An async iterable is just an object that has one property on it named [Symbol.asyncIterator]. That property must be a function that, when called with no arguments, returns an asyncIterator. So, here's our asyncIterable.

    var asyncIterable = {
        [Symbol.asyncIterator]: asyncIterator
    };
    

    Making our own asyncIterator

    Our asyncIterator is a function that, when called, returns an object with a next() method on it. Each time obj.next() is called, it returns a promise that resolves to the usual iterator result object {done, value}. We don't have to worry about the resolved value because we'll just get that from the readline's iterator. So, here's our asyncIterator:

    // create our own little asyncIterator that wraps the lines asyncIterator
    //   so we can reject when we need to
    function asyncIterator() {
        const linesIterator = lines[Symbol.asyncIterator]();
        return {
            next: function() {
                if (latchedError) {
                    return Promise.reject(latchedError);
                } else {
                    return new Promise((resolve, reject) => {
                        // save reject handlers in higher scope so they can be called 
                        // from the stream error handler
                        kill.add(reject);
    
                        let p = linesIterator.next();
    
                        // have our higher level promise track the iterator promise
                        // except when we reject it from the outside upon stream error
                        p.then((data) => {
                            // resolved, so drop our reject handler from the kill Set
                            kill.delete(reject);
                            resolve(data);
                        }, reject);
                    });
                }
            }
        }
    }
    

    First, it gets the asyncIterator from the readline interface (the one we're proxying/wrapping) and stores it locally in scope so we can use it later.

    Then, it returns the mandatory iterator structure of the form {next: fn}. Then, inside that function is where our wrapping logic unfolds. If we've seen a previous latched error, then we just always return Promise.reject(latchedError);. If there's no error, then we return a manually constructed promise.

    Inside the executor function for that promise, we register our reject handling by adding it into a higher scoped Set named kill. This allows our higher scoped fileStream.on('error', ...) handler to reject this promise if it sees an error by calling that function.

    Then, we call linesIterator.next() to get the promise that it returns. We register an interest in both the resolve and reject callbacks for that promise. If that promise is properly resolved, we remove our reject handler from the higher level scope (to enable better garbage collection of our scope) and then resolve our wrap/proxy promise with the same resolved value.

    If that linesIterator promise rejects, we just pass the reject right through our wrap/proxy promise.

    Our own filestream error handling

    So, now the final piece of explanation. We have this error handler watching the stream:

    fileStream.on('error', (err) => {
        latchedError = err;
        // any open promises waiting on this stream, need to get rejected now
        for (let fn of kill) {
            fn(err);
        }
    });
    

    This does two things. First, it stores/latches the error so any future calls to the lines iterator will just reject with this previous error. Second, if there are any pending promises from the lines iterator waiting to be resolved, it cycles through the kill Set and rejects those promises. This is what gets the asyncIterator promise to get properly rejected. This should be happening inside the readline code, but since it isn't doing it properly, we force our wrap/proxy promise to reject so the caller sees the proper rejection when the stream gets an error.


    In the end, you can just do this as all the ugly detail is hidden behind the wrapped asyncIterable:

    async function runIt() {
        for await (let line of processLineByLine("xfile1.txt")) {
             console.log(line);
         }
     }
    
    runIt().then(() => {
        console.log("done");
    }).catch(err => {
        console.log("final Error", err);
    });