
I have a microservice in which I use Kotlin coroutines to run a bunch of DB queries asynchronously, and I want to monitor the execution time of each of those queries for potential performance optimization.

My implementation looks like this:

val requestSemaphore = Semaphore(5)
val baseProductsNos = productRepository.getAllBaseProductsNos()
runBlocking {
    baseProductsNos
        .chunked(500)
        .map { batchOfProductNos ->
            launch {
                requestSemaphore.withPermit {
                    val rawBaseProducts = async {
                        productRepository.getBaseProducts(batchOfProductNos)
                    }

                    val mediaCall = async {
                        productRepository.getProductMedia(batchOfProductNos)
                    }

                    val productDimensions = async {
                        productRepository.getProductDimensions(batchOfProductNos)
                    }

                    val allowedCountries = async {
                        productRepository.getProductNosInCountries(batchOfProductNos, countriesList)
                    }

                    val variants = async {
                        productRepository.getProductVariants(batchOfProductNos)
                    }

                    // here I wait for all the results and then do some processing on them
                }
            }
        }.joinAll()
}

As you can see, I use a Semaphore to limit the number of parallel jobs. All the repository methods are suspend functions, and those are the ones whose execution time I want to measure. Here is an example of an implementation inside ProductRepository:

  suspend fun getBaseProducts(baseProductNos: List<String>): List<RawBaseProduct> =
    withContext(Dispatchers.IO) {
      namedParameterJdbcTemplateMercator.query(
        getSqlFromResource(baseProductSql),
        getNamedParametersForBaseProductNos(baseProductNos),
        RawBaseProductRowMapper()
      )
    }
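The batching code above relies on `Semaphore.withPermit` to cap how many batches are in flight: every `launch` coroutine is created up front, but only as many as there are permits get past `withPermit` at the same time. Note that the cap applies to batches, not to queries, so with 5 permits and 5 `async` calls per batch there can be up to 25 queries hitting the DB at once. A minimal sketch that checks the cap, assuming a hypothetical `fakeQuery` that simulates a blocking query with `Thread.sleep` (the helper names are illustrative, not from the question):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a repository method: blocking work moved to Dispatchers.IO.
suspend fun fakeQuery(batch: List<Int>): Int = withContext(Dispatchers.IO) {
    Thread.sleep(20) // simulates a blocking JDBC call
    batch.size
}

// Runs the batches with the same launch / withPermit / async shape as the question
// and returns the highest number of batches that were inside withPermit at once.
fun maxConcurrentBatches(items: List<Int>, batchSize: Int, permits: Int): Int {
    val semaphore = Semaphore(permits)
    val inFlight = AtomicInteger(0)
    val maxSeen = AtomicInteger(0)
    runBlocking {
        items.chunked(batchSize).map { batch ->
            launch {
                semaphore.withPermit {
                    // Record how many batches currently hold a permit.
                    maxSeen.accumulateAndGet(inFlight.incrementAndGet()) { a, b -> maxOf(a, b) }
                    // Two queries per batch run concurrently, like the async calls in the question.
                    val first = async { fakeQuery(batch) }
                    val second = async { fakeQuery(batch) }
                    check(first.await() + second.await() == 2 * batch.size)
                    inFlight.decrementAndGet()
                }
            }
        }.joinAll()
    }
    return maxSeen.get()
}
```

With 5,000 items, a batch size of 500 and 5 permits, `maxConcurrentBatches((1..5000).toList(), 500, 5)` should never report more than 5.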

To measure the execution time of these repository calls, I tried this:

      val rawBaseProductsCall = async {
        val startTime = System.currentTimeMillis()

        val result = productRepository.getBaseProducts(productNos)

        val endTime = System.currentTimeMillis()
        logger.info("${TemporaryLog("call-duration", "rawBaseProductsCall", endTime - startTime)}")

        result
      }

But this measurement consistently gives averages that differ from those of the sequential implementation (without coroutines). The only explanation I can come up with is that the measurement includes suspension time, and I am only interested in the time the queries themselves take to execute, excluding any time the coroutine spends suspended.

I don't know if what I am trying to do is possible in Kotlin, but it looks like Python supports this, so I would appreciate any help doing something similar in Kotlin.

UPDATE:

In my case I am using a regular Java library to query the DB, so my DB queries are just regular blocking calls, which means the way I am measuring time right now is correct.

The assumption I made in the question would have been valid if I was using some implementation of R2DBC for querying my DB.

younes elmorabit
  • What does your code do _except_ suspend here? Just create the RPC and parse the result? – Louis Wasserman Oct 31 '21 at 20:13
  • @LouisWasserman Yes, that is what it does: I wait for the results from the queries and then do some processing on them. I am not sure if I answered your question? – younes elmorabit Oct 31 '21 at 20:29
  • So are you just trying to measure the time of creating the RPC and parsing the result? The time the server spends answering your RPC? – Louis Wasserman Oct 31 '21 at 20:35
  • yes exactly that is what I am trying to do. – younes elmorabit Oct 31 '21 at 20:36
  • But... you are measuring on the client (your microservice acts as client for the DB coroutines). Unless you have more insight on the server, what else would you go for? – Queeg Oct 31 '21 at 21:01
  • Sorry @HiranChaudhuri, I don't get what you mean, but my problem is that the coroutine may not resume execution right after the query response is received, because the threads are occupied or the maximum number of parallel jobs has been reached. – younes elmorabit Oct 31 '21 at 21:26

2 Answers


I don't know if this is intentional or a mistake, but you use only a single thread here. You start tens or even hundreds of coroutines, and they all compete for this single thread. If you perform any CPU-intensive processing in "here I wait for all the results and then do some processing on them", then while that processing runs, all other coroutines have to wait to be resumed from withContext(Dispatchers.IO). If you want to utilize multiple threads, replace runBlocking {} with runBlocking(Dispatchers.Default) {}.

Still, this doesn't fix the problem; it only lessens its impact. As for a proper fix: if you need to measure only the time spent in IO, then... measure the time in the IO only. Just move your measurements inside withContext(Dispatchers.IO), and I think the results will be closer to what you expect. Otherwise, it is like measuring the size of a room by standing outside the building.
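To illustrate both points, here is a small simulation comparing a measurement taken around the `withContext(Dispatchers.IO)` call with one taken inside it. It is a sketch under assumptions: `blockingQuery` and `compareMeasurements` are hypothetical names, `Thread.sleep(100)` stands in for the JDBC call, and a sibling coroutine that busy-loops for 300 ms stands in for CPU-heavy result processing on the single `runBlocking` thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.system.measureTimeMillis

// Hypothetical blocking "query": 100 ms of blocking work, like a JDBC call.
fun blockingQuery(): Int {
    Thread.sleep(100)
    return 42
}

// Returns (outsideMs, insideMs): wall-clock time around the suspend call
// versus time measured inside the IO block only.
fun compareMeasurements(): Pair<Long, Long> = runBlocking { // single-threaded event loop
    var insideMs = 0L
    // Sibling coroutine that hogs the runBlocking thread with CPU work and never suspends.
    launch {
        val end = System.currentTimeMillis() + 300
        while (System.currentTimeMillis() < end) {
            // busy "processing" of earlier results
        }
    }
    val outsideMs = measureTimeMillis {
        withContext(Dispatchers.IO) {
            // Only the blocking call is timed here, on the IO thread.
            insideMs = measureTimeMillis { blockingQuery() }
        }
        // Resuming here requires the runBlocking thread, which is busy until the loop ends.
    }
    outsideMs to insideMs
}
```

The inner measurement should stay close to 100 ms, while the outer one also includes the time the caller waited for the busy `runBlocking` thread to become free, so it should come out at roughly 300 ms or more.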

broot
  • Thanks for the hints, yes I am aware that the other code will run on the same thread, and for the time measurement, I also tried what you suggested but the results were pretty much the same. – younes elmorabit Oct 31 '21 at 23:03
  • Hmm, so if you put everything inside `withContext(Dispatchers.IO) {}` and it still provided unexpected results then I can't find a good explanation for this. Assuming all functions inside this block (`query()`, `getSqlFromResource()`, etc.) are not suspendable (are they?), coroutines should not really affect execution of this code in any way. This is just a regular blocking code. – broot Oct 31 '21 at 23:18
  • `query()` is from an external java library and it is the actual query so I don't know how Kotlin will handle it, but normally this is the only place where the code could be suspended since the other functions are not suspendable. are you saying that the coroutine won't suspend the code inside `withContext(Dispatchers.IO) {}`? – younes elmorabit Nov 01 '21 at 00:24
  • Only suspend functions can suspend. If `query()` is from some Java library, then I guess it is not really a suspend function, but a regular blocking one. If we put regular, blocking code inside a suspend function, then this block of code will/should execute normally, as if there were no coroutines at all. – broot Nov 01 '21 at 01:02
  • If that is the case, then the recorded time is correct, because right now I have almost triple the time recorded with sequential calls. Could the DB be taking that much time because of the concurrency? – younes elmorabit Nov 01 '21 at 08:43
  • I can only guess. I don't know your backend, how it was implemented, etc. If you can't control the backend to add measurements there, then maybe you can at least read its logs to get some insights about the performance. As a last resort, you can try to capture the network traffic, but if you need to calculate the average from multiple requests, then it would be pretty hard to do. – broot Nov 01 '21 at 10:41

You do not want to measure the coroutine startup or suspension time, so you need to measure over a block of code that will not suspend, i.e. your database calls from the Java library.

The standard library provides a few nice functions for this, for example measureTimedValue:

import kotlin.time.measureTimedValue

val (result, duration) = measureTimedValue {
    doWork()
    // e.g. the blocking JDBC query inside withContext(Dispatchers.IO), not the suspend call around it
}
logger.info("operation took $duration")

https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.time/measure-timed-value.html
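Putting the two ideas together, the timing can live inside the `withContext(Dispatchers.IO)` block so that it covers only the blocking call. A minimal sketch, assuming Kotlin 1.9 or newer (where `measureTimedValue` is stable; older versions need `@OptIn(ExperimentalTime::class)`); `timedIo` and its `log` parameter are hypothetical names, not part of any library:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlin.time.measureTimedValue

// Hypothetical helper: runs a blocking call on Dispatchers.IO and times only that call.
// The clock starts after the switch to the IO thread and stops before the caller resumes,
// so time spent waiting to be dispatched back to the caller's thread is excluded.
suspend fun <T> timedIo(
    label: String,
    log: (String) -> Unit = { println(it) },
    block: () -> T,
): T = withContext(Dispatchers.IO) {
    val (value, duration) = measureTimedValue(block)
    log("$label took $duration")
    value
}
```

A repository method could then be written as `suspend fun getBaseProducts(nos: List<String>) = timedIo("getBaseProducts") { /* blocking query */ }`, passing something like `{ logger.info(it) }` as `log`.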

Nikky