Consider a blocking function: this_thread::sleep_for(milliseconds(3000));
I'm trying to get the following behavior:
Trigger Blocking Function
|---------------------------------------------X
I want to trigger the blocking function and, if it takes too long (more than two seconds), have it time out.
I've done the following:
my_connection = observable<>::create<int>([](subscriber<int> s) {
    auto s2 = observable<>::just(1, observe_on_new_thread()) |
        subscribe<int>([&](auto x) {
            this_thread::sleep_for(milliseconds(3000));
            s.on_next(1);
        });
}) |
    timeout(seconds(2), observe_on_new_thread());
I can't get this to work. For starters, I think s.on_next can't be called from a different thread.
So my question is, what is the correct reactive way of doing this? How can I wrap a blocking function in rxcpp and add a timeout to it?
Subsequently, I want to get an RX stream that behaves like this:
Trigger Cleanup
|------------------------X
(Delay) Trigger Cleanup
|-----------------X
Great question! The above is pretty close.
Here is an example of how to adapt blocking operations to rxcpp. It uses libcurl polling to make HTTP requests.
The following should do what you intended.
auto sharedThreads = observe_on_event_loop();
auto my_connection = observable<>::create<int>([](subscriber<int> s) {
        this_thread::sleep_for(milliseconds(3000));
        s.on_next(1);
        s.on_completed();
    }) |
    subscribe_on(observe_on_new_thread()) |
    //start_with(0) | // workaround bug in timeout
    timeout(seconds(2), sharedThreads);
    //skip(1); // workaround bug in timeout
my_connection.as_blocking().subscribe(
    [](int){},
    [](exception_ptr ep){cout << "timed out" << endl;}
);
subscribe_on will run the create on a dedicated thread, and thus create is allowed to block that thread.
timeout will run the timer on a different thread, which can be shared with others, and transfer all the on_next/on_error/on_completed calls to that same thread.
as_blocking will make sure that subscribe does not return until it has completed. This is only used to prevent main() from exiting, most often in test or example programs.
EDIT: added workaround for bug in timeout. At the moment, it does not schedule the first timeout until the first value arrives.
EDIT-2: timeout bug has been fixed, the workaround is not needed anymore.
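For readers who want to see the underlying thread arrangement without rxcpp, here is a minimal sketch using only the C++17 standard library. It is an analogue of the idea, not what rxcpp does internally. The blocking work runs on its own thread, and the caller waits for the result with a deadline. The helper name run_with_timeout is hypothetical and is not part of rxcpp.

```cpp
#include <chrono>
#include <exception>
#include <functional>
#include <future>
#include <optional>
#include <thread>
#include <utility>

// Hypothetical helper (not an rxcpp API): runs `work` on a dedicated thread
// and waits at most `limit` for its result. Returns std::nullopt on timeout.
std::optional<int> run_with_timeout(std::function<int()> work,
                                    std::chrono::milliseconds limit) {
    std::promise<int> promise;
    std::future<int> result = promise.get_future();

    // Dedicated worker thread: the blocking call only blocks this thread.
    std::thread worker([p = std::move(promise), w = std::move(work)]() mutable {
        try {
            p.set_value(w());
        } catch (...) {
            p.set_exception(std::current_exception());
        }
    });
    // Detach so a timed-out caller does not have to wait for the worker.
    worker.detach();

    if (result.wait_for(limit) == std::future_status::ready) {
        return result.get();  // rethrows if the work threw
    }
    return std::nullopt;  // the worker keeps running; its result is discarded
}
```

Calling run_with_timeout with a lambda that sleeps for 3000 ms and a limit of 2000 ms would return std::nullopt after about two seconds. Note that, unlike an rxcpp subscription, this sketch has no way to cancel the work. The detached thread simply runs to completion in the background.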