Tags: mongodb, observable, mongodb-scala

Why does MongoDB observeOn not use the specified execution context?


In Scala, I have composed two MongoDB observables and called observeOn on the first one, passing a custom execution context. The custom execution context is not propagated to the second observable.

To illustrate this, I have written the following self-contained example:

import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

import org.apache.commons.lang3.concurrent.BasicThreadFactory.Builder
import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.{MongoClient, Observable}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}

object Test extends App {

  val client = MongoClient("mongodb://localhost")

  def insertObs = {
    client.getDatabase("test").getCollection("test").insertOne(Document("test" -> 1))
  }

  val threadPool = new ThreadPoolExecutor(2, 2, 0L,
    TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable],
    new Builder().namingPattern("Custom pool").build())
  val executionContext = ExecutionContext.fromExecutor(threadPool)

  val obs = Observable(List(1, 2, 3))
  val res =
    obs.observeOn(executionContext).map { i =>
      println("OBS " + Thread.currentThread().getName)
      i
    }.flatMap(_ => insertObs.map { i =>
      println("INSERT " + Thread.currentThread().getName)
      i
    })
  Await.result(res.toFuture(), Duration(20, TimeUnit.SECONDS))
}

The output from this is as follows:

OBS Custom pool
INSERT Thread-2
OBS Custom pool
INSERT Thread-2
OBS Custom pool
INSERT Thread-4

I expected every callback to run on a "Custom pool" thread, but the "INSERT" callbacks run on Thread-2 and Thread-4 instead. The observeOn documentation is here:

MongoDB observeOn API

Specifically, it says: "Use a specific execution context for future operations"

Why doesn't the custom thread pool get used for the "insert" observables?


Solution

  • This appears to be the expected behavior rather than a bug. See this ticket: https://jira.mongodb.org/browse/SCALA-437
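
One possible workaround, offered as a sketch rather than as the ticket's recommendation: assuming observeOn only affects callbacks downstream of the observable it is called on, the inner insert observable created inside flatMap would need its own observeOn call. The object name ObserveOnSketch and the plain ThreadFactory (used here instead of the commons-lang3 Builder) are illustrative choices, and the code assumes a MongoDB instance on localhost.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import org.mongodb.scala.bson.collection.immutable.Document
import org.mongodb.scala.{MongoClient, Observable}

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}

// Hypothetical variant of the question's Test object.
object ObserveOnSketch extends App {

  val client = MongoClient("mongodb://localhost")
  val collection = client.getDatabase("test").getCollection("test")

  // Fixed pool whose threads are all named "Custom pool", as in the question.
  val threadPool = Executors.newFixedThreadPool(2, new ThreadFactory {
    override def newThread(r: Runnable): Thread = new Thread(r, "Custom pool")
  })
  val executionContext = ExecutionContext.fromExecutor(threadPool)

  val res = Observable(List(1, 2, 3))
    .observeOn(executionContext)
    .map { i =>
      println("OBS " + Thread.currentThread().getName)
      i
    }
    .flatMap { _ =>
      collection.insertOne(Document("test" -> 1))
        // Assumption: the inner observable is signalled on a driver thread,
        // so it needs its own observeOn before the map callback.
        .observeOn(executionContext)
        .map { result =>
          println("INSERT " + Thread.currentThread().getName)
          result
        }
    }

  try {
    Await.result(res.toFuture(), Duration(20, TimeUnit.SECONDS))
  } finally {
    // Release resources so the JVM can exit.
    client.close()
    threadPool.shutdown()
  }
}
```

If that assumption holds, the "INSERT" lines should report the custom pool as well. Calling observeOn once on the result of flatMap might be an alternative when only the callbacks after the flatMap need to run on the custom pool.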