c++11rxcpp

RXCPP: Timeout on blocking function


Consider a blocking function: this_thread::sleep_for(milliseconds(3000));

I'm trying to get the following behavior:

Trigger Blocking Function               

|---------------------------------------------X

I want to trigger the blocking function and if it takes too long (more than two seconds), it should timeout.

I've done the following:

my_connection = observable<>::create<int>([](subscriber<int> s) {
    auto s2 = observable<>::just(1, observe_on_new_thread()) |
    subscribe<int>([&](auto x) {
        this_thread::sleep_for(milliseconds(3000));
        s.on_next(1);
    });
}) |
timeout(seconds(2), observe_on_new_thread());

I can't get this to work. For starters, I think s can't on_next from a different thread.

So my question is, what is the correct reactive way of doing this? How can I wrap a blocking function in rxcpp and add a timeout to it?

Subsequently, I want to get an RX stream that behaves like this:

Trigger                Cleanup

|------------------------X
                           (Delay)   Trigger           Cleanup
                                       |-----------------X

Solution

  • Great question! The above is pretty close.

    Here is an example of how to adapt blocking operations to rxcpp. It does libcurl polling to make http requests.

    The following should do what you intended.

    auto sharedThreads = observe_on_event_loop();
    
    auto my_connection = observable<>::create<int>([](subscriber<int> s) {
            this_thread::sleep_for(milliseconds(3000));
            s.on_next(1);
            s.on_completed();
        }) |
        subscribe_on(observe_on_new_thread()) |
        //start_with(0) | // workaround bug in timeout
        timeout(seconds(2), sharedThreads);
        //skip(1); // workaround bug in timeout
    
    my_connection.as_blocking().subscribe(
        [](int){}, 
        [](exception_ptr ep){cout << "timed out" << endl;}
    );
    

    EDIT: added workaround for bug in timeout. At the moment, it does not schedule the first timeout until the first value arrives.

    EDIT-2: timeout bug has been fixed, the workaround is not needed anymore.