Tags: java, reactive-programming, publish-subscribe, project-reactor, reactive-streams

How to emit cumulative sum only from a reactive stream?


I have a use case where the stream should emit only when the cumulative "sum" equals or exceeds a given value, n. Take the example of six integers with n = 5.

+---+------+---------+
| i | Emit |   Sum   |
+---+------+---------+
| 1 |    - | 1       |
| 2 |    - | 3       |
| 3 |    5 | 1       |
| 4 |    5 | 0       |
| 5 |    5 | 0       |
| 2 |    2 | 0 (end) |
+---+------+---------+

As you can see, nothing is emitted unless the sum equals or exceeds 5, except for the last element, which is emitted anyway.

Once an item is emitted, the sum is reduced by the emitted value (n). In reality, I'm reading data from a network call and then sending it to a downstream consumer that only accepts fixed-size chunks, except for the last one, of course (when upstream has completed).
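To make the rule concrete, here is a minimal sketch of the accumulate-and-emit logic in plain Java, without Reactor. The class and method names (`CumulativeEmitter`, `onNext`, `onComplete`, `emitAll`) are hypothetical and only illustrate the table above. One assumption goes beyond the table: a single large input may trigger several emissions of n in a row.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: accumulates values and releases amounts of exactly n. */
final class CumulativeEmitter {
    private final int n;  // size of every emission except possibly the last
    private int sum = 0;  // running sum that has not been emitted yet

    CumulativeEmitter(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.n = n;
    }

    /** Adds one upstream value and returns zero or more emissions of exactly n. */
    List<Integer> onNext(int value) {
        sum += value;
        List<Integer> out = new ArrayList<>();
        // Assumption: keep emitting while a full n is available.
        while (sum >= n) {
            out.add(n);
            sum -= n;
        }
        return out;
    }

    /** Called once upstream completes; flushes the remainder, if there is one. */
    List<Integer> onComplete() {
        List<Integer> out = new ArrayList<>();
        if (sum > 0) {
            out.add(sum);
            sum = 0;
        }
        return out;
    }

    /** Convenience: runs a whole finite input through the emitter. */
    static List<Integer> emitAll(List<Integer> input, int n) {
        CumulativeEmitter emitter = new CumulativeEmitter(n);
        List<Integer> out = new ArrayList<>();
        for (int value : input) {
            out.addAll(emitter.onNext(value));
        }
        out.addAll(emitter.onComplete());
        return out;
    }

    public static void main(String[] args) {
        // The six integers from the table, with n = 5.
        System.out.println(emitAll(List.of(1, 2, 3, 4, 5, 2), 5)); // prints [5, 5, 5, 2]
    }
}
```

The state is deliberately kept in one small object so that it could be driven from a stream operator; how best to wire it into a Flux (and how to flush the remainder on completion) is left open here.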

I'm using Project Reactor's Flux as the Publisher, and I couldn't find any method on it that allows me to do what is shown above. scan comes closest, but it also emits intermediate elements that need to be filtered out.


Solution

  • In reality, I'm reading data from a network call, and subsequently sending them to a downstream consumer who only accepts fixed size chunks, except for the last one, of course (upstream completed).

    It occurred to me that trying to split the response Flux myself is probably a little late and quite difficult; instead, I could use something like Netty's FixedLengthFrameDecoder, which does exactly what I'm looking for.

    That led me to reactor-netty source code, and after extensive digging, I found exactly what I needed.

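    import reactor.netty.http.client.HttpClient

    object FixedLengthResponseFrameClient {
        // Fetches the body at `url` as a list of chunks of at most `maxChunkSize` bytes.
        fun get(url: String, maxChunkSize: Int): List<ByteArray> {
            return HttpClient.create()
                // Caps the size of each decoded HTTP content chunk.
                .httpResponseDecoder { it.maxChunkSize(maxChunkSize) }
                .get()
                .uri(url)
                .responseContent()
                .asByteArray()
                .collectList()
                .block()!!
        }
    }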
    

    The crucial part is httpResponseDecoder { it.maxChunkSize(maxChunkSize) }; a unit test confirms that it works:

    @Test
    fun testHonorsMaxChunkSize() {
        val maxChunkSize = 4096
        val chunks = FixedLengthResponseFrameClient.get(
            "http://doesnotexist.nowhere/binary", maxChunkSize
        )

        assertThat(chunks.subList(0, chunks.size - 1))
            .allMatch { it.size == maxChunkSize }
        assertThat(chunks.last().size).isLessThanOrEqualTo(maxChunkSize)
    }
    

    WebClient can be configured with a custom HttpClient (configured with httpResponseDecoder) as shown below:

    WebClient
      .builder()
      .clientConnector(ReactorClientHttpConnector(httpClient))
      .build()
      .get()
      .uri("uri")
      .exchange()
      .flatMapMany { it.body(BodyExtractors.toDataBuffers()) }
      ...
    

    The size of these buffers would be whatever is set via HttpClient.httpResponseDecoder (8192 bytes, i.e. 8 KB, by default).
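As its name suggests, maxChunkSize reads as an upper bound rather than an exact size, so some buffers might come out smaller depending on how the bytes arrive. If the downstream consumer strictly requires fixed-size chunks, a re-chunking step could act as a safety net. Below is a minimal sketch in plain Java, not part of the original solution; `FixedSizeRechunker`, `accept`, and `finish` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: regroups arbitrarily sized byte arrays into fixed-size chunks. */
final class FixedSizeRechunker {
    private final byte[] buffer; // bytes received but not yet emitted
    private int filled = 0;      // number of valid bytes currently in buffer

    FixedSizeRechunker(int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        this.buffer = new byte[chunkSize];
    }

    /** Feeds one incoming array and returns every full chunk it completes. */
    List<byte[]> accept(byte[] data) {
        List<byte[]> out = new ArrayList<>();
        int offset = 0;
        while (offset < data.length) {
            // Copy as much as fits into the remaining space of the buffer.
            int toCopy = Math.min(buffer.length - filled, data.length - offset);
            System.arraycopy(data, offset, buffer, filled, toCopy);
            filled += toCopy;
            offset += toCopy;
            if (filled == buffer.length) {
                out.add(buffer.clone()); // emit a copy so the buffer can be reused
                filled = 0;
            }
        }
        return out;
    }

    /** Called when the input is complete; returns the final partial chunk, if any. */
    List<byte[]> finish() {
        List<byte[]> out = new ArrayList<>();
        if (filled > 0) {
            out.add(Arrays.copyOf(buffer, filled));
            filled = 0;
        }
        return out;
    }
}
```

For example, with a chunk size of 4, feeding arrays of 3 and then 6 bytes yields two full chunks from the second call, and `finish()` returns the one leftover byte.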