I'm using Scala observables to get items from Couchbase, then using map, flatMap, and zip to transform the result. The problem is that if an item does not exist in Couchbase, nothing is emitted for it, so (for example) the .zip is never called for that id; only onComplete is. Example:
import rx.lang.scala._
def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
  val values = Observable.from(keyValueIds).flatMap(id => couchbaseBucket.async().get(id))
  values.zip(Observable.from(ids)) // zip is not called if there is no row in Couchbase with this id.
  ...
}
So I wanted None as the value if the row does not exist. I thought of scanning the ids input parameter after running the above code and pairing each id that was not zipped with a value with None, but that is like adding another flow. I wanted the zip to handle both existing and non-existing rows.
How should I handle this? How can I have the .zip handle both existing and non-existing rows?
Don't use the zip() operator. Instead, just use flatMap() and materialize().take(1). materialize() will turn the onComplete() event into a Notification that you can map to None, while a Notification with a value will be mapped to Some(value).
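def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
  val values = Observable.from(keyValueIds)
    .flatMap(id => couchbaseBucket.async()
      .get(id)
      .materialize()   // wrap each event (value or completion) in a Notification
      .take(1)         // keep only the first notification for this id
      .map(res =>
        if (res.isOnCompleted())
          (id, None)                  // completed without a value: no such row
        else
          (id, Some(res.getValue))))  // first event was a value: row exists
  ...
}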
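
To see the behaviour in isolation, here is a minimal sketch that swaps the Couchbase call for a hypothetical in-memory lookup (the `store` map, `lookup`, and `idsWithValues` are made up for illustration and are not part of any SDK). It assumes RxScala is on the classpath and that the lookup already returns an RxScala Observable; an RxJava observable coming from the SDK would need converting first.

import rx.lang.scala.{Notification, Observable}

object MaterializeSketch {
  // Hypothetical stand-in for the bucket: ids "a" and "c" exist, anything else does not.
  private val store = Map("a" -> "alpha", "c" -> "gamma")

  // Emits one value for a known id; completes without emitting for an unknown id.
  def lookup(id: String): Observable[String] = store.get(id) match {
    case Some(v) => Observable.just(v)
    case None    => Observable.empty
  }

  def idsWithValues(ids: Seq[String]): Map[String, Option[String]] =
    Observable.from(ids)
      .flatMap { id =>
        lookup(id)
          .materialize   // events become Notification objects
          .take(1)       // first notification only: a value, an error, or completion
          .map {
            case Notification.OnNext(v)  => id -> Option(v)
            case Notification.OnError(e) => throw e                   // propagate failures
            case _                       => id -> Option.empty[String] // completed with no value
          }
      }
      .toBlocking   // blocking is for the demo only
      .toList
      .toMap

  def main(args: Array[String]): Unit =
    println(idsWithValues(Seq("a", "b", "c")))
}

Because every id now yields exactly one (id, Option) pair inside the flatMap, there is nothing left to line up with zip, and missing rows show up as None instead of silently disappearing.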