I am trying to use Kotlin coroutines to get more asynchronous task processing into our service, which consumes from a message queue, does some DB-related tasks, and publishes to another message queue. I want to better understand how scoping works when we need to pass certain values along for sequential work versus parallel work.
I copied some test code from another GitHub repository and am running it to understand some fundamentals and concepts.
I used this example to try running tasks in parallel:
```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.IO
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest

fun `testing async scope`() = runTest {
    val list = listOf(Pair("name1", 1000L), Pair("name2", 1000L), Pair("name3", 1000L), Pair("name4", 1000L),
        Pair("name5", 1000L), Pair("name6", 1000L))
    val startTime = System.currentTimeMillis()
    val parentJob = CoroutineScope(IO).launch {
        launch { nameRun() }
        launch { employerRun() }
    }
    parentJob.invokeOnCompletion {
        println("Finishing parentJob in Thread: ${Thread.currentThread().name} in ${System.currentTimeMillis() - startTime}")
    }
}

private suspend fun nameRun() {
    val listEmployer = listOf(Pair("name1", 1000L), Pair("name2", 1000L), Pair("name3", 1000L), Pair("name4", 1000L),
        Pair("name5", 1000L), Pair("name6", 1000L))
    coroutineScope {
        val startTime = System.currentTimeMillis()
        println("Launching nameRun in Thread: ${Thread.currentThread().name} at $startTime")
        launch {
            val results = listEmployer.map {
                async {
                    process(it)
                }
            }.awaitAll() // List<Boolean>
            println(results)
        }
        println("Finishing nameRun in Thread: ${Thread.currentThread().name} in ${System.currentTimeMillis() - startTime}")
    }
}

private suspend fun employerRun() {
    val listEmployer = listOf(Pair("employer1", 1000L), Pair("employer2", 1000L), Pair("employer3", 1000L), Pair("employer4", 1000L),
        Pair("employer5", 1000L), Pair("employer6", 1000L))
    coroutineScope {
        val startTime = System.currentTimeMillis()
        println("Launching employerRun in Thread: ${Thread.currentThread().name} at $startTime")
        launch {
            val results = listEmployer.map {
                async {
                    process(it)
                }
            }.awaitAll() // List<Boolean>
            println(results)
        }
        println("Finishing employerRun in Thread: ${Thread.currentThread().name} in ${System.currentTimeMillis() - startTime}")
    }
}

private suspend fun process(pair: Pair<String, Long>): Boolean {
    delay(pair.second * 10)
    println(pair.first)
    return true
}
```
But the output contains only these lines:
```
Launching employerRun in Thread: DefaultDispatcher-worker-1 @coroutine#4 at 1648941528859
Launching nameRun in Thread: DefaultDispatcher-worker-3 @coroutine#3 at 1648941528859
Finishing employerRun in Thread: DefaultDispatcher-worker-1 @coroutine#4 in 3
Finishing nameRun in Thread: DefaultDispatcher-worker-3 @coroutine#3 in 3
```
Is there some limitation of a scope running on `IO` that stops the logs from printing once execution enters a suspend function? And even so, why is there no print showing that `parentJob` completed?
I am running it with `-ea`.
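One factor that may explain the "in 3" timings above: inside `coroutineScope { }`, a statement placed after `launch { }` runs as soon as the child has been *started*, not when it finishes, while `coroutineScope` itself only returns after all of its children complete. A minimal sketch, assuming only kotlinx-coroutines-core; `nameRunSketch` and the `events` list are hypothetical stand-ins for `nameRun` and its `println` calls.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical: records the order of events instead of printing them.
val events = mutableListOf<String>()

// Hypothetical stand-in for nameRun(), same shape as in the question.
suspend fun nameRunSketch() {
    coroutineScope {
        launch {
            delay(100)                  // stands in for process(it)
            events.add("child done")
        }
        // Reached right after the child is launched, before it finishes,
        // just like the "Finishing nameRun" println in the question.
        events.add("finishing line reached")
    } // coroutineScope suspends here until the launched child completes
}

fun main() = runBlocking {
    nameRunSketch()
    events.add("coroutineScope returned")
    println(events)
}
```

Running `main()` on a single `runBlocking` thread records `[finishing line reached, child done, coroutineScope returned]`, so a "Finishing" log written inside the block says nothing about whether the children are done.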
What I want is to run basically everything here in parallel:
```
Scope Parent (Parallel) -> nameRun Scope (Parallel)
                               -> name1 compute
                               -> ...
                               -> name6 compute
                        -> employerRun Scope (Parallel)
                               -> employer1 compute
                               -> ...
                               -> employer6 compute
```
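The tree above could be sketched with structured concurrency as follows, assuming kotlinx-coroutines-core and a plain `runBlocking` entry point instead of `runTest`. `runGroup` and `runParent` are hypothetical helpers, and `process` only simulates I/O with `delay`. Each group fans out one `async` per item inside `coroutineScope`, and the parent runs both groups concurrently on `Dispatchers.IO`, suspending until everything has finished.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Simulated I/O standing in for the question's process(); the delay is an assumption.
suspend fun process(pair: Pair<String, Long>): Boolean {
    delay(pair.second)
    println(pair.first)
    return true
}

// Hypothetical: one "scope" from the diagram. Fans out one async child per item
// and returns only after every child has finished.
suspend fun runGroup(items: List<Pair<String, Long>>): List<Boolean> = coroutineScope {
    items.map { item -> async { process(item) } }.awaitAll()
}

// Hypothetical: the parent scope. Runs both groups at the same time on
// Dispatchers.IO and suspends the caller until both have completed.
suspend fun runParent(
    names: List<Pair<String, Long>>,
    employers: List<Pair<String, Long>>,
): Pair<List<Boolean>, List<Boolean>> = withContext(Dispatchers.IO) {
    val nameResults = async { runGroup(names) }
    val employerResults = async { runGroup(employers) }
    nameResults.await() to employerResults.await()
}

fun main() = runBlocking {
    val names = (1..6).map { "name$it" to 1000L }
    val employers = (1..6).map { "employer$it" to 1000L }
    val start = System.currentTimeMillis()
    val (n, e) = runParent(names, employers)
    // All twelve delays overlap, so this should be close to 1000 ms, not 12000 ms.
    println("names=$n employers=$e in ${System.currentTimeMillis() - start} ms")
}
```

Because every level uses `coroutineScope`/`withContext` rather than a fresh `CoroutineScope(...)`, the caller cannot return before the innermost `process` calls have completed.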
It seems we only get logs from the parent scope down to `nameRun` or `employerRun`, but not from the deeper scopes inside `employerRun`/`nameRun`.
Could it also be related to the difference between using `coroutineScope { ... }` and using `CoroutineScope(IO).launch { ... }`?
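A minimal sketch of that difference, assuming kotlinx-coroutines-core; `detached` and `structured` are hypothetical names. `CoroutineScope(Dispatchers.IO).launch` creates a job that is not a child of the caller, so nothing waits for it unless you `join()` it; `coroutineScope` suspends the caller until all its children are done. In the original test, `parentJob` is launched in such a detached scope, so `runTest` plausibly has no reason to wait for it, and with each `process` call delaying 10 s the test may finish before those children (or `invokeOnCompletion`) ever print.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical: fire-and-forget. The new scope is NOT a child of the caller,
// so the caller returns immediately and must join() the Job to wait for it.
fun detached(done: AtomicBoolean): Job = CoroutineScope(Dispatchers.IO).launch {
    delay(500)
    done.set(true)
}

// Hypothetical: structured. coroutineScope suspends the caller until
// every child launched inside it has completed.
suspend fun structured(done: AtomicBoolean) {
    coroutineScope {
        launch {
            delay(500)
            done.set(true)
        }
    }
}

fun main() = runBlocking {
    val a = AtomicBoolean(false)
    val job = detached(a)
    println("detached, right after call: ${a.get()}")   // expected false: nobody waited
    job.join()                                          // explicit wait
    println("detached, after join: ${a.get()}")

    val b = AtomicBoolean(false)
    structured(b)
    println("structured, right after call: ${b.get()}") // true: the call itself waited
}
```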
EDIT:
This seems to be related to the question "Difference between CoroutineScope and coroutineScope in Kotlin", and the impact comes from this part:
```kotlin
val parentJob = CoroutineScope(IO).launch {
    launch { nameRun() }
    launch { employerRun() }
}
```
Given the way it is structured, I would want to use
```kotlin
coroutineScope {
    launch { nameRun() }
    launch { employerRun() }
}
```
instead. The scoping inside `nameRun()` and `employerRun()` should use `coroutineScope` as well, so that each function actually waits for the required results instead of returning while its children are still suspended. However, it seems that this approach is not actually running in parallel.
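One thing worth separating here is concurrency from thread-level parallelism. As the kotlinx-coroutines-test docs describe it, `runTest` runs its body on a single-threaded test dispatcher by default and skips `delay` with virtual time, so children started there are concurrent but all share one thread. The sketch below uses `runBlocking` (also single-threaded) instead of `runTest` to avoid the test dependency; `busyWork` and `fanOut` are hypothetical, and `Thread.sleep` stands in for a blocking call such as a synchronous DB query.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical blocking task. Unlike delay(), Thread.sleep() holds its
// thread for the whole duration, so it shows which threads did the work.
fun busyWork(ms: Long): Thread {
    Thread.sleep(ms)
    return Thread.currentThread()
}

// Hypothetical: six concurrent children on whatever dispatcher the caller uses.
suspend fun fanOut(): List<Thread> = coroutineScope {
    (1..6).map { async { busyWork(100) } }.awaitAll()
}

fun main() = runBlocking {
    // Inherits runBlocking's single thread: the children take turns (about 600 ms).
    val single = fanOut()
    // Moved onto Dispatchers.IO: the children can occupy several threads at once.
    val pooled = withContext(Dispatchers.IO) { fanOut() }
    println("single: ${single.toSet().size} thread(s), IO: ${pooled.toSet().size} thread(s)")
}
```

Thread objects are compared rather than names because, with `-ea`, coroutine debug mode appends `@coroutine#N` to thread names. Wrapping the fan-out in `withContext(Dispatchers.IO)` is one way to get real multi-threaded execution inside a test, at the cost of real (not virtual) delays inside that block.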