javascriptpipelineteewhatwg-streams-api

How to pipe two ReadableStreams into one WritableStream in Javascript?


I have two ReadableStreams, and I want to pipe them into one WritableStream, where any data that comes through the ReadableStreams goes directly into the WritableStream right then.

I can do the opposite, by using ReadableStream.prototype.tee() to split one ReadableStream into two, but I do not know how to combine two into one.

const textarea = document.querySelector("textarea");


// This is a ReadableStream which says "Mom! " every 1 second.
const momReadableStream = new ReadableStream({ start: controller => {
  const sayMom = () => controller.enqueue("Mom! ");
  setInterval(sayMom, 1000);
}});

// This is a ReadableStream which says "Lois! " every 0.7 seconds.
const loisReadableStream = new ReadableStream({ start: controller => {
  const sayLois = () => controller.enqueue("Lois! ");
  setInterval(sayLois, 700);
}});

// This is a WritableStream which displays what it receives in a textarea.
const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk });


momReadableStream.pipeTo(writableStream).catch(console.error); // Works fine, words display
loisReadableStream.pipeTo(writableStream).catch(console.error); // Words do not display, and Errors with "Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream"
<textarea readonly></textarea>


Solution

  • Manually, by racing the most recent read from each reader to produce the overall read and initiating those reads as necessary:

    const never = new Promise(() => {});
    
    const mergeStreams = streams => {
        const readers = streams.map(s => s.getReader());
        const reads = streams.map(() => null);
        const dones = [];
        const allDone = Promise.all(streams.map(s => new Promise(resolve => {
            dones.push(resolve);
        })));
    
        return new ReadableStream({
            start: controller => {
                allDone.then(() => {
                    controller.close();
                });
            },
            pull: controller =>
                Promise.race(
                    readers.map((r, i) =>
                        reads[i] ??= r.read().then(({value, done}) => {
                            if (done) {
                                dones[i]();
                                return never;
                            }
    
                            controller.enqueue(value);
                            reads[i] = null;
                        })
                    )
                ),
            cancel: reason => {
                for (const reader of readers) {
                    reader.cancel(reason);
                }
            },
        });
    };
    

    const textarea = document.querySelector("textarea");
    
    
    const never = new Promise(() => {});
    
    const mergeStreams = streams => {
        const readers = streams.map(s => s.getReader());
        const reads = streams.map(() => null);
        const dones = [];
        const allDone = Promise.all(streams.map(s => new Promise(resolve => {
            dones.push(resolve);
        })));
    
        return new ReadableStream({
            start: controller => {
                allDone.then(() => {
                    controller.close();
                });
            },
            pull: controller =>
                Promise.race(
                    readers.map((r, i) =>
                        reads[i] ??= r.read().then(({value, done}) => {
                            if (done) {
                                dones[i]();
                                return never;
                            }
    
                            controller.enqueue(value);
                            reads[i] = null;
                        })
                    )
                ),
            cancel: reason => {
                for (const reader of readers) {
                    reader.cancel(reason);
                }
            },
        });
    };
    
    
    // This is a ReadableStream which says "Mom! " every 1 second.
    const momReadableStream = new ReadableStream({ start: controller => {
      const sayMom = () => controller.enqueue("Mom! ");
      setInterval(sayMom, 1000);
    }});
    
    // This is a ReadableStream which says "Lois! " every 0.7 seconds.
    const loisReadableStream = new ReadableStream({ start: controller => {
      const sayLois = () => controller.enqueue("Lois! ");
      setInterval(sayLois, 700);
    }});
    
    // This is a WritableStream which displays what it receives in a textarea.
    const writableStream = new WritableStream({ write: (chunk, controller) => textarea.value += chunk });
    
    
    mergeStreams([
      momReadableStream,
      loisReadableStream,
    ]).pipeTo(writableStream).catch(console.error);
    <textarea readonly></textarea>