I have code, something like this:

entities.forEach { entity ->
    launch {
        doingSomethingWithDB(entity)
    }
}

// Entity stands in for the real entity type
suspend fun doingSomethingWithDB(entity: Entity) {
    getDBConnectionFromPool()
    // doing something
    returnDBConnectionToPool()
}

When the number of entities exceeds the size of the DB connection pool (I use HikariCP), I get the error "Connection is not available...". Even if I use only a single thread (e.g. -Dkotlinx.coroutines.io.parallelism=1), I still get this error.

Are there best practices for limiting the number of parallel coroutines when dealing with external resources (like a fixed-size DB connection pool)?

mgramin
  • Is the "doing something" part calling other suspend functions, or only blocking functions? If it's calling suspend functions, then those suspend functions likely are not doing the work using whichever dispatcher you limited parallelism on. And please show the code where you limited the parallelism. – Tenfour04 Jan 02 '23 at 16:31
  • @Tenfour04 "doing something" can call other suspend functions and/or blocking functions (read or write to DB). I use JVM flag `-Dkotlinx.coroutines.io.parallelism=1` – mgramin Jan 02 '23 at 17:39
  • That only limits the number of threads in the IO dispatcher, as far as I can tell. But wherever there's a suspension point in any of the coroutines, they can still be swapped in and out of service. They may not be running in parallel on separate threads, but they will still be concurrent. You need to use a different strategy. Also, if you limit parallelism with a JVM argument, you're messing up the functionality of any other coroutines in your project. – Tenfour04 Jan 02 '23 at 18:18
  • Why launch multiple coroutines if you want these actions to run one at a time? I would move the for loop inside a single coroutine. – Tenfour04 Jan 02 '23 at 18:19

1 Answer

Because doingSomethingWithDB() acquires a connection at the start and releases it at the end, and may suspend in between, limiting parallelism is not sufficient here; we need to limit concurrency. The easiest way to do this is with a Semaphore:

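import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// The permit count should not exceed the connection pool size
val semaphore = Semaphore(8)

suspend fun doingSomethingWithDB(entity: Entity) {
    semaphore.withPermit {
        getDBConnectionFromPool()
        // doing something
        returnDBConnectionToPool()
    }
}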

A few words of explanation: because coroutines can suspend and switch threads, limiting the parallelism of the coroutines that invoke doingSomethingWithDB() does not stop the function from being invoked an arbitrary number of times concurrently. Parallelism only determines how many coroutines can be actively executing at a given moment; whenever one of them suspends, another can proceed, even though the suspended one still holds its connection.
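
To make the difference between parallelism and concurrency concrete, here is a minimal sketch that does not touch a real database. It assumes that delay() stands in for the suspending DB work, and that a counter stands in for connections checked out of the pool. The names peakConcurrency and simulatedDbWork are hypothetical and exist only for this illustration. All coroutines run on the single thread of runBlocking, so parallelism is 1 throughout.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger

// Launches `jobs` coroutines on the single runBlocking thread and returns the
// peak number that were "holding a connection" at the same time.
// `limit` is the semaphore size; null means no semaphore is used.
fun peakConcurrency(jobs: Int, limit: Int?): Int = runBlocking {
    val semaphore = limit?.let { Semaphore(it) }
    val active = AtomicInteger(0)
    val peak = AtomicInteger(0)

    suspend fun simulatedDbWork() {
        val now = active.incrementAndGet()   // stands in for getDBConnectionFromPool()
        peak.updateAndGet { maxOf(it, now) }
        delay(50)                            // suspension point while the "connection" is held
        active.decrementAndGet()             // stands in for returnDBConnectionToPool()
    }

    // coroutineScope waits for every launched child before returning
    coroutineScope {
        repeat(jobs) {
            launch {
                if (semaphore != null) semaphore.withPermit { simulatedDbWork() }
                else simulatedDbWork()
            }
        }
    }
    peak.get()
}

fun main() {
    println("without semaphore: ${peakConcurrency(jobs = 20, limit = null)}")
    println("with Semaphore(8): ${peakConcurrency(jobs = 20, limit = 8)}")
}
```

Under these assumptions the first call should report 20 and the second 8: even on one thread, every coroutine reaches the delay() while still holding its simulated connection, unless the semaphore makes the extra ones wait for a permit.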

broot