I'm trying to execute some async tasks in parallel with a limitation on the maximum number of simultaneously running tasks.
There's an example of what I want to achieve:
Currently these tasks run one after another. It is implemented this way:
export function signData(dataItem) {
  cadesplugin.async_spawn(async function* (args) {
    //... nestedArgs assignment logic ...
    for (const id of dataItem.identifiers) {
      yield* idHandler(dataItem, id, args, nestedArgs);
    }
    // some extra logic after all tasks were finished
  }, firstArg, secondArg);
}

async function* idHandler(edsItem, researchId, args, nestedArgs) {
  ...
  let oDocumentNameAttr = yield cadesplugin.CreateObjectAsync("CADESCOM.CPAttribute");
  yield oDocumentNameAttr.propset_Value("Document Name");
  ...
  // this function mutates some external data, making API calls and returns void
}
Unfortunately, I can't make any changes to the cadesplugin.* functions, but I can use any external libraries (or the built-in Promise) in my code.
I found some methods (eachLimit and parallelLimit) in the async library that might work for me, and an answer that shows how to deal with it.
But there are still two problems I can't solve:
Here is a link to the cadesplugin.* source code, where you can find async_spawn (and the other cadesplugin.* functions) used in my code.
That's the code I tried with no luck:
await forEachLimit(dataItem.identifiers, 5, yield* async function* (researchId, callback) {
  //... nested function code
});
It fails with an "Object is not async iterable" error.
Another attempt:
let functionArray = [];
dataItem.identifiers.forEach(researchId => {
  functionArray.push(researchIdHandler(dataItem, id, args, nestedArgs))
});
await parallelLimit(functionArray, 5);
It just does nothing.
Can I somehow solve this problem, or do generator functions make this impossible?
square peg, round hole
You cannot use async iterables for this problem. It is the nature of for await .. of to run in series. await blocks, and the loop will not continue until the awaited promise has resolved. You need a more precise level of control where you can enforce these specific requirements.
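To see the series behaviour concretely, here is a minimal sketch. The helper names (makeTracker, seriesResults, runInSeries, runAllAtOnce) are made up for illustration and are not part of the question or of cadesplugin. It counts how many jobs are in flight at once: consuming an async generator with for await .. of never exceeds one, while Promise.all starts everything immediately. A concurrency limit sits between those two extremes.

```javascript
const sleep = ms => new Promise(r => setTimeout(r, ms))

// Hypothetical helper: a job that records the peak number of jobs in flight.
function makeTracker() {
  let active = 0
  let peak = 0
  const job = async x => {
    active++
    peak = Math.max(peak, active)
    await sleep(10)
    active--
    return x * 10
  }
  return { job, peak: () => peak }
}

// Inside an async generator, `yield` awaits its operand, and the next
// iteration only starts when the consumer asks for the next value.
async function* seriesResults(inputs, job) {
  for (const x of inputs) yield job(x)
}

async function runInSeries(inputs) {
  const t = makeTracker()
  const out = []
  for await (const r of seriesResults(inputs, t.job)) out.push(r)
  return { out, peak: t.peak() }
}

async function runAllAtOnce(inputs) {
  const t = makeTracker()
  // every job is started before any of them has finished
  const out = await Promise.all(inputs.map(x => t.job(x)))
  return { out, peak: t.peak() }
}
```

With three inputs, runInSeries reports a peak of 1 and runAllAtOnce a peak of 3.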
To start, we have a mock myJob that simulates a long computation. More than likely this will be a network request to some API in your app -
// any asynchronous task
const myJob = x =>
  sleep(rand(5000)).then(_ => x * 10)
Using Pool defined in this Q&A, we instantiate Pool(size=4) where size is the number of concurrent threads to run -
const pool = new Pool(4)
For ergonomics, I added a run method to the Pool class, making it easier to wrap and run jobs -
class Pool {
  constructor (size) ...
  open () ...
  deferNow () ...
  deferStacked () ...
  // added method
  async run (t) {
    const close = await this.open()
    return t().then(close)
  }
}
Now we need to write an effect that uses our pool to run myJob. Here you will also decide what to do with the result. Note the promise must be wrapped in a thunk, otherwise the pool cannot control when it begins -
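The reason for the thunk is that a promise's work starts as soon as the promise is created. A small sketch, using a hypothetical loggedJob that records when it starts, shows the difference between passing a promise and passing a thunk:

```javascript
// Hypothetical job that records the moment it is started.
const startLog = []
const loggedJob = x =>
  new Promise(resolve => {
    startLog.push(`start ${x}`)   // the executor runs synchronously, on creation
    resolve(x * 10)
  })

function demoEagerVsLazy() {
  const eager = loggedJob(1)        // already started here
  const lazy = () => loggedJob(2)   // thunk: nothing has started yet
  const before = [...startLog]
  const started = lazy()            // job 2 starts only when the thunk is called
  const after = [...startLog]
  return { eager, started, before, after }
}
```

By the time a pool receives eager, the job is already running, so there is nothing left to schedule. With lazy, the pool decides when to call it.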
async function myEffect(x) {
  // run the job with the pool
  const r = await pool.run(_ => myJob(x))
  // do something with the result
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)
  // return a value, if you want
  return r
}
Now run everything by mapping myEffect over your list of inputs. In our example myEffect we return r, which means the results are also available after all jobs have finished. This is optional, but it demonstrates how the program knows when everything is done -
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
  .then(JSON.stringify)
  .then(console.log, console.error)
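In the question, each task is a generator (idHandler) rather than a function that returns a promise, so it needs a bridge before it can be given to a pool. One possible bridge is a small driver that steps the generator and feeds each resolved value back in. This is only a sketch: drive and mockHandler are hypothetical names, it assumes every yielded value is a promise or a plain value whose resolved result the generator expects back from yield, and it does not use or imitate any cadesplugin internals.

```javascript
// Hypothetical driver: runs a generator (sync or async) to completion and
// returns a promise for its return value.
async function drive(gen) {
  let step = await gen.next()
  while (!step.done) {
    let value
    try {
      value = await step.value        // resolve whatever was yielded
    } catch (err) {
      step = await gen.throw(err)     // let the generator handle the rejection
      continue
    }
    step = await gen.next(value)      // send the resolved value back in
  }
  return step.value
}

// Stand-in for a handler like idHandler; the real one talks to cadesplugin.
async function* mockHandler(id) {
  const a = yield Promise.resolve(id + 1)
  const b = yield Promise.resolve(a * 2)
  return b
}
```

With such a driver, each identifier could be handed to the pool as pool.run(_ => drive(idHandler(dataItem, id, args, nestedArgs))) and the results collected with Promise.all, in the same way as myEffect. Whether the cadesplugin objects tolerate concurrent use is a separate question that this sketch does not answer.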
full program demo
In the functioning demo below, I condensed the definitions so we can see them all at once. Run the program to verify the result in your own browser -
class Pool {
constructor (size = 4) { Object.assign(this, { pool: new Set, stack: [], size }) }
open () { return this.pool.size < this.size ? this.deferNow() : this.deferStacked() }
run (t) { return this.open().then(close => t().then(close)) }
deferNow () { const [t, close] = thread(); const p = t.then(_ => this.pool.delete(p)).then(_ => this.stack.length && this.stack.pop().close()); this.pool.add(p); return Promise.resolve(close) }
deferStacked () { const [t, close] = thread(); this.stack.push({ close }); return t.then(_ => this.deferNow()) }
}
const rand = x => Math.random() * x
const effect = f => x => (f(x), x)
const thread = close => [new Promise(r => { close = effect(r) }), close]
const sleep = ms => new Promise(r => setTimeout(r, ms))
const myJob = x =>
  sleep(rand(5000)).then(_ => x * 10)
async function myEffect(x) {
  const r = await pool.run(_ => myJob(x))
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)
  return r
}
const pool = new Pool(4)
Promise.all([1,2,3,4,5,6,7,8,9,10,11,12].map(myEffect))
  .then(JSON.stringify)
  .then(console.log, console.error)
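Two details of the condensed Pool are worth knowing, as far as I can tell from reading the code: run only calls close when t() fulfils, so a job that rejects appears to keep its slot forever, and waiting jobs are taken with stack.pop(), so the most recently queued job runs first. If either matters in your app, a different and smaller limiter is one option. The sketch below is not the Pool from this answer. createLimiter is a hypothetical helper that releases the slot in a finally block and serves waiters in first-in, first-out order.

```javascript
// Hypothetical alternative to Pool: at most `size` thunks run at once.
function createLimiter(size) {
  let active = 0
  const waiting = []   // FIFO queue of resolvers for jobs waiting on a slot

  const acquire = () => {
    if (active < size) {
      active++
      return Promise.resolve()
    }
    return new Promise(resolve => waiting.push(resolve))
  }

  const release = () => {
    const next = waiting.shift()
    if (next) next()   // hand the slot straight to the oldest waiter
    else active--      // nobody is waiting, so free the slot
  }

  // `thunk` is a function returning a promise, just like the argument to pool.run
  return async function run(thunk) {
    await acquire()
    try {
      return await thunk()
    } finally {
      release()        // runs whether the job fulfilled or rejected
    }
  }
}
```

Usage mirrors pool.run: const run = createLimiter(4), then run(_ => myJob(x)). Combined with Promise.allSettled, one failing job does not prevent the others from finishing.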
slow it down
Pool above runs concurrent jobs as quickly as possible. You may also be interested in throttle, which is also introduced in the original post. Instead of making Pool more complex, we can wrap our jobs using throttle to give the caller control over the minimum time a job should take -
const throttle = (p, ms) =>
  Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)
We can add a throttle in myEffect. Now even if myJob runs very quickly, it holds its pool slot for at least 5 seconds before the next queued job can start -
async function myEffect(x) {
  const r = await pool.run(_ => throttle(myJob(x), 5000))
  const s = document.createTextNode(`${r}\n`)
  document.body.appendChild(s)
  return r
}
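To check what throttle does on its own, here is a small timing sketch. The timed helper is hypothetical and only measures elapsed wall-clock time. sleep and throttle are repeated from above so the snippet runs by itself.

```javascript
const sleep = ms => new Promise(r => setTimeout(r, ms))

const throttle = (p, ms) =>
  Promise.all([ p, sleep(ms) ]).then(([ value, _ ]) => value)

// Hypothetical helper: resolves to the value of p and the time it took.
async function timed(p) {
  const t0 = Date.now()
  const value = await p
  return { value, elapsed: Date.now() - t0 }
}

// A fast job is stretched to roughly `ms`; a slow job keeps its own duration.
const fastJob = () => timed(throttle(Promise.resolve(7), 50))
const slowJob = () => timed(throttle(sleep(80).then(_ => 1), 20))
```

fastJob() takes about 50 ms even though its promise is already resolved, while slowJob() takes about 80 ms because the job itself is the slower of the two. Note that Promise.all rejects as soon as any input rejects, so a failing job is not held for the minimum time.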