javascriptasync-awaitobservableecmascript-2016

How to "multicast" an async iterable?


Can an async generator be somehow broadcast or multicast, so that all its iterators ("consumers"? subscribers?) receive all values?

Consider this example:

const fetchMock = () => "Example. Imagine real fetch";
async function* gen() {
  for (let i = 1; i <= 6; i++) {
    const res = await fetchMock();
    yield res.slice(0, 2) + i;
  }
}
const ait = gen();

(async() => {
  // first "consumer"
  for await (const e of ait) console.log('e', e);
})();
(async() => {
  // second...
  for await (const é of ait) console.log('é', é);
})();

Iterations "consume" a value, so only one or the other gets it. I would like for both of them (and any later ones) to get every yielded value, if such a generator is possible to create somehow. (Similar to an Observable.)


Solution

  • This is not easily possible. You will need to explicitly tee it. This is similar to the situation for synchronous iterators, just a bit more complicated:

    const AsyncIteratorProto = Object.getPrototypeOf(Object.getPrototypeOf(async function*(){}.prototype));
    function teeAsync(iterable) {
        const iterator = iterable[Symbol.asyncIterator]();
        const buffers = [[], []];
        function makeIterator(buffer, i) {
            return Object.assign(Object.create(AsyncIteratorProto), {
                next() {
                    if (!buffer) return Promise.resolve({done: true, value: undefined});
                    if (buffer.length) return buffer.shift();
                    const res = iterator.next();
                    if (buffers[i^1]) buffers[i^1].push(res);
                    return res;
                },
                async return() {
                    if (buffer) {
                        buffer = buffers[i] = null;
                        if (!buffers[i^1]) await iterator.return();
                    }
                    return {done: true, value: undefined};
                },
            });
        }
        return buffers.map(makeIterator);
    }
    

    You should ensure that both iterators are consumed at about the same rate so that the buffer doesn't grow too large.