scalarx-javarx-scalacouchbase-java-api

mapping over scala rx observables when only onComplete is called


I'm using scala observables to get items from couchbase, then i'm using map,flatMap,zip to transform the result. The problem is that if an item does not exist in couchbase then for example the .zip is not called only onComplete. Example:

import rx.lang.scala._

def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
  val values = Observable.from(keyValueIds).flatMap(id => couchbaseBucket.async().get(id))
  values.zip(Observable.from(ids)) // zip is not called if no row in couchbase with id.
  ...
}

So I wanted:

  1. Return a map of k -> v
  2. I let the .zip couple the k to the returned v (I expected v to be something like None if does not exist.
  3. I saw that zip was not called at all if no item exist in db.

I thought of after running the above code, scanning the ids input param and for each one which was not zipped with value to add an id to it's value but it's like adding another flow, I wanted the zip to handle both existing and non existing rows.

How should I handle this? How can I have the .zip handle both existing and non existing rows?


Solution

  • Don't use zip() operator. Instead, just use flatMap() and materialize().take(1). materialize() will turn the onComplete() event into a Notification that you can map to None, while a Notification with a value will be mapped to Some(value).

    def getIdsWithValues(ids: Seq[String]): Map[K, V] = {
      val values = Observable.from(keyValueIds)
       .flatMap(id => couchbaseBucket.async()
         .get(id)
         .materialize()
         .take(1)
         .map( res => if ( res.isOnComplete() )
                         (id, None)
                      else 
                         (id, Some(res.getValue))
      ...
    }