Tags: node.js, multithreading, promise, worker-thread

How to wait for Worker-Threads execution with Promises


I have a Node.js script where I need to parse a bunch of big files. My first attempt used Promises and in general it works. However, the script is as slow as reading the files one-by-one in sequence. I would like to run it in parallel, i.e. multi-threaded, so I tried Worker threads, but I did not succeed.
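
For reference, the sequential first attempt had roughly this shape (a reconstruction with illustrative names, not the original code); awaiting each file inside the loop means one file must finish before the next one starts:

const fs = require('fs/promises');

const mainSequential = async () => {
   for (const file of process.argv.slice(2)) {
      // the loop stalls here until the file is fully read and processed
      const data = await fs.readFile(file);
      // ... do my stuff with data
   }
};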

My Worker threads script looks like this:

const fs = require('fs');
const { Worker, isMainThread, parentPort } = require('worker_threads');

const files = process.argv.slice(2);
let workers = [];

const main = async () => {
   for (const file of files) {
      const worker = new Worker(__filename, { argv: [file] });
      workers.push({ file: file, threadId: worker.threadId, promise: null });

      worker.on('message', function (msg) {
         if (msg == 'Done')
            workers.find(x => x.threadId == this.threadId).promise = Promise.resolve('Finished');
      });

      worker.on('online', function () {
         workers.find(x => x.threadId == this.threadId).promise = new Promise();
      });
   }

   const results = await Promise.all(workers.map(x => x.promise));
   // ... print some logs and a summary
   console.log('All done');

   return 0;
}

if (isMainThread) {
   (async () => {
      const rc = await main();
      process.exit(rc);
   })();

} else {
   const file = files.shift();
   const data = fs.readFileSync(file);
   // ... do my stuff with data

   parentPort.postMessage('Done');
}

I tried many variants, but none worked. Either the workers do not start, or main finishes before the workers are done, or the script hangs in an infinite loop.

Note: I know that, according to the documentation, Worker threads do not help much with I/O-intensive work, but I would like to try anyway; the data is inserted into a database. If I don't get the desired performance, I will try child processes. As a last resort I will start multiple node scripts in the background from the command line (this certainly gives the desired performance, but also brings some challenges for proper logging).
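
For completeness, the piece that is missing in the attempt above: each worker's promise has to exist before Promise.all runs, so it should be created synchronously together with the worker and settled from the worker's events. A minimal sketch of that pattern (runWorker is an illustrative name; untested):

const runWorker = (file) =>
   new Promise((resolve, reject) => {
      const worker = new Worker(__filename, { argv: [file] });
      worker.on('message', resolve);   // the worker posts 'Done'
      worker.on('error', reject);
      worker.on('exit', (code) => {
         if (code !== 0) reject(new Error(`Worker exited with code ${code}`));
      });
   });

const main = async () => {
   // every array element is a real Promise from the start
   const results = await Promise.all(files.map(runWorker));
   console.log('All done');
   return 0;
};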


Solution

  • In the end, it turned out that I cannot use Worker threads, because I am using the node-expat module, which throws a Module did not self-register error when the second thread starts.

    Thus I implemented it with child processes:

    
    const { fork } = require('child_process');
    const { isPromiseResolved } = require('promise-status-async');
    
    // Running up to 20 child processes in parallel
    const windowSize = 20;
    
    let processes = [
       { file: "one_file.xml", script: "importXML.js" },
       { file: "second_file.csv", script: "importCSV.js" },
       { file: "third_file.xml", script: "importXML.js" },
       // some more
    ];
    for (let p of processes) {
       p.running = false;
       p.promise = null;
    }
    
    // top-level await is not available in CommonJS, so run inside an async IIFE
    (async () => {
       while (processes.some(x => x.promise == null)) {
          const aProcess = processes.find(x => x.promise == null);
    
          const controller = new AbortController();
          const { signal } = controller;
          // kill a child that runs for more than 30 minutes
          const options = { signal, timeout: 1000 * 60 * 30 };
    
          const childProcess = fork(`${__dirname}/${aProcess.script}`, [aProcess.file], options);
          aProcess.running = true;
          // tell the child process its 1-based index
          childProcess.send(processes.indexOf(aProcess) + 1);
          childProcess.on('message', (value) => {
             if (typeof value == 'object') {
                // an object is the result of the child process
                aProcess.result = value;
             } else {
                console.log(`Message from child process: ${value}`);
             }
          });
          aProcess.process = childProcess;
          aProcess.promise = new Promise((resolve, reject) => {
             childProcess.on('close', (code) => { resolve(code) });
             childProcess.on('error', (err) => { reject(err) });
          });
    
          // refresh the running flags, then wait if the window is full
          for (let p of processes.filter(x => x.promise != null && x.running))
             p.running = ! await isPromiseResolved(p.promise);
          if (processes.filter(x => x.running).length >= windowSize)
             await Promise.any(processes.filter(x => x.promise != null && x.running).map(x => x.promise));
       }
       const results = await Promise.all(processes.map(x => x.promise)).catch((err) => { console.error(err) });
    })();
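
    Note that the AbortController above is wired to every child but never actually triggered; only the timeout kills long-running children. If you also want to cancel the remaining children explicitly, e.g. on shutdown, you could keep the controller on the bookkeeping entry. A hypothetical sketch (the controller field and the SIGINT handler are not part of the code above):

    // inside the while loop, right after creating the controller:
    aProcess.controller = controller;
    
    // later: aborting emits an 'error' (AbortError) on the child,
    // which rejects its promise via the 'error' handler above
    process.on('SIGINT', () => {
       for (const p of processes.filter(x => x.running))
          p.controller.abort();
    });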
    
    

    The import files look like this:

    
    // importXML.js
    
    const file = process.argv[2];
    
    (async () => {
    
       // wait for the parent to send this file's 1-based index
       const fileIndex = await new Promise((resolve) => {
          process.once('message', resolve);
       });
    
       // the parser for `file` is set up elsewhere in this script
       try {
          const result = await importFile(parser, fileIndex);
          process.send(result);
       } catch (error) {
          process.send({
             error: 'something went wrong',
             message: error,
             code: 100
          });
       }
    
    })();