In my application, I have to interact (read-only) with multiple MySQL DBs, one at a time. For each DB, I need a certain number of connections. Interactions with a DB do not happen in a single stretch: I query the DB, spend some time processing the results, query the DB again, process the results again, and so on.
Each of these interactions requires multiple connections (I fire multiple queries concurrently), so I need a ConnectionPool that is created when I start interacting with the DB and lives until I'm done with all queries to that DB (including the intervals in between, when I'm only processing results and not querying).
I'm able to create a ConnectionPool with the desired number of connections and obtain the implicit session as shown below:
def createConnectionPool(poolSize: Int): DBSession = {
  implicit val session: AutoSession.type = AutoSession
  ConnectionPool.singleton(
    url = "myUrl",
    user = "myUser",
    password = "***",
    settings = ConnectionPoolSettings(initialSize = poolSize)
  )
  session
}
I then pass this implicit session through all the methods that need to interact with the DB. That way, I'm able to fire poolSize queries concurrently using this session. Fair enough.
def methodThatCallsAnotherMethod(implicit session: DBSession): Unit = {
  ...
  methodThatInteractsWithDb
  ...
}
def methodThatInteractsWithDb(implicit session: DBSession): Unit = {
  ...
  getResultsParallely(poolSize = 32, fetchSize = 2000000)
  ...
}
def getResultsParallely(poolSize: Int, fetchSize: Int)(implicit session: DBSession): Seq[ResultClass] = {
  import java.util.concurrent.Executors
  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration._
  // One thread per concurrent query
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))
  val resultsSequenceFuture: Seq[Future[ResultClass]] = {
    (0 until poolSize).map { i =>
      val limit: Long = fetchSize
      // Widen to Long before multiplying so that large offsets cannot overflow Int
      val offset: Long = i.toLong * fetchSize
      Future(methodThatMakesSingleQuery(limit, offset))
    }
  }
  val resultsFutureSequence: Future[Seq[ResultClass]] = Future.sequence(resultsSequenceFuture)
  // Block until all queries have finished, or fail after 2 minutes
  Await.result(resultsFutureSequence, 2.minutes)
}
There are 2 problems with this technique:
- My application is quite big and has many nested method calls, so passing the implicit session through all methods like this (see above) isn't feasible.
- In addition to the said interactions with different DBs one at a time, I also need a single connection to another (fixed) DB throughout the lifetime of my entire application. This connection would be used to make a small write operation (logging the progress of my interactions with the other DBs) every few minutes. Therefore, I need multiple ConnectionPools, one for each DB.
From what I could make out of ScalikeJdbc's docs, I came up with the following way of doing it, which doesn't require me to pass the implicit session everywhere.
def createConnectionPool(poolName: String, poolSize: Int): Unit = {
  ConnectionPool.add(
    name = poolName,
    url = "myUrl",
    user = "myUser",
    password = "***",
    settings = ConnectionPoolSettings(initialSize = poolSize)
  )
}
def methodThatInteractsWithDb(poolName: String): Unit = {
  ...
  (DB(ConnectionPool.get(poolName).borrow())).readOnly { implicit session: DBSession =>
    // interact with DB
    ...
  }
  ...
}
Although this works, I'm no longer able to parallelize the DB interaction. This behaviour is to be expected, since I'm using the borrow() method, which gets a single connection from the pool. This, in turn, makes me wonder why the AutoSession approach worked earlier: why was I able to fire multiple queries simultaneously using a single implicit session? And if that worked, why doesn't this? I can't find any examples of how to obtain a DBSession from a ConnectionPool that supports multiple connections.
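A minimal sketch of what might work here, under two assumptions: that AutoSession is not tied to one connection but borrows one from the default pool for each query (which would explain the earlier behaviour), and that NamedDB(poolName) borrows from a named pool in the same per-block fashion. If so, opening a NamedDB block inside each Future should give every concurrent query its own connection. The table my_table, the column id, and the names NamedPoolSketch, fetchIds and fetchAllParallel are hypothetical.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import scalikejdbc._

object NamedPoolSketch {

  // Hypothetical single query: each call opens its own read-only block,
  // so it borrows its own connection from the pool registered as poolName.
  def fetchIds(poolName: String, limit: Long, offset: Long): List[Long] =
    NamedDB(poolName).readOnly { implicit session =>
      sql"select id from my_table limit ${limit} offset ${offset}"
        .map(_.long("id"))
        .list
        .apply()
    }

  // Fires poolSize queries concurrently, one per thread, and waits for all of them.
  def fetchAllParallel(poolName: String, poolSize: Int, fetchSize: Int): Seq[List[Long]] = {
    val executor = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(executor)
    try {
      val futures = (0 until poolSize).map { i =>
        Future(fetchIds(poolName, fetchSize.toLong, i.toLong * fetchSize))
      }
      Await.result(Future.sequence(futures), 2.minutes)
    } finally {
      // Release the worker threads once the batch is done
      executor.shutdown()
    }
  }
}
```

With this shape only the pool name (a plain String) has to travel through the nested method calls, not an implicit DBSession. The pool must have been registered under the same key that is passed to NamedDB.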
To sum up, I have 2 problems and 2 solutions: one for each problem. But I need a single (common) solution that solves both problems.
ScalikeJdbc's limited docs aren't offering a lot of help, and blogs / articles on ScalikeJdbc are practically non-existent.
Please suggest the correct way or a workaround.
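For the pool lifetime described at the top (created when work on a DB starts, closed when all queries to that DB are done), one possible arrangement is a loan-style helper around ConnectionPool.add and ConnectionPool.close. This is only a sketch: withPool and PoolLifecycleSketch are hypothetical names, and maxSize is set explicitly on the assumption that its default in this version is lower than the pool sizes I need.

```scala
import scalikejdbc._

object PoolLifecycleSketch {

  // Registers a named pool, runs `body`, and closes the pool afterwards,
  // even if `body` throws.
  def withPool[A](poolName: String, url: String, user: String, password: String, poolSize: Int)(body: => A): A = {
    ConnectionPool.add(
      poolName,
      url,
      user,
      password,
      // Assumption: the default maxSize may be smaller than poolSize, so cap it explicitly
      ConnectionPoolSettings(initialSize = poolSize, maxSize = poolSize)
    )
    try body
    finally ConnectionPool.close(poolName)
  }
}
```

Usage would look roughly like `PoolLifecycleSketch.withPool("sourceDb1", "myUrl", "myUser", "***", poolSize = 32) { methodThatInteractsWithDb("sourceDb1") }`, where "sourceDb1" is a made-up pool name. The fixed logging DB could get its own pool with poolSize = 1, registered once at application start-up and kept open for the whole run.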
Framework versions
Scala 2.11.11
"org.scalikejdbc" %% "scalikejdbc" % "3.2.0"