java rx-java rx-java3

How do I check in an operator whether the current element is the last element?


Context: To process a Flowable<Item>, I first need to inspect the first item and, depending on it, either accumulate all items into a single item (reduce) or apply a simple map to each item without any accumulation (map).

One approach I can think of requires the operator to know that the current element is the last one. Is there an operator that knows whether the current element is the last element? I can't use buffer, because it would always fetch 2 elements even when no accumulation should be done.

AtomicReference<Item> itemRef = new AtomicReference<>(new Item());
Flowable<Item> accumulateOrProcessFlowable = source
    .flatMap(item -> {
        if (item.shouldBeAccumulated()) {
            // Accumulate data into the reference
            itemRef.set(itemRef.get().addData(item.getData()));
            // Return empty to throw away the consumed item
            return Flowable.empty();
        } else {
            item.updateProperty();
            return Flowable.just(item);
        }
    })
    // applyIfLastElement is the hypothetical operator I am looking for; it does not exist
    .applyIfLastElement(item -> {
        if (item.shouldBeAccumulated()) {
            return Flowable.just(itemRef.get());
        }
    });

Solution

  • Here is one way to do it in RxJava 2.x, which is very close to RxJava 3.x. The trick is to combine defer and concatWith. defer is the best way to encapsulate state for a Flowable so that it can be subscribed to many times, because every subscription gets its own fresh state. Maybe.defer also makes last lazy: the accumulated value is read only when the source has completed and concatWith subscribes to last. As a minor performance improvement that you may not care about, I used one-element arrays instead of AtomicReference objects to avoid unnecessary volatile reads and writes.

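    // RxJava 2.x imports (for 3.x use io.reactivex.rxjava3.core.*)
    import io.reactivex.Flowable;
    import io.reactivex.Maybe;

    Flowable<Integer> result = Flowable.defer(() -> {
        // Per-subscription state; one-element arrays act as mutable holders
        boolean[] isFirst = new boolean[] { true };
        Integer[] state = new Integer[1];
        // Evaluated only on subscription, i.e. after the source completes:
        // emits the accumulated value if there is one, otherwise nothing
        Maybe<Integer> last = Maybe.defer(() -> {
            if (state[0] == null) {
                return Maybe.empty();
            } else {
                return Maybe.just(state[0]);
            }
        });
        return source //
                .flatMap(x -> {
                    if (state[0] != null || (isFirst[0] && shouldBeAccumulated(x))) {
                        // accumulate; start from 0 so the first element is included in the sum
                        state[0] = (state[0] == null ? 0 : state[0]) + x;
                        isFirst[0] = false;
                        return Flowable.empty();
                    } else {
                        // no accumulation: pass the element through
                        isFirst[0] = false;
                        return Flowable.just(x);
                    }
                })
                .concatWith(last);
    });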
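
    To see the decision logic on its own, here is a minimal plain-Java sketch that applies the same rule to a List instead of a Flowable. It is not RxJava code and only illustrates the control flow: the first element picks the mode, and the accumulated value (if any) is appended at the end, which is the role concatWith(last) plays above. The class name FirstElementDecides, the method process and the "first element is even" predicate are assumptions made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

final class FirstElementDecides {

    // Hypothetical helper: the first element decides between "sum everything"
    // and "pass every element through unchanged".
    public static List<Integer> process(List<Integer> source, IntPredicate shouldBeAccumulated) {
        List<Integer> out = new ArrayList<>();
        boolean isFirst = true;
        Integer state = null; // becomes non-null once accumulation mode is chosen

        for (int x : source) {
            if (state != null || (isFirst && shouldBeAccumulated.test(x))) {
                // accumulation mode: fold x into the running sum, emit nothing yet
                state = (state == null ? 0 : state) + x;
            } else {
                // pass-through mode: emit the element as is
                out.add(x);
            }
            isFirst = false;
        }

        // Counterpart of concatWith(last): emit the sum only if one was built
        if (state != null) {
            out.add(state);
        }
        return out;
    }

    public static void main(String[] args) {
        IntPredicate firstIsEven = x -> x % 2 == 0; // assumed stand-in for shouldBeAccumulated
        System.out.println(process(List.of(2, 3, 5), firstIsEven)); // prints [10]
        System.out.println(process(List.of(1, 2, 3), firstIsEven)); // prints [1, 2, 3]
    }
}
```

    Note that the predicate is consulted only for the first element; once the mode is chosen, later elements follow it regardless of their own value.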