In Scala, I have composed two MongoDB observables and called observeOn on the first one, passing a custom execution context. However, the custom execution context is not propagated to the second observable.
To illustrate this, I have written the following self-contained example:
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

import org.apache.commons.lang3.concurrent.BasicThreadFactory.Builder
import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.{MongoClient, Observable}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}

object Test extends App {

  val client = MongoClient("mongodb://localhost")

  def insertObs = {
    client.getDatabase("test").getCollection("test").insertOne(Document("test" -> 1))
  }

  // Fixed pool of two threads, all named "Custom pool"
  val threadPool = new ThreadPoolExecutor(2, 2, 0L,
    TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable],
    new Builder().namingPattern("Custom pool").build())

  val executionContext = ExecutionContext.fromExecutor(threadPool)

  val obs = Observable(List(1, 2, 3))

  val res =
    obs.observeOn(executionContext).map {
      i =>
        println("OBS " + Thread.currentThread().getName)
        i
    }.flatMap(_ => insertObs.map {
      i =>
        println("INSERT " + Thread.currentThread().getName)
        i
    })

  Await.result(res.toFuture(), Duration(20, TimeUnit.SECONDS))
}
The output from this is as follows:
OBS Custom pool
INSERT Thread-2
OBS Custom pool
INSERT Thread-2
OBS Custom pool
INSERT Thread-4
I expected every callback, including the "INSERT" ones, to run on "Custom pool" rather than on Thread-2 and Thread-4. As described in the documentation here:
Specifically, it says: "Use a specific execution context for future operations"
Why doesn't the custom thread pool get used for the "insert" observables?
This appears to be working as designed. See this ticket: https://jira.mongodb.org/browse/SCALA-437
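If the behaviour is that observeOn only affects the observable it is called on, and not observables created inside flatMap, then one possible workaround is to call observeOn on the inner observable as well. The following is a minimal sketch under that assumption, not a confirmed fix from the ticket. It needs a MongoDB instance on localhost, the object name ObserveOnInner is made up for illustration, and it uses a plain fixed thread pool in place of the named pool from the question.

```scala
import java.util.concurrent.{Executors, TimeUnit}

import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.{MongoClient, Observable}

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}

// Hypothetical variant of the Test object from the question.
object ObserveOnInner extends App {

  val client = MongoClient("mongodb://localhost")

  // Plain fixed pool; its threads get the JDK default names (pool-N-thread-M).
  val threadPool = Executors.newFixedThreadPool(2)
  val executionContext = ExecutionContext.fromExecutor(threadPool)

  def insertObs =
    client.getDatabase("test").getCollection("test").insertOne(Document("test" -> 1))

  val res = Observable(List(1, 2, 3))
    .observeOn(executionContext)
    .flatMap { _ =>
      insertObs
        // Assumption: the inner observable needs its own observeOn call,
        // because the outer call is not carried over into flatMap.
        .observeOn(executionContext)
        .map { r =>
          println("INSERT " + Thread.currentThread().getName)
          r
        }
    }

  Await.result(res.toFuture(), Duration(20, TimeUnit.SECONDS))

  // Release resources so the JVM can exit.
  threadPool.shutdown()
  client.close()
}
```

With this change the "INSERT" lines should report a thread from the custom pool instead of a driver thread, though I have not verified this against every driver version.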