javascript stream highland.js

Highland: add to a stream before it is completely consumed?


Is there a way to append a final "end" message to a Highland stream that fully describes what we need?

Let's suppose we have the following situation:

const stream = high([1,4,6,7])

With this stream, I want to count each value as it is processed and then say something like:

sink.drain(stream.pipe(4))

Here, 4 is the number of elements in the array. Bear in mind that the stream could contain thousands of objects, and I need to consume the stream in order to count them.

I cannot use array.length because the source could be anything, and its data is processed through the stream. How can I add an end message to the stream that describes what was consumed?


Solution

  • It sounds like you want to build up some state from your values without hindering their consumption. I would suggest looking at solutions that involve h.through.

    You could split a stream with fork or observe and reduce some state from the values:

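    const h = require('highland')

    h([1, 2, 3, 4])
      .through(stream => {
        // observe() must be set up before the source is consumed
        const length = stream.observe()
          .reduce(0, count => count + 1)
          .map(count => ({ length: count }))

        // Emit all source values first, then the single { length } object
        return h([stream, length])
          .sequence()
      })
      .errors(err => console.error(err))
      .each(x => console.log(x))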
    

    You could create some state and return a new stream based on the events of the source stream:

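    const h = require('highland')
    h([1, 2, 3, 4])
      .through(stream => {
        // Counter kept in the closure, updated as values pass through
        let length = 0

        // Re-emit everything from the source on a new stream
        return h(push => stream
          .tap(() => ++length)
          .errors(err => push(err))
          .each(x => push(null, x))
          .done(() => {
            // Source finished: emit the summary, then end the new stream
            push(null, `length: ${length}`)
            push(null, h.nil)
          }))
      })
      .errors(err => console.error(err))
      .each(x => console.log(x))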