asynchronousrx-javareactive-programmingrx-scala

RxJava/RxScala async code inside Observer.onNext


Let's assume that you want to store events from some stream into the database and a client library to that database is asynchronous.

E.g. there is a method writeEvent(event: MyEvent): Future[Boolean] that you have to call inside onNext of your observer when the next event is emitted. Is there a good way to do this otherwise than blocking on the Future?

The only way that I currently see on how to implement this, is to create some custom Scheduler that allows me to return thread to the pool, until async code inside onNext is complete.


Solution

  • You do not want to block like that inside your onNext subscriber callback, that would defeat Rx. It can be chained together in a more idiomatic way.

    I don't work with Futures much, but I wonder if Observable.from(Future) would work:

    someStream
        .flatMap(evt => Observable.from(writeEvent(evt)))
        .subscribe(
            next => ...,
            err => ...
    )