reading the first n bytes of a byte stream (in form of a AsyncIterable) feels cumbersome and error prone.
Is there a better way to implement this?
async function shift(
length: number,
stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
const prefix = new Uint8Array(length);
let offset = 0;
const iterator = stream[Symbol.asyncIterator]();
while (true) {
const { done, value } = await iterator.next();
if (done) {
throw new Error("Buffer underflow");
} else {
const chunk = value;
if (chunk.length < length - offset) {
prefix.set(chunk, offset);
offset += chunk.length;
} else {
const slice = chunk.slice(0, length - offset);
prefix.set(slice, offset);
return [prefix, prepend(chunk.slice(slice.length), stream)];
}
}
}
}
async function* prepend(
prefix: Uint8Array,
stream: AsyncIterable<Uint8Array>
) {
yield prefix;
yield* stream;
}
stream primitives
We'll start by defining stream primitives -
flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T>
take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
toArray<T>(t: AsyncIterable<T>): Promise<Array<T>>
async function *flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T> {
for await (const a of t) {
yield *a
}
}
async function *take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
for await (const v of t) {
if (n-- <= 0) return
yield v
}
if (n > 0) throw Error("buffer underflow")
}
async function *skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
for await (const v of t) {
if (n-- > 0) continue
yield v
}
if (n > 0) throw Error("buffer underflow")
}
async function toArray<T>(t: AsyncIterable<T>): Promise<Array<T>> {
const r = []
for await (const v of t) r.push(v)
return r
}
shift
Using these stream primitives, we can write shift
in a comfortable and safe way -
shift(stream: AsyncIterable<Uint8Array>, count: number): Promise<[Uint8Array, AsyncIterable<number>]>
async function shift(stream: AsyncIterable<Uint8Array>, count: number) {
return [
new Uint8Array(await toArray(take(flatten(stream), count))),
skip(flatten(stream), count)
] as const
}
Let's create a mock buffer
and test it -
const buffer: AsyncIterable<Uint8Array> = {
async *[Symbol.asyncIterator]() {
for (const v of [[0,1,2],[3,4],[5,6,7,8],[9]]) {
yield new Uint8Array(v)
await new Promise(r => setTimeout(r, 100))
}
}
}
async function main() {
const [first, rest] = await shift(buffer, 4)
console.log({
first: Array.from(first),
rest: await toArray(rest)
})
}
main().then(console.log, console.error)
{
first: [0, 1, 2, 3],
rest: [4, 5, 6, 7, 8, 9]
}
demo
Run and verify the result on the typescript playground