I'd like to enable Promise cancellation for one of the methods of my library, reduce
. I'm only interested in cancelling the Promise for an async iterable, since these have a high likelihood to hang indefinitely.
const reduceAsyncIterable = async (fn, possiblyX0, state, x) => {
const iter = x[Symbol.asyncIterator]()
const y0 = isUndefined(possiblyX0) ? (await iter.next()).value : possiblyX0
if (isUndefined(y0)) {
throw new TypeError('reduce(...)(x); x cannot be empty')
}
let y = await fn(y0, (await iter.next()).value)
for await (const xi of iter) {
if (state.cancelled) return // stops async iterating if `cancel` called
y = await fn(y, xi)
}
return y
}
const reduce = (fn, x0) => {
if (!isFunction(fn)) {
throw new TypeError('reduce(x, y); x is not a function')
}
return x => {
if (isIterable(x)) return reduceIterable(fn, x0, x)
if (isAsyncIterable(x)) {
const state = { cancelled: false, resolve: () => {} }
const p = new Promise((resolve, reject) => {
state.resolve = resolve
reduceAsyncIterable(fn, x0, state, x).then(
y => state.cancelled || resolve(y)
).catch(reject)
})
p.cancel = () => { state.cancelled = true; state.resolve() } // shortcircuit the Promise `p` on `cancel` call
return p
}
if (is(Object)(x)) return reduceObject(fn, x0, x)
throw new TypeError('reduce(...)(x); x invalid')
}
}
The above code seems to work, but I can't help but feel like there are memory leaks here. In particular, at await iter.next()
and for await (const xi of iter)
. If these await statements take forever (which they likely could for an async iterator), reduceAsyncIterable
could possibly never return. This is fine from the user's perspective because of the shortcircuit happening in reduce
, as the Promise the user sees is resolved. But from the computer's perspective, would cancelling the Promise of this operation cause a memory leak?
I'd like to be able to use the cancel
function on a returned promise like so:
const myOngoingTaskPromise = reduce(someReducer, null)(myInfiniteAsyncIterable)
myOngoingTaskPromise.cancel() // resolves myOngoingTaskPromise with undefined
myOngoingTaskPromise // Promise { undefined }
I found the way, Promise.race
is like a secret weapon or something
if (isAsyncIterable(x)) {
const state = { cancel: () => {} }
const cancelToken = new Promise((_, reject) => { state.cancel = reject })
const p = Promise.race([
reduceAsyncIterable(fn, x0, x),
cancelToken,
])
p.cancel = () => { state.cancel(new Error('cancelled')) }
return p
}
No memory leaks