javascriptnode.jsstreamingnodejs-stream

createReadStream returns before resolving


I'm writing a program to iterate files from provided directory in Linux.
I used my function "myReadFile" which return new promise, and it should resolve only when "end" event CB called.
I used myReadFile for each file in the directory and called in using "Promise.allSettled" but i can see that finally and then executed before "end" event CB called.

var fs = require('fs'); // for files and direcroeis read/write
var es = require('event-stream'); // for handling async chuncks from the stream
var now = require('performance-now'); // for timer for each file
var path = require('path') // to check file extension


var hashMap = new Map();
var t0 = now();
var t1;

const checkFilePath = (file) => {
    try {
        stats = fs.statSync(file);
        if (!stats.isFile()) {
            console.log("\033[31mError\033[0m: Specified path is not a File");
            return false;
        }
    } catch (e) {
        console.log("\033[31mError\033[0m: Invalid File Path");
        return false;
    }
    return true;
}


const checkDirectoryPath = (file) => {
    try {
        stats = fs.lstatSync(file);
        if (!stats.isDirectory()) {
            console.log("\033[31mError\033[0m: Specified path is not a directory");
            return false;
        }
    } catch (e) {
        console.log("\033[31mError\033[0m: Invalid directory Path");
        return false;
    }
    return true;
}

const IsTextFile = (file) => {
    try {
        return path.extname(file) == ".txt";
    } catch (e) {
        console.log("\033[31mError\033[0m: Invalid file path");
        return false;
    }
}


const checkFormat = () => {
    if (process.argv.length != 3) {
        console.log("\033[31mError\033[0m: Usage: node app.js [path to directory]");
        return false;
    }
    return true;
}

const myReadFile = async (fileName) => {
    return new Promise((resolve, reject) => {
        console.log("processing file name: ", fileName);
        const stream = fs.createReadStream(fileName).pipe(es.split()).pipe(
            es.mapSync((line) => {
                line.match(/\p{L}+/gu)?.map((word) => {
                    if (hashMap.has(word)) {
                        hashMap.set(word, hashMap.get(word) + 1);
                    } else {
                        hashMap.set(word, 1);
                    }
                })
            })
                
        )
        .on('error', (err) => {
            console.log('\033[31mError\033[0m: while reading file.', fileName, err);
            reject();
        })
        .on('end', () => {
            t1 = now();
            console.log(`Done processing file ` + fileName + ` in ` + (t1 - t0).toFixed(3) + `ms`);
            resolve(1);

        })
  
    });
}


 const main = async() => {
    const myPromises = []

    if (!checkFormat() || !checkDirectoryPath(process.argv[2])) {
        process.exit();
    }
    const directoryPath = process.argv[2];

    fs.readdir(directoryPath, function (err, files) {
        //handling error
        if (err) {
            return console.log('Unable to scan directory: ' + err);
        }
        //listing all files using forEach
        files.forEach(function (file) {
            // Do whatever you want to do with the file
            if (IsTextFile(file)) {
                //console.log("proccesing file name: ", file);
                myPromises.push(myReadFile(file))
            }
        });
    });

    await Promise.allSettled(myPromises).then((values) => {
        console.log("allSettled2 values: ", values);
    }).finally(() => {
        console.log("done");
    });


}

main();

for the above code, i can see the following output:

 
$ node app.js textFilesDir/
allSettled2 values:  []
done
processing file name:  fileBig1.txt
processing file name:  fileSmall1.txt
processing file name:  fileSmall2.txt
Done processing file fileSmall1.txt in 1143.886ms
Done processing file fileSmall2.txt in 4203.455ms
Done processing file fileBig1.txt in 66630.910ms

As you can see, the console log from "finally" and "then" printed before the resolve called from the function.
Do you know why and how to fix it?
Appreciate your help!

Extra:
This program should get directory path and return word counter form all text files (very large files), if you have some improvements to suggest that would be great

i tried different promise functions, like promise.all and others.
I tried changing functions to arrow functions and all i could think of, nothing fixes the issue


Solution

  • The problem is the you're using a plain callback with fs.readdir(), but then doing await Promise.allSettled() outside that callback. So, fs.readdir() returns immediately, you then call await Promise.allSettled(myPromises), but nothing has been added to the myPromises array yet because the fs.readdir() callback has not yet been called.

    The main piece of advice here is do not mix plain asynchronous callbacks with promises. If you're using promises, use them everywhere. In this case, you can use the already promisified version that is built-in to nodejs as fs.promises.readdir() here.

    const main = async () => {
        const myPromises = []
    
        if (!checkFormat() || !checkDirectoryPath(process.argv[2])) {
            process.exit();
        }
        const directoryPath = process.argv[2];
    
        const files = await fs.promises.readdir(directoryPath);
        //listing all files using forEach
        for (const file of files) {
            // Do whatever you want to do with the file
            if (IsTextFile(file)) {
                //console.log("proccesing file name: ", file);
                myPromises.push(myReadFile(file))
            }
        }
    
        await Promise.allSettled(myPromises).then((values) => {
            console.log("allSettled2 values: ", values);
        }).finally(() => {
            console.log("done");
        });
    }