In my application, I have to interact (read-only) with multiple MySQL DBs one by one. For each DB, I need a certain number of connections. Interactions with a DB do not occur in a single stretch: I query the DB, spend some time processing the results, query the DB again, process the results again, and so on.
Each of these interactions requires multiple connections (I fire multiple queries concurrently), hence I need a ConnectionPool that is created when I start interacting with the DB and lives until I'm done with all queries to that DB (including the intervals when I'm not querying, only processing the results).
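The requirement above is a pool that exists only while you work with one DB. It can be sketched as a small loan-style helper. `withPool` is a hypothetical helper, not part of ScalikeJdbc. It relies only on `ConnectionPool.add` and `ConnectionPool.close`. An in-memory H2 database stands in for MySQL, so the sketch runs without a server.

```scala
import scalikejdbc._

object PoolLifecycleSketch {
  // Hypothetical helper: the named pool exists only while `body` runs,
  // including any time `body` spends processing results between queries
  def withPool[A](poolName: String, url: String, user: String, password: String, poolSize: Int)(body: => A): A = {
    ConnectionPool.add(
      name = poolName,
      url = url,
      user = user,
      password = password,
      settings = ConnectionPoolSettings(initialSize = poolSize, maxSize = poolSize)
    )
    try body
    finally ConnectionPool.close(poolName) // shuts the pool down and removes it
  }

  def main(args: Array[String]): Unit = {
    // Assumption: in-memory H2 (driver on the classpath) as a stand-in for MySQL
    val result = withPool("demoPool", "jdbc:h2:mem:lifecycle;DB_CLOSE_DELAY=-1", "sa", "", poolSize = 4) {
      NamedDB("demoPool").readOnly { implicit session =>
        sql"SELECT 1".map(_.int(1)).single.apply()
      }
    }
    println(result)                                   // Some(1)
    println(ConnectionPool.isInitialized("demoPool")) // false (the pool was closed)
  }
}
```

The whole per-DB workload, queries and processing alike, would go inside the `withPool { ... }` block. The pool is closed even if that block throws.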
I'm able to successfully create a ConnectionPool with the desired number of connections and obtain the implicit session, as shown below:
def createConnectionPool(poolSize: Int): DBSession = {
  implicit val session: AutoSession.type = AutoSession
  ConnectionPool.singleton(
    url = "myUrl",
    user = "myUser",
    password = "***",
    settings = ConnectionPoolSettings(initialSize = poolSize)
  )
  session
}
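One setting in the snippet above is worth double-checking. In ScalikeJdbc's `ConnectionPoolSettings`, `initialSize` only controls how many connections are opened when the pool is created. `maxSize` caps how many connections can be borrowed at the same time, and it defaults to 8. With `poolSize = 32` and only `initialSize` set, queries beyond the eighth would have to wait for a free connection, and may time out depending on `connectionTimeoutMillis`. The following is a minimal sketch, assuming you want all `poolSize` connections usable concurrently. `settingsFor` is a hypothetical helper.

```scala
import scalikejdbc.ConnectionPoolSettings

object PoolSettingsSketch {
  // initialSize: connections opened up front when the pool is created
  // maxSize: upper bound on connections borrowed at the same time (default 8)
  def settingsFor(poolSize: Int): ConnectionPoolSettings =
    ConnectionPoolSettings(initialSize = poolSize, maxSize = poolSize)

  def main(args: Array[String]): Unit = {
    val defaults = ConnectionPoolSettings()
    val sized = settingsFor(32)
    println(s"default maxSize = ${defaults.maxSize}")
    println(s"sized: initialSize = ${sized.initialSize}, maxSize = ${sized.maxSize}")
  }
}
```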
I then pass this implicit session through all the methods that need to interact with the DB. That way, I'm able to fire poolSize queries concurrently using this session. Fair enough.
def methodThatCallsAnotherMethod(implicit session: DBSession): Unit = {
  ...
  methodThatInteractsWithDb
  ...
}

def methodThatInteractsWithDb(implicit session: DBSession): Unit = {
  ...
  getResultsParallely(poolSize = 32, fetchSize = 2000000)
  ...
}
def getResultsParallely(poolSize: Int, fetchSize: Int)(implicit session: DBSession): Seq[ResultClass] = {
  import java.util.concurrent.Executors
  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration._

  // one thread per concurrent query
  val executor = Executors.newFixedThreadPool(poolSize)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
  try {
    // fire poolSize queries concurrently, each fetching its own LIMIT/OFFSET slice
    val resultsSequenceFuture: Seq[Future[ResultClass]] =
      (0 until poolSize).map { i =>
        val limit: Long = fetchSize
        val offset: Long = i.toLong * fetchSize
        Future(methodThatMakesSingleQuery(limit, offset))
      }
    val resultsFutureSequence: Future[Seq[ResultClass]] = Future.sequence(resultsSequenceFuture)
    Await.result(resultsFutureSequence, 2.minutes)
  } finally executor.shutdown() // release the threads once all queries are done
}
There are 2 problems with this technique:
1. Passing the implicit session through all methods like this (see the methods above) isn't feasible.
2. I need multiple ConnectionPools, one for each DB, whereas ConnectionPool.singleton sets up only a single default pool.
From what I could make out of ScalikeJdbc's docs, I came up with the following way of doing it, which doesn't require me to pass the implicit session everywhere:
def createConnectionPool(poolName: String, poolSize: Int): Unit = {
  ConnectionPool.add(
    name = poolName,
    url = "myUrl",
    user = "myUser",
    password = "***",
    settings = ConnectionPoolSettings(initialSize = poolSize)
  )
}

def methodThatInteractsWithDb(poolName: String): Unit = {
  ...
  DB(ConnectionPool.get(poolName).borrow()).readOnly { implicit session: DBSession =>
    // interact with DB
    ...
  }
  ...
}
Although this works, I'm no longer able to parallelize the DB interaction. This behaviour is expected, since I'm using the borrow() method, which gets a single connection from the pool. This, in turn, makes me wonder why the AutoSession approach worked earlier: why was I able to fire multiple queries simultaneously using a single implicit session? And if that worked, why doesn't this? I can't find any examples of how to obtain a DBSession from a ConnectionPool that supports multiple connections.
To sum up, I have 2 problems and 2 solutions, one for each problem, but I need a single (common) solution that solves both.
ScalikeJdbc's limited docs aren't much help, and blogs/articles on ScalikeJdbc are practically non-existent.
Please suggest the correct way or a workaround.
Framework versions
Scala 2.11.11
"org.scalikejdbc" %% "scalikejdbc" % "3.2.0"
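The question asks why AutoSession allowed concurrent queries. The reason is that an AutoSession is not bound to one connection. Each operation run through it borrows a connection from the default (singleton) pool for that one operation and returns it afterwards. Futures sharing one AutoSession therefore each end up on their own connection. By contrast, `DB(ConnectionPool.get(poolName).borrow())` wraps exactly one `java.sql.Connection`, and statements sent over a single connection effectively run one after another.

For named pools, ScalikeJdbc provides `NamedAutoSession`, which behaves like AutoSession but borrows from the named pool. The sketch below assumes `NamedAutoSession` works this way in your version. It also assumes an in-memory H2 database standing in for MySQL and `SELECT 1` standing in for the real queries. `runConcurrently` is a hypothetical helper.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scalikejdbc._

object NamedAutoSessionSketch {
  // Hypothetical helper: fires n queries concurrently against a named pool
  def runConcurrently(poolName: String, n: Int): Seq[Option[Int]] = {
    val executor = Executors.newFixedThreadPool(n)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    // Not tied to one connection: each statement borrows its own connection
    // from the named pool and returns it when the statement completes
    implicit val session: DBSession = NamedAutoSession(poolName)
    try {
      val futures: Seq[Future[Option[Int]]] =
        (0 until n).map(_ => Future(sql"SELECT 1".map(_.int(1)).single.apply()))
      Await.result(Future.sequence(futures), 1.minute)
    } finally executor.shutdown()
  }

  def main(args: Array[String]): Unit = {
    // Assumption: in-memory H2 as a stand-in for one of the MySQL DBs
    ConnectionPool.add("autoDemo", "jdbc:h2:mem:autoDemo;DB_CLOSE_DELAY=-1", "sa", "",
      ConnectionPoolSettings(initialSize = 4, maxSize = 4))
    try println(runConcurrently("autoDemo", 4))
    finally ConnectionPool.close("autoDemo")
  }
}
```

Because the session only names the pool, it can be passed around (or created where needed) without pinning a connection for the whole interaction with a DB.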
Thanks to @Dennis Hunziker, I was able to figure out the correct way to release connections borrowed from ScalikeJdbc's ConnectionPool. It can be done as follows:
import scalikejdbc.{ConnectionPool, using}
import java.sql.Connection

using(ConnectionPool.get("poolName").borrow()) { (connection: Connection) =>
  // use connection (only once) here
}
// connection automatically returned to pool
With this, I'm now able to parallelize the interaction with the pool: each concurrent query borrows, uses and returns its own connection.
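The `using(borrow())` pattern above can be parallelized by giving each Future its own borrow. Each Future borrows a connection, uses it once, and `using` returns it to the pool when the block exits. This is a minimal sketch. The H2 URL, the plain-JDBC `SELECT 1`, and the helper names `queryOnce` and `runConcurrently` are assumptions for illustration.

```scala
import java.sql.Connection
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scalikejdbc._

object BorrowPerQuerySketch {
  // Hypothetical query: borrows one connection, uses it once, returns it
  def queryOnce(poolName: String): Option[Int] =
    using(ConnectionPool.get(poolName).borrow()) { (connection: Connection) =>
      using(connection.createStatement()) { statement =>
        using(statement.executeQuery("SELECT 1")) { rs =>
          if (rs.next()) Some(rs.getInt(1)) else None
        }
      }
    } // connection returned to the pool here

  // Each Future calls queryOnce, so each one holds its own connection
  def runConcurrently(poolName: String, n: Int): Seq[Option[Int]] = {
    val executor = Executors.newFixedThreadPool(n)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    try {
      val futures: Seq[Future[Option[Int]]] = (0 until n).map(_ => Future(queryOnce(poolName)))
      Await.result(Future.sequence(futures), 1.minute)
    } finally executor.shutdown()
  }

  def main(args: Array[String]): Unit = {
    // Assumption: in-memory H2 as a stand-in for MySQL
    ConnectionPool.add("borrowDemo", "jdbc:h2:mem:borrowDemo;DB_CLOSE_DELAY=-1", "sa", "",
      ConnectionPoolSettings(initialSize = 4, maxSize = 4))
    try println(runConcurrently("borrowDemo", 4))
    finally ConnectionPool.close("borrowDemo")
  }
}
```

`NamedDB(poolName).readOnly { implicit session => ... }` performs the same borrow-use-return cycle in one call, so the same Future-per-query structure also works with ScalikeJdbc's SQL interpolation.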
To solve my problem of managing several ConnectionPools and using connections across several classes, I ended up writing a ConnectionPoolManager, the complete code for which can be found here. By offloading these tasks to a singleton object that I could use anywhere across my project, I was able to clear a lot of clutter and eliminate the need to pass the implicit session across chains of methods.
EDIT-1
While I've already linked the complete code for ConnectionPoolManager, here's a quick hint of how you can go about it.
The following method of ConnectionPoolManager lets you borrow connections from ConnectionPools:
def getDB(dbName: String, poolNameOpt: Option[String] = None): DB = {
  // create a pool for db (only) if it doesn't exist
  addPool(dbName, poolNameOpt)
  val poolName: String = poolNameOpt.getOrElse(dbName)
  DB(ConnectionPool.get(poolName).borrow())
}
From then on, you can use the above method anywhere in your code to borrow connections from pools and make your queries:
def makeQuery(dbName: String, poolNameOpt: Option[String]) = {
  ConnectionPoolManager.getDB(dbName, poolNameOpt).localTx { implicit session: DBSession =>
    // perform ScalikeJdbc SQL query here
  }
}
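The `addPool` method that `getDB` calls isn't shown above. A hypothetical minimal manager might look like the sketch below; it is not the author's linked code. `urlFor` is an assumed lookup from DB name to JDBC URL. Here it returns an in-memory H2 URL with H2's default `sa` user so the sketch runs standalone. `ConnectionPool.isInitialized` guards against adding the same pool twice, and `synchronized` keeps two threads from racing to create it. For purely read-only queries, `readOnly` can be used in place of `localTx`.

```scala
import scalikejdbc._

// Hypothetical sketch; the author's ConnectionPoolManager may differ
object ConnectionPoolManagerSketch {
  // Assumption: replace with your own DB-name-to-URL lookup (e.g. MySQL URLs from config)
  private def urlFor(dbName: String): String = s"jdbc:h2:mem:$dbName;DB_CLOSE_DELAY=-1"

  // Create a pool for the DB only if one with that name doesn't exist yet
  def addPool(dbName: String, poolNameOpt: Option[String] = None, poolSize: Int = 8): Unit =
    synchronized {
      val poolName: String = poolNameOpt.getOrElse(dbName)
      if (!ConnectionPool.isInitialized(poolName)) {
        ConnectionPool.add(poolName, urlFor(dbName), "sa", "",
          ConnectionPoolSettings(initialSize = poolSize, maxSize = poolSize))
      }
    }

  // Same shape as getDB above: each call borrows a fresh connection
  def getDB(dbName: String, poolNameOpt: Option[String] = None): DB = {
    addPool(dbName, poolNameOpt)
    val poolName: String = poolNameOpt.getOrElse(dbName)
    DB(ConnectionPool.get(poolName).borrow())
  }

  // Close a DB's pool once all work against that DB is done
  def closePool(poolName: String): Unit = synchronized {
    ConnectionPool.close(poolName)
  }

  def main(args: Array[String]): Unit = {
    val one = getDB("demoDb").readOnly { implicit session =>
      sql"SELECT 1".map(_.int(1)).single.apply()
    }
    println(one)
    closePool("demoDb")
  }
}
```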