javascript browser whatwg-streams-api

Why does read not return in BYOB mode when stream is closed


I am working with ReadableStreams in JS/Browser code and hit the following issue. When using a ReadableStreamBYOBReader to control the size of stream reads, if the stream is closed during a read request without enqueued data, the read never returns. See the first snippet below.

The last read does return as expected when using ReadableStreamDefaultReader to read the stream instead. See the second snippet below (the stream implementations are identical in the snippets).

Given that this hangs the same way in both Chrome and FF, it seems like expected behavior. But if so, how do you close a stream being read in BYOB mode when you don't know how much data is remaining until each read request is made? Raising an error on the controller or cancelling the stream triggers a return, but both are ugly.

I've read some of the WHATWG spec. It does not directly say what should happen in BYOB mode when the stream is closed and there are no chunks (the "close steps" with no chunk), but it does say the read should complete when the stream closes in default mode.

If the stream becomes closed, then the promise is fulfilled with the remaining elements in the stream, which might be fewer than the initially requested amount. If not given, then the promise resolves when at least one element is available.

The snippets below run in Chrome and FF, but not Safari because Safari doesn't have the full stream API yet. Also, please ignore that byobRequest is not used and that stream read isn't optimized. My goal was to simplify the logic.

// Reproduction: the BYOB read never settles once the stream closes
// (more blocks are requested than the stream produces: readBlocks > streamBlocks).
const readBlocks = 4;
const streamBlocks = 3;
const blockSize = 1024;

const stream = makeStream(streamBlocks);
const reader = stream.getReader({ mode: "byob" });
let buffer = new Uint8Array(blockSize * readBlocks);

readAllBYOB(reader, buffer).then((result) => {
  const [blocks, done] = result;
  reader.releaseLock();
  // Add up the size of every chunk that was handed back.
  const byteLen = blocks.reduce((sum, block) => sum + block.byteLength, 0);
  console.log("all done, bytes read:", byteLen, done);
});

/**
 * Creates a byte stream that emits `loops` blocks of `blockSize` bytes:
 * one block of "s" from start(), the rest as blocks of "p" from pull().
 * Once enough bytes were emitted, the next pull() closes the controller.
 *
 * @param {number} loops - Number of blocks the stream produces in total.
 * @returns {ReadableStream} A readable byte stream (`type: "bytes"`).
 */
function makeStream(loops) {
  let emittedBytes = 0;
  console.log("creating stream size:", loops * blockSize);

  // Encodes one block filled with `fill` and adds it to the running total.
  const nextBlock = (fill) => {
    const bytes = new TextEncoder().encode(fill.repeat(blockSize));
    emittedBytes += bytes.byteLength;
    return bytes;
  };

  const source = {
    type: "bytes",

    async start(controller) {
      console.log(
        `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        const bytes = nextBlock("s");
        console.log("stream start- enqueuing, total:", bytes.byteLength, emittedBytes);
        controller.enqueue(bytes);
      } catch (err) {
        console.error("stream start- error, closing", err);
        controller.error(err);
      }
    },

    async pull(controller) {
      // The byobRequest is only logged; data is delivered through enqueue().
      console.log(
        `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        // Pretend the source only learns it ran out of data when a pull is requested.
        const hasMore = emittedBytes < blockSize * loops;
        if (hasMore) {
          const bytes = nextBlock("p");
          console.log("stream pull- enqueuing, total:", bytes.byteLength, emittedBytes);
          controller.enqueue(bytes);
          return;
        }

        // In BYOB mode the pending read never returns after this close(), unless:
        //  1. data is enqueued before closing (but there is none left to enqueue)
        //  2. controller.error() is called, which is ugly
        //  3. the stream is cancelled, which also seems wrong
        console.log("stream pull- closing");
        controller.close();
      } catch (err) {
        console.error("stream pull- error, closing", err);
        controller.error(err);
      }
    },
  };

  return new ReadableStream(source);
}

/**
 * Reads from a BYOB reader until `output.byteLength` bytes were received,
 * the reader reports `done`, or a read yields no value.
 *
 * @param {ReadableStreamBYOBReader} reader - Reader whose read(view) is awaited.
 * @param {Uint8Array} output - View used for the first read; its size is the byte target.
 * @returns {Promise<[Uint8Array[], boolean]>} The received chunks and the last `done` flag.
 */
async function readAllBYOB(reader, output) {
  const wantedBytes = output.byteLength;
  const chunks = [];
  let receivedBytes = 0;
  let lastDone = false;
  let view = output;
  console.log('readAllBYOB- start: ', wantedBytes);

  while (receivedBytes < wantedBytes) {
    console.log('readAllBYOB- try reading:', view.byteLength);

    // In this reproduction the final read never settles, even though the stream is closed.
    const result = await reader.read(view);
    const isDone = result.done;
    const chunk = result.value;
    console.log('readAllBYOB- read, done:', chunk?.byteLength, isDone);

    lastDone = isDone;
    const gotChunk = Boolean(chunk);
    if (gotChunk) {
      chunks.push(chunk);
      receivedBytes += chunk.byteLength;
    }

    if (isDone || !gotChunk) {
      break;
    }

    // A fresh view sized to the remainder is used for each following read.
    const remaining = wantedBytes - receivedBytes;
    if (remaining > 0) {
      view = new Uint8Array(remaining);
    }
  }

  console.log(
    'readAllBYOB- blocks, remainingBytes, done:',
    chunks.length,
    wantedBytes - receivedBytes,
    lastDone
  );

  return [chunks, lastDone];
}

// Same stream read through a default reader: the final read settles as expected.
const streamBlocks = 3;
const blockSize = 1024;

const stream = makeStream(streamBlocks);
const reader = stream.getReader();

readAll(reader).then((outcome) => {
  const [blocks, done] = outcome;
  reader.releaseLock();
  // Total the byte length of all chunks that were read.
  const byteLen = blocks
    .map((block) => block.byteLength)
    .reduce((sum, size) => sum + size, 0);
  console.log("all done, bytes read:", byteLen, done);
});

/**
 * Builds a readable byte stream producing `loops` blocks of `blockSize` bytes.
 * start() enqueues a block of "s"; each pull() enqueues a block of "p" until
 * the byte budget is reached, after which pull() closes the controller.
 *
 * @param {number} loops - Total number of blocks to produce.
 * @returns {ReadableStream} The byte stream.
 */
function makeStream(loops) {
  let bytesSoFar = 0;
  console.log("creating stream size:", loops * blockSize);

  // Produces a block made of `letter` repeated `blockSize` times and counts its bytes.
  function produce(letter) {
    const block = new TextEncoder().encode(letter.repeat(blockSize));
    bytesSoFar += block.byteLength;
    return block;
  }

  async function start(controller) {
    console.log(
      `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
    );

    try {
      const block = produce("s");
      console.log("stream start- enqueuing, total:", block.byteLength, bytesSoFar);
      controller.enqueue(block);
    } catch (err) {
      console.error("stream start- error, closing", err);
      controller.error(err);
    }
  }

  async function pull(controller) {
    // The byobRequest object is logged but otherwise not used.
    console.log(
      `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
    );

    try {
      // Pretend we only find out the data ran out once a request is made.
      // In BYOB mode the read would never return after close() unless you:
      //  1. enqueue data before calling close (but there is none to enqueue)
      //  2. call controller.error(), but that's ugly
      //  3. call stream.cancel(), which also seems wrong
      const budgetReached = bytesSoFar >= blockSize * loops;
      if (budgetReached) {
        console.log("stream pull- closing");
        controller.close();
      } else {
        const block = produce("p");
        console.log("stream pull- enqueuing, total:", block.byteLength, bytesSoFar);
        controller.enqueue(block);
      }
    } catch (err) {
      console.error("stream pull- error, closing", err);
      controller.error(err);
    }
  }

  return new ReadableStream({ type: "bytes", start, pull });
}

/**
 * Drains a default reader, collecting every chunk until the reader reports
 * `done` or a read yields no value.
 *
 * @param {ReadableStreamDefaultReader} reader - Reader whose read() is awaited.
 * @returns {Promise<[Uint8Array[], boolean]>} The collected chunks and the last `done` flag.
 */
async function readAll(reader) {
  const chunks = [];
  let byteCount = 0;
  let lastDone = false;
  let keepReading = true;
  console.log('readAll- start');

  while (keepReading) {
    console.log('readAll- try reading');

    // With a default reader this read always settles, including after close.
    const result = await reader.read();
    const isDone = result.done;
    const chunk = result.value;
    console.log('readAll- read, done:', chunk?.byteLength, isDone);

    lastDone = isDone;
    const gotChunk = Boolean(chunk);
    if (gotChunk) {
      chunks.push(chunk);
      byteCount += chunk.byteLength;
    }

    // Stop on end of stream or when a read produced nothing.
    keepReading = gotChunk && !isDone;
  }

  console.log(
    'readAll- blocks, done:',
    chunks.length,
    lastDone
  );

  return [chunks, lastDone];
}


Solution

  • It seems that you are supposed to fulfill the pending BYOB request. To do so, after closing the controller, call the respond() method of the controller's .byobRequest, passing 0 as bytesWritten.

    const readBlocks = 4;
    const streamBlocks = 3;
    const blockSize = 1024;

    const stream = makeStream(streamBlocks);
    const reader = stream.getReader({ mode: "byob" });
    const buffer = new Uint8Array(blockSize * readBlocks);

    // Read until the buffer is full or the stream closes, then report the total.
    readAllBYOB(reader, buffer)
      .then(([blocks, done]) => {
        const byteLen = blocks.reduce((sum, block) => sum + block.byteLength, 0);
        console.log("all done, bytes read:", byteLen, done);
      })
      .catch((err) => {
        // Report a failed read instead of leaving an unhandled rejection.
        console.error("read failed:", err);
      })
      .finally(() => {
        // Release the lock whether the read succeeded or failed.
        reader.releaseLock();
      });
    
    /**
     * Creates a byte stream that emits `loops` blocks of `blockSize` bytes and
     * then closes. On close, a pending BYOB request (if any) is answered with
     * zero bytes so that the outstanding read can settle.
     *
     * @param {number} loops - Number of blocks the stream produces in total.
     * @returns {ReadableStream} A readable byte stream (`type: "bytes"`).
     */
    function makeStream(loops) {
      let emittedBytes = 0;
      console.log("creating stream size:", loops * blockSize);

      // Encodes one block filled with `fill` and adds it to the running total.
      const nextBlock = (fill) => {
        const bytes = new TextEncoder().encode(fill.repeat(blockSize));
        emittedBytes += bytes.byteLength;
        return bytes;
      };

      const source = {
        type: "bytes",

        async start(controller) {
          console.log(
            `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
          );

          try {
            const bytes = nextBlock("s");
            console.log("stream start- enqueuing, total:", bytes.byteLength, emittedBytes);
            controller.enqueue(bytes);
          } catch (err) {
            console.error("stream start- error, closing", err);
            controller.error(err);
          }
        },

        async pull(controller) {
          // Data is still delivered with enqueue(); the byobRequest is only
          // used on close, to answer the pending read.
          console.log(
            `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
          );

          try {
            // Pretend the source only learns it ran out of data when a pull is requested.
            const hasMore = emittedBytes < blockSize * loops;
            if (hasMore) {
              const bytes = nextBlock("p");
              console.log("stream pull- enqueuing, total:", bytes.byteLength, emittedBytes);
              controller.enqueue(bytes);
              return;
            }

            console.log("stream pull- closing");
            // Close first, then respond with 0 bytes to the BYOB request if one exists.
            controller.close();
            controller.byobRequest?.respond(0);
          } catch (err) {
            console.error("stream pull- error, closing", err);
            controller.error(err);
          }
        },
      };

      return new ReadableStream(source);
    }
    
    /**
     * Reads from a BYOB reader until `output.byteLength` bytes were received,
     * the reader reports `done`, or a read yields no value.
     *
     * @param {ReadableStreamBYOBReader} reader - Reader whose read(view) is awaited.
     * @param {Uint8Array} output - View used for the first read; its size is the byte target.
     * @returns {Promise<[Uint8Array[], boolean]>} The received chunks and the last `done` flag.
     */
    async function readAllBYOB(reader, output) {
      const wantedBytes = output.byteLength;
      const chunks = [];
      let receivedBytes = 0;
      let lastDone = false;
      let view = output;
      console.log('readAllBYOB- start: ', wantedBytes);

      while (receivedBytes < wantedBytes) {
        console.log('readAllBYOB- try reading:', view.byteLength);

        // The final read settles only if the source answers the BYOB request when it closes.
        const result = await reader.read(view);
        const isDone = result.done;
        const chunk = result.value;
        console.log('readAllBYOB- read, done:', chunk?.byteLength, isDone);

        lastDone = isDone;
        const gotChunk = Boolean(chunk);
        if (gotChunk) {
          chunks.push(chunk);
          receivedBytes += chunk.byteLength;
        }

        if (isDone || !gotChunk) {
          break;
        }

        // A fresh view sized to the remainder is used for each following read.
        const remaining = wantedBytes - receivedBytes;
        if (remaining > 0) {
          view = new Uint8Array(remaining);
        }
      }

      console.log(
        'readAllBYOB- blocks, remainingBytes, done:',
        chunks.length,
        wantedBytes - receivedBytes,
        lastDone
      );

      return [chunks, lastDone];
    }