c++ reactive-programming rxcpp

Grouped aggregate functions using RxCpp?


I'm trying to get the gist of RxCpp, the native C++ implementation of Reactive Extensions from Microsoft, to see whether I could use it in a project, but I'm having trouble wrapping my head around the concepts.

If I had an observable of the following structure:

struct Person
{
    std::string name;
    std::string sex;
    int age;
};

How would I create another observable that emits, for each sex, the count of people, the minimum age, the maximum age, and the average age across all the events?

I've looked at the examples, but I can't see how to compute more than one aggregate at a time.


Solution

  • Use group_by to partition by gender, then use combine_latest to combine the count/min/max/average reducers into the desired output per gender.

    Updated with count, output and additional comments

    This works for me:

    #include <iostream>
    #include <string>
    #include <tuple>
    #include "rxcpp/rx.hpp"
    using namespace rxcpp;
    using namespace rxcpp::sources;
    using namespace rxcpp::subjects;
    using namespace rxcpp::util;
    
    using namespace std;
    
    struct Person
    {
        string name;
        string gender;
        int age;
    };
    
    int main()
    {
        subject<Person> person$;
    
        // group ages by gender
        auto agebygender$ = person$.
            get_observable().
            group_by(
                [](Person& p) { return p.gender;},
                [](Person& p) { return p.age;});
    
        // combine min max and average reductions.
        auto result$ = agebygender$.
            map([](grouped_observable<string, int> gp$){
                // the function passed to combine_latest 
                // will be called once all the source streams
                // (count, min, max, average) have produced a 
                // value. in this case, all the streams are reducers
                // that produce only one value when gp$ completes.
                // thus the function is only called once per gender 
                // with the final value of each stat.
                return gp$.
                    count().
                    combine_latest(
                        [=](int count, int min, int max, double average){
                            return make_tuple(gp$.get_key(), count, min, max, average);
                        },
                        gp$.min(),
                        gp$.max(),
                        gp$.map([](int age) -> double { return age;}).average());
            }).
            // this map() returns observable<observable<tuple<string, int, int, int, double>>>
            // the merge() returns observable<tuple<string, int, int, int, double>>
            // a grouped observable is 'hot': if it is not subscribed to immediately
            // (in this case by merge), the values sent to it are lost.
            merge();
    
        // display results
        result$.
            subscribe(apply_to([](string gender, int count, int min, int max, double avg){
                cout << gender << ": count = " << count << ", range = [" << min << "-" << max << "], avg = " << avg << endl;
            }));
    
        // provide input data
        observable<>::from(
            Person{"Tom", "Male", 32},
            Person{"Tim", "Male", 12},
            Person{"Stel", "Other", 42},
            Person{"Flor", "Female", 24},
            Person{"Fran", "Female", 97}).
            subscribe(person$.get_subscriber());
    
        return 0;
    }
    

    with the resulting output

    Female: count = 2, range = [24-97], avg = 60.5
    Male: count = 2, range = [12-32], avg = 22
    Other: count = 1, range = [42-42], avg = 42