Let's assume that you want to store events from some stream in a database, and the client library for that database is asynchronous.
E.g. there is a method writeEvent(event: MyEvent): Future[Boolean] that you have to call inside onNext of your observer when the next event is emitted. Is there a good way to do this other than blocking on the Future?
The only way I can currently see to implement this is to create a custom Scheduler that lets me return the thread to the pool until the async code inside onNext is complete.
You do not want to block like that inside your onNext subscriber callback; that would defeat the purpose of Rx. The write can be chained into the stream in a more idiomatic way.
I don't work with Futures much, but I wonder if Observable.from(Future) would work:
    someStream
      .flatMap(evt => Observable.from(writeEvent(evt)))
      .subscribe(
        next => ...,
        err => ...
      )
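To make the idea concrete, here is a minimal, self-contained sketch of the same chain. It assumes RxScala (the rx.lang.scala package) is on the classpath. The MyEvent case class, the stubbed writeEvent, and the storeAll helper are hypothetical stand-ins, since the real event type and database client are not shown in the question. Observable.from(Future) needs an implicit ExecutionContext in scope, so the global one is imported here.

```scala
import rx.lang.scala.Observable

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object StoreEvents {
  // Hypothetical event type; the real MyEvent is not shown in the question.
  final case class MyEvent(id: Int)

  // Stub standing in for the asynchronous database client (assumption):
  // it always reports a successful write.
  def writeEvent(event: MyEvent): Future[Boolean] = Future(true)

  // Turns each event into an Observable backed by its write Future and merges
  // the results. Nothing blocks inside the stream itself; toBlocking is used
  // only at the very edge so this demo can collect the acknowledgements.
  def storeAll(events: Observable[MyEvent]): List[Boolean] =
    events
      .flatMap(evt => Observable.from(writeEvent(evt)))
      .toBlocking
      .toList

  def main(args: Array[String]): Unit = {
    val acks = storeAll(Observable.just(MyEvent(1), MyEvent(2), MyEvent(3)))
    println(s"acknowledged writes: ${acks.size}")
  }
}
```

In real code you would probably subscribe as in the snippet above rather than call toBlocking. Note also that flatMap merges results as the Futures complete, so acknowledgements may arrive in a different order than the events were emitted.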