rx-java partitioning reactive-streams

RxJava: how to extract blocks from a source observable?


I have [1,2,3,2,4,5,0,2,1,3,4,5,2,2,1,4,5]

What I want as output is

[[1,2,3,2,4,5],[1,3,4,5],[1,4,5]]

This means I want to extract blocks from the input sequence with some start marker, here 1, and some end marker, here 5. Elements in between these blocks are ignored.

Is there a readable combination of rx-java operators for achieving this?


Solution

  • Yes. You need the buffer overload that takes an opening and a closing indicator. Since the elements themselves act as the indicators, you have to publish the original sequence so that it can be shared, and feed filtered views of it into both indicators.

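    Observable
    .fromArray(1, 2, 3, 2, 4, 5, 0, 2, 1, 3, 4, 5, 2, 2, 1, 4, 5)
    // publish(selector) shares a single subscription to the source among all uses of v
    .publish(v ->
        v.buffer(
            v.filter(w -> w == 1),      // opening indicator: each 1 starts a new buffer
            u -> v.filter(w -> w == 5)  // closing indicator: the next 5 ends that buffer
        )
    )
    .subscribe(System.out::println);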
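To check the result of the reactive pipeline against the output the question asks for, it can help to have a non-reactive reference. The following is a minimal plain-Java sketch of the same block extraction, not an RxJava technique; the class `BlockExtractor` and the method `extractBlocks` are hypothetical names introduced only for illustration. It assumes that a start marker seen inside an open block is treated as ordinary content and that a block without an end marker is dropped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BlockExtractor {

    // Hypothetical helper (not part of RxJava): collects every run that begins
    // at `start` and finishes at the next `end`; elements outside a run are ignored.
    static List<List<Integer>> extractBlocks(List<Integer> input, int start, int end) {
        List<List<Integer>> blocks = new ArrayList<>();
        List<Integer> current = null; // null means "not inside a block"
        for (int value : input) {
            if (current == null && value == start) {
                current = new ArrayList<>(); // start marker opens a block
            }
            if (current != null) {
                current.add(value);
                if (value == end) {          // end marker closes the block
                    blocks.add(current);
                    current = null;
                }
            }
        }
        return blocks; // assumption: a block that never reaches `end` is discarded
    }

    public static void main(String[] args) {
        List<Integer> input =
            Arrays.asList(1, 2, 3, 2, 4, 5, 0, 2, 1, 3, 4, 5, 2, 2, 1, 4, 5);
        // prints [[1, 2, 3, 2, 4, 5], [1, 3, 4, 5], [1, 4, 5]]
        System.out.println(extractBlocks(input, 1, 5));
    }
}
```

The two edge cases (a repeated start marker, a missing end marker) are choices made by this sketch and need not match the buffer operator: with buffer, every opening signal starts its own buffer, so a second 1 before the closing 5 would be expected to produce overlapping blocks.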