javascript typescript async-iterator

splice/shift first n bytes of AsyncIterable<Uint8Array>


Reading the first n bytes of a byte stream (in the form of an AsyncIterable) feels cumbersome and error-prone.

Is there a better way to implement this?

/**
 * Splits the first `length` bytes off the front of a chunked byte stream.
 *
 * The source is iterated exactly once: the iterator used to read the prefix is
 * the same one the returned remainder continues from, so nothing is replayed
 * or lost regardless of whether `stream` is re-iterable or one-shot.
 *
 * @param length - Number of bytes to copy into the returned prefix.
 * @param stream - Source of byte chunks.
 * @returns A tuple of the `length`-byte prefix and an async iterable that
 *   yields the unread tail of the last consumed chunk (possibly empty)
 *   followed by every chunk not yet read from the source.
 * @throws Error with message "Buffer underflow" when the source ends before
 *   `length` bytes have been read.
 */
async function shift(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
  const prefix = new Uint8Array(length);
  const iterator = stream[Symbol.asyncIterator]();

  // Expose the live iterator as an iterable. Handing `stream` itself to
  // `prepend` would call `stream[Symbol.asyncIterator]()` a second time, which
  // restarts re-iterable sources from their first chunk.
  const remaining: AsyncIterable<Uint8Array> = {
    [Symbol.asyncIterator]: () => iterator,
  };

  let offset = 0;
  let leftover: Uint8Array = new Uint8Array(0);

  // Nothing is pulled from the source when no more bytes are needed, so a
  // zero-length request succeeds even on an empty stream.
  while (offset < length) {
    const result = await iterator.next();
    if (result.done) {
      throw new Error("Buffer underflow");
    }

    const chunk = result.value;
    const needed = length - offset;

    prefix.set(chunk.subarray(0, needed), offset);
    offset += Math.min(chunk.length, needed);
    // Empty unless this chunk held more than was still needed.
    leftover = chunk.slice(needed);
  }

  return [prefix, prepend(leftover, remaining)];
}

/**
 * Yields `prefix` as the first chunk and then every chunk of `stream`.
 *
 * @param prefix - Chunk emitted before anything from `stream` (emitted even
 *   when it is empty).
 * @param stream - Chunks emitted after `prefix`.
 */
async function* prepend(
  prefix: Uint8Array,
  stream: AsyncIterable<Uint8Array>
) {
  yield prefix;
  yield* stream;
}

Solution

  • stream primitives

    We'll start by defining stream primitives -

    flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T>
    take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
    skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
    toArray<T>(t: AsyncIterable<T>): Promise<Array<T>>
    
    /**
     * Flattens an async stream of iterables into an async stream of their
     * elements, preserving order.
     *
     * @param t - Async iterable whose items are themselves (sync) iterables.
     * @returns Async iterable yielding each inner element in turn.
     */
    async function *flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T> {
      for await (const group of t) {
        for (const item of group) {
          yield item
        }
      }
    }
    
    /**
     * Yields the first `n` items of `t`, then stops.
     *
     * The limit is checked right after each item is yielded, so exactly `n`
     * items are pulled from the source; no extra item is requested (and
     * discarded) just to discover that the limit was reached. When `n` is zero
     * or negative the source is not touched at all.
     *
     * @param t - Source async iterable.
     * @param n - Number of items to take.
     * @returns Async iterable of at most `n` leading items.
     * @throws Error with message "buffer underflow" when the source ends
     *   before `n` items were produced.
     */
    async function *take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
      if (n <= 0) return
      for await (const v of t) {
        yield v
        // Stop before asking the source for an item nobody will receive.
        if (--n <= 0) return
      }
      if (n > 0) throw new Error("buffer underflow")
    }
    
    /**
     * Drops the first `n` items of `t` and yields everything after them.
     *
     * @param t - Source async iterable.
     * @param n - Number of leading items to discard.
     * @returns Async iterable of the items following the first `n`.
     * @throws Error with message "buffer underflow" when the source ends
     *   before `n` items could be discarded.
     */
    async function *skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
      let remaining = n
      for await (const item of t) {
        if (remaining > 0) {
          remaining -= 1
        } else {
          yield item
        }
      }
      if (remaining > 0) throw Error("buffer underflow")
    }
    
    /**
     * Drains an async iterable into an array.
     *
     * @param t - Source async iterable; it is consumed to completion.
     * @returns Promise of all items in the order they were produced.
     */
    async function toArray<T>(t: AsyncIterable<T>): Promise<Array<T>> {
      const collected: T[] = []
      for await (const item of t) {
        collected.push(item)
      }
      return collected
    }
    

    shift

    Using these stream primitives, we can write shift in a comfortable and safe way -

    shift(stream: AsyncIterable<Uint8Array>, count: number): Promise<[Uint8Array, AsyncIterable<number>]>
    
    /**
     * Splits a chunked byte stream into its first `count` bytes and the bytes
     * that follow them.
     *
     * The stream is iterated once. The head is read from, and the returned
     * remainder continues on, a single shared iterator, so this also works for
     * sources that cannot be iterated a second time.
     *
     * @param stream - Source of byte chunks.
     * @param count - Number of leading bytes to collect.
     * @returns A readonly tuple of the collected bytes and an async iterable
     *   over the remaining bytes (one number per byte). The remainder is
     *   single-use.
     * @throws Error with message "buffer underflow" when the stream ends
     *   before `count` bytes were read.
     */
    async function shift(stream: AsyncIterable<Uint8Array>, count: number) {
      // One iterator for both halves. Driving it by hand (instead of with
      // `for await`) avoids closing the source when the head is complete.
      const bytes = flatten(stream)[Symbol.asyncIterator]()

      const head: number[] = []
      while (head.length < count) {
        const next = await bytes.next()
        if (next.done) throw new Error("buffer underflow")
        head.push(next.value)
      }

      // Hand back the same, already-advanced iterator as the remainder.
      const rest: AsyncIterable<number> = { [Symbol.asyncIterator]: () => bytes }

      return [new Uint8Array(head), rest] as const
    }
    

    Let's create a mock buffer and test it -

    // Mock byte stream: emits four fixed chunks, pausing 100 ms after each
    // one. Every call to [Symbol.asyncIterator]() starts a fresh pass.
    const buffer: AsyncIterable<Uint8Array> = {
      async *[Symbol.asyncIterator]() {
        const chunks = [[0,1,2],[3,4],[5,6,7,8],[9]]
        for (const bytes of chunks) {
          yield Uint8Array.from(bytes)
          await new Promise(resolve => setTimeout(resolve, 100))
        }
      }
    }
    
    /**
     * Demo driver: shifts the first four bytes off the mock `buffer` and logs
     * both the shifted bytes and the drained remainder.
     */
    async function main() {
      const [head, tail] = await shift(buffer, 4)
      const first = Array.from(head)
      const rest = await toArray(tail)
      console.log({ first, rest })
    }
    
    // main() resolves with no value, so only failures need reporting; logging
    // the resolved value would print a stray "undefined" after the result.
    main().catch(console.error)
    
    {
      first: [0, 1, 2, 3],
      rest: [4, 5, 6, 7, 8, 9]
    }
    

    demo

    Run and verify the result on the TypeScript Playground.