javascriptnode.jstypescriptnode-streams

Stream Pipeline stops midway


So, I wanted to execute a process in a stream. At first, it works well with just few data/iterations but if there's more data involved, the pipeline just stops midway.

Here's what my code looks like.

Basically, for this example, I wanted to process around 200 data.

but only 30 data are processed successfully. It stops after that. No error message and I don't have any clue why.

import { pipeline } from "stream/promises"

async function* func() {
  for (let i = 0; i < 200; i++) {
    console.log(`func ${i}`)
    yield i
  }
}
async function* func2(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
;    console.log(`func2 ${i}`)
    yield i
  }
}
async function* func3(iterator: AsyncIterable<number>) {
  for await (let i of iterator) {
    console.log(`func3 ${i}`)
    yield i
  }
}

async function main() {
  await pipeline(
    func,
    func2,
    func3
  )
}

main();

Expected output:

Iteration completes 200

Actual output:

Iteration stops at 30


Solution

  • After constant debugging, I found the solution and it's actually pretty simple lol.

    The solution was to remove 'yield' on the last function (which this is the PipelineDestination).

    I believe what happened was that all previous data just got accumulated in the stream since nothing consumed them.

    import { pipeline } from "stream/promises"
    
    async function* func() {
      for (let i = 0; i < 200; i++) {
        console.log(`func ${i}`)
        yield i
      }
    }
    async function* func2(iterator: AsyncIterable<number>) {
      for await (let i of iterator) {
    ;    console.log(`func2 ${i}`)
        yield i
      }
    }
    async function* func3(iterator: AsyncIterable<number>) {
      for await (let i of iterator) {
        console.log(`func3 ${i}`)
        // yield i <---- I removed this!
      }
    }
    
    async function main() {
      await pipeline(
        func,
        func2,
        func3
      )
    }
    
    main();