I have a stream of data that I casually log, but at a certain state of my program I need the data from the stream, not the latest observed up to that point(I can do that), but the newest one after:
val dataStream : Observable[Data] = ...
dataStream
.doOnNext(logger.trace(_))
.subscribe()
// ...
// at some state of program later:
// ...
def awaitOneElement(Observable[Data]): Data = ???
val expecting: Data = awaitOneElement(dataStream.filter(...))
How do I implement def awaitOneElement(Observable[Data]): Data = ???
?
I understand it's probably idiomatically incorrect, but that dirty synchronous waiting is exactly what I need. I'm fine with Observable[Data] => Future[Data]
too, will wrap with Await
on the next step.
My solution:
implicit val scheduler = mx
val pendingPromise: AtomicReference[Option[Promise[A]]] = new AtomicReference(None)
val lastSubject: ConcurrentSubject[A, A] = ConcurrentSubject.publish[A]
o.doOnNext(a => if (pendingPromise.get().nonEmpty) lastSubject.onNext(a))
.subscribe()
lastSubject
.filter(filter)
.doOnNext(a => pendingPromise.getAndSet(None).map(_.success(a)))
.subscribe()
def getNext(duration: Duration): Try[A] = {
val promise = Promise[A]()
pendingPromise.set(Some(promise))
Try(Await.result(promise.future, duration))
}
}
When getNext is called, a Promise
is created and future is returned. When the desired event occures in Observable, the promise is filled and removed.