javascriptasynchronousasync-awaitparallel-processingasync.js

How can I execute some async tasks in parallel with limit in generator function?


I'm trying to execute some async tasks in parallel with a limitation on the maximum number of simultaneously running tasks.

There's an example of what I want to achieve:

Task flow example

Currently this tasks are running one after another. It's implemented this way:

export function signData(dataItem) {
  cadesplugin.async_spawn(async function* (args) {
    //... nestedArgs assignment logic ...

    for (const id of dataItem.identifiers) {
      yield* idHandler(dataItem, id, args, nestedArgs);
    }
    
    // some extra logic after all tasks were finished
  }, firstArg, secondArg);
}

async function* idHandler(edsItem, researchId, args, nestedArgs) {
  ...
  let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
  yield oDocumentNameAttr.propset_Value("Document Name");
  ...
  // this function mutates some external data, making API calls and returns void
}

Unfortunately, I can't make any changes in cadesplugin.* functions, but I can use any external libraries (or built-in Promise) in my code.

I found some methods (eachLimit and parallelLimit) in async library that might work for me and an answer that shows how to deal with it.

But there are still two problems I can't solve:

  1. How can I pass main params into nested function?
  2. Main function is a generator function, so I still need to work with yield expressions in main and nested functions

There's a link to cadesplugin.* source code, where you can find async_spawn (and another cadesplugin.*) function that used in my code.

That's the code I tried with no luck:

await forEachLimit(dataItem.identifiers, 5, yield* async function* (researchId, callback) { 
  //... nested function code 
});

It leads to Object is not async iterable error.

Another attempt:

let functionArray = [];
dataItem.identifiers.forEach(researchId => {
  functionArray.push(researchIdHandler(dataItem, id, args, nestedArgs))
});
await parallelLimit(functionArray, 5);

It just does nothing.

Сan I somehow solve this problem, or the generator functions won't allow me to do this?


Solution

  • square peg, round hole

    You cannot use async iterables for this problem. It is the nature of for await .. of to run in series. await blocks and the loop will not continue until the awaited promise has resovled. You need a more precise level of control where you can enforce these specific requirements.

    To start, we have a mock myJob that simulates a long computation. More than likely this will be a network request to some API in your app -

    // any asynchronous task
    const myJob = x =>
      sleep(rand(5000)).then(_ => x * 10)
    

    Using Pool defined in this Q&A, we instantiate Pool(size=4) where size is the number of concurrent threads to run -

    const pool = new Pool(4)
    

    For ergonomics, I added a run method to the Pool class, making it easier to wrap and run jobs -

    class Pool {
      constructor (size) ...
      open () ...
      deferNow () ...
      deferStacked () ...
    
      // added method
      async run (t) {
        const close = await this.open()
        return t().then(close)
      }
    }
    

    Now we need to write an effect that uses our pool to run myJob. Here you will also decide what to do with the result. Note the promise must be wrapped in a thunk otherwise pool cannot control when it begins -

    async function myEffect(x) {
      // run the job with the pool
      const r = await pool.run(_ => myJob(x))
    
      // do something with the result
      const s = document.createTextNode(`${r}\n`)
      document.body.appendChild(s)
    
      // return a value, if you want
      return r
    }
    

    Now run everything by mapping myEffect over your list of inputs. In our example myEffect we return r which means the result is also available after all results are fetched. This optional but demonstrates how program knows when everything is done -

    Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
      .then(JSON.stringify)
      .then(console.log, console.error)
    

    full program demo

    In the functioning demo below, I condensed the definitions so we can see them all at once. Run the program to verify the result in your own browser -

    class Pool {
      constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
      open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
      run (t) { return this.open().then(close => t().then(close)) }
      deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return Promise.resolve(close) }
      deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
    }
    const rand = x => Math.random() * x
    const effect = f => x => (f(x), x)
    const thread = close => [new Promise(r => { close = effect(r) }), close]
    const sleep = ms => new Promise(r => setTimeout(r, ms))
    
    const myJob = x =>
      sleep(rand(5000)).then(_ => x * 10)
    
    async function myEffect(x) {
      const r = await pool.run(_ => myJob(x))
      const s = document.createTextNode(`${r}\n`)
      document.body.appendChild(s)
      return r
    }
      
    const pool = new Pool(4)
    
    Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
      .then(JSON.stringify)
      .then(console.log, console.error)

    slow it down

    Pool above runs concurrent jobs as quickly as possible. You may also be interested in throttle which is also introduced in the original post. Instead of making Pool more complex, we can wrap our jobs using throttle to give the caller control over the minimum time a job should take -

    const throttle = (p, ms) =>
      Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)
    

    We can add a throttle in myEffect. Now if myJob runs very quickly, at least 5 seconds will pass before the next job is run -

    async function myEffect(x) {
      const r = await pool.run(_ => throttle(myJob(x), 5000))
      const s = document.createTextNode(`${r}\n`)
      document.body.appendChild(s)
      return r
    }