I have started using mongo scala driver in our scala akka-http project and it has been great help especially the case class support in v2.0.0 is very nifty. I'm trying to wrap my head around how to use the mongo scala driver with non-default execution context using observeOn.
Due to the nature of our java library dependencies, I'm using blocking calls to get the results from MongoDB as shown here, Helpers. I've modified the results and headResult functions from MongoDB Helpers slightly using observeOn as below but I'm noticing some strange race condition that I don't know how to resolve.
trait ImplicitObservable[C] {
val observable: Observable[C]
val converter: (C) => String
def headResult()(implicit executionContext: ExecutionContext) = Await.result(observable.observeOn(executionContext).head(), Duration(10, TimeUnit.SECONDS))
def results()(implicit executionContext: ExecutionContext): List[C] = Await.result(observable.observeOn(executionContext).toFuture(), Duration(20, TimeUnit.SECONDS)).toList
}
The results function doesn't return all the records that I'm expecting and the behavior is different every time except when I use the akka PinnedDispatcher which allows only one thread. Since its a blocking operation I would like to use a non default akka dispatcher so that it will not block my HTTP requests. I really appreciate if someone can help me with this.
# looking up dispatcher
val executionContext = system.dispatchers.lookup("mongo-dispatcher")
# application.conf
mongo-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 100
}
throughput = 1
}
My sample database client code:
def getPersonById(personId: String): PersonCaseClass = {
personCollection.find[PersonCaseClass](equal("_id", "person_12345")).first().headResult()
}