So, I wanted to execute a process in a stream. At first it works fine with just a few items/iterations, but once more data is involved, the pipeline just stops midway.
Here's what my code looks like.
Basically, for this example, I want to process around 200 items, but only about 30 are processed successfully. It stops after that, with no error message, and I don't have any clue why.
import { pipeline } from "stream/promises"

async function* func() {
    for (let i = 0; i < 200; i++) {
        console.log(`func ${i}`)
        yield i
    }
}

async function* func2(iterator: AsyncIterable<number>) {
    for await (const i of iterator) {
        console.log(`func2 ${i}`)
        yield i
    }
}

async function* func3(iterator: AsyncIterable<number>) {
    for await (const i of iterator) {
        console.log(`func3 ${i}`)
        yield i
    }
}

async function main() {
    await pipeline(
        func,
        func2,
        func3
    )
}

main();
Expected output:
The iteration completes all 200 items.
Actual output:
The iteration stops after about 30 items.
After a lot of debugging, I found the solution, and it's actually pretty simple lol.
The fix was to remove the 'yield' in the last function (which is the PipelineDestination).
I believe what happened is that the yielded values just accumulated in the stream, since nothing consumed the destination's output. Once the internal buffers fill up (the default highWaterMark in object mode is 16), backpressure stops the earlier stages from being pulled, which is likely why it stalls at around 30 items.
import { pipeline } from "stream/promises"

async function* func() {
    for (let i = 0; i < 200; i++) {
        console.log(`func ${i}`)
        yield i
    }
}

async function* func2(iterator: AsyncIterable<number>) {
    for await (const i of iterator) {
        console.log(`func2 ${i}`)
        yield i
    }
}

async function* func3(iterator: AsyncIterable<number>) {
    for await (const i of iterator) {
        console.log(`func3 ${i}`)
        // yield i   <---- I removed this!
    }
}

async function main() {
    await pipeline(
        func,
        func2,
        func3
    )
}

main();
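
Another option, if you actually want func3 to stay a transform that keeps yielding its values, is to add one more stage at the very end whose only job is to consume the iterator. This is just a sketch on top of the code above (it reuses func, func2, and func3 from the first snippet, with the yield still in place), not part of the original fix; pipeline also accepts a plain async function as the destination:

import { pipeline } from "stream/promises"

// func, func2 and func3 as in the first snippet, with func3 still yielding

async function main() {
    await pipeline(
        func,
        func2,
        func3, // still yields here
        // final destination: consumes every value and yields nothing,
        // so backpressure never stalls the earlier stages
        async function (iterator: AsyncIterable<number>) {
            for await (const i of iterator) {
                console.log(`sink ${i}`)
            }
        }
    )
}

main();

Either way, the key point is the same: the last stage of the pipeline has to consume the data rather than yield it.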