c++reactivevariadicrxcpp

How to use RxCpp operators on a list of observables?


I want to use zip operator on a list / array of RxCpp observables, but RxCpp operators only accept variadic arguments, and the size of my list is dynamic (known at runtime).

Is there a way to use these operators on a list of observables? Thanks in advance.


Solution

  • Here is a naive attempt which iteratively zips each observable in a vector and accumulates the result:

    template <typename T>
    rxcpp::observable<std::shared_ptr<std::vector<T>>> zip_v(const std::vector<rxcpp::observable<T>>& observables) {
        // map the first observable to convert values to a single-element vector
        auto it = observables.cbegin();
        rxcpp::observable<std::shared_ptr<std::vector<T>>> acc = it->map([](T t) {
            return std::make_shared<std::vector<T>>(std::initializer_list<T>{ t });
        });
    
        // fold each observable into accumulator by zipping, and pushing back value
        while (++it != observables.cend()) {
            acc = acc.zip([](std::shared_ptr<std::vector<T>> acc, T next) { 
                acc->push_back(next);
                return acc;
            }, *it);
        }
        return acc;
    }
    

    Usage example:

        std::vector<rxcpp::observable<int>> sources{
            rxcpp::observable<>::range(1, 5),
            rxcpp::observable<>::range(6, 10),
            rxcpp::observable<>::range(11, 15)
        };
        zip_v(sources).
            take(3).
            subscribe(
                [](auto vs){
                    for (auto& v : *vs) {
                        printf("%d ", v);
                    }
                    printf("\n");
                },
                [](){ printf("complete\n"); });
    

    Output:

        1 6 11
        2 7 12
        3 8 13
        OnCompleted