I want to use zip operator on a list / array of RxCpp observables, but RxCpp operators only accept variadic arguments, and the size of my list is dynamic (known at runtime).
Is there a way to use these operators on a list of observables? Thanks in advance.
Here is a naive attempt which iteratively zips each observable in a vector and accumulates the result:
template <typename T>
rxcpp::observable<std::shared_ptr<std::vector<T>>> zip_v(const std::vector<rxcpp::observable<T>>& observables) {
// map the first observable to convert values to a single-element vector
auto it = observables.cbegin();
rxcpp::observable<std::shared_ptr<std::vector<T>>> acc = it->map([](T t) {
return std::make_shared<std::vector<T>>(std::initializer_list<T>{ t });
});
// fold each observable into accumulator by zipping, and pushing back value
while (++it != observables.cend()) {
acc = acc.zip([](std::shared_ptr<std::vector<T>> acc, T next) {
acc->push_back(next);
return acc;
}, *it);
}
return acc;
}
Usage example:
std::vector<rxcpp::observable<int>> sources{
rxcpp::observable<>::range(1, 5),
rxcpp::observable<>::range(6, 10),
rxcpp::observable<>::range(11, 15)
};
zip_v(sources).
take(3).
subscribe(
[](auto vs){
for (auto& v : *vs) {
printf("%d ", v);
}
printf("\n");
},
[](){ printf("complete\n"); });
Output:
1 6 11
2 7 12
3 8 13
OnCompleted