1

I am trying to use Kotlin to utilize more asynchronous task processing for our service that is consuming from a message queue and doing some DB relate tasks and publishing to another message queue. I am trying to understand better how scoping works when we need to pass certain values to do sequential work vs parallel work.

I copied some test scripts from another Github and for testing and trying to run it to understand some fundamentals and concepts.

I used this example and trying to do parallel tasks

    fun `testing async scope`() = runTest {
        val list = listOf(Pair("name1", 1000L), Pair("name2", 1000L), Pair("name3", 1000L), Pair("name4", 1000L),
                Pair("name5", 1000L), Pair("name6", 1000L))

        val startTime = System.currentTimeMillis()
        val parentJob = CoroutineScope(IO).launch {
            launch { nameRun()}
            launch { employerRun() }
        }
        parentJob.invokeOnCompletion {
            println("Finishing parentJob in Thread: ${Thread.currentThread().name} in ${System.currentTimeMillis() - startTime}")
        }
    }

    private suspend fun nameRun() {
        val listEmployer = listOf(Pair("name1", 1000L), Pair("name2", 1000L), Pair("name3", 1000L), Pair("name4", 1000L),
                Pair("name5", 1000L), Pair("name6", 1000L))
        coroutineScope {
            val startTime = System.currentTimeMillis()
            println("Launching nameRun in Thread: ${Thread.currentThread().name} at $startTime")
            launch {
                val results = listEmployer.map {
                    async {
                        process(it)
                    }
                }.awaitAll() // List<B>
                println(results)
            }
            println("Finishing nameRun in Thread: ${Thread.currentThread().name} in ${System.currentTimeMillis() - startTime}")
        }
    }

    private suspend fun employerRun() {
        val listEmployer = listOf(Pair("employer1", 1000L), Pair("employer2", 1000L), Pair("employer3", 1000L), Pair("employer4", 1000L),
                Pair("employer5", 1000L), Pair("employer6", 1000L))
        coroutineScope {
            val startTime = System.currentTimeMillis()
            println("Launching employerRun in Thread: ${Thread.currentThread().name} at $startTime")
            launch {
                val results =  listEmployer.map {
                    async {
                        process(it)
                    }
                }.awaitAll() // List<B>
                println(results)
            }
            println("Finishing employerRun in Thread: ${Thread.currentThread().name} in ${System.currentTimeMillis() - startTime}")
        }
    }
    
    private suspend fun process(pair : Pair<String, Long>): Boolean {
        delay(pair.second * 10)
        println(pair.first)
        return true
    }

But printing only prints these lines.

Launching employerRun in Thread: DefaultDispatcher-worker-1 @coroutine#4 at 1648941528859
Launching nameRun in Thread: DefaultDispatcher-worker-3 @coroutine#3 at 1648941528859
Finishing employerRun in Thread: DefaultDispatcher-worker-1 @coroutine#4 in 3
Finishing nameRun in Thread: DefaultDispatcher-worker-3 @coroutine#3 in 3

Is there some limitation of the scope within the IO that does not print all the logs the moment it enters a suspend function? Even then, there is new print to show the job was completed?

I am running it with -ea

What I want to do is able to basically parallel everything here

Scope Parent (Parallel) -> nameRun Scope (Parallel)
                            -> name1 compute
                            -> ...
                            -> name6 compute       
                        -> employerRun Scope (Parallel)
                            -> employer1 compute
                            -> ...
                            -> employer6 compute       

Seems like we can only get logs from Parent scope -> nameRun or employerRun, but we cant get logs from the deeper scope from employerRun/nameRun

Could it be the difference also in using coroutineScope {...} vs using CoroutineScope(IO).launch { ... }

EDIT:

Seems to be related to this Difference between CoroutineScope and coroutineScope in Kotlin , and impact is due to this

val parentJob = CoroutineScope(IO).launch {
    launch { nameRun()}
    launch { employerRun() }
}

with the way it is structure, I would want to use

coroutineScope {
    launch { nameRun() }
    launch { employerRun() }
}

instead. And the scoping inside of nameRun() and employerRun() should utilize coroutineScope as well to ensure we get the required result instead of suspending it. But it seems like however, this method is actually not running in parallel

Bao Thai
  • 533
  • 1
  • 6
  • 25

1 Answers1

0

Finally figured it out -

I forgot to use join() if I want to explicitly declare that I want a result from the launch { }. Otherwise it just fires and forgets. And I have to also set the async {} for my code to do perform asynchronous work or else launch is sequential.

CoroutineScope(IO).launch {
    val results =  list.map {
        async(IO) {
            val subStart = System.currentTimeMillis()
            println("Launching async in Thread: ${Thread.currentThread().name} at $subStart with element ${it.first}")
            process(it)
            val end = System.currentTimeMillis()
            println("Finishing async in Thread: ${Thread.currentThread().name} at ${end} in ${end - subStart} with element ${it.first}")
        }
    }
    val results2 =  listEmployer.map {
        async(IO) {
            val subStart = System.currentTimeMillis()
            println("Launching async in Thread: ${Thread.currentThread().name} at $subStart with element ${it.first}")
            process(it)
            val end = System.currentTimeMillis()
            println("Finishing async in Thread: ${Thread.currentThread().name} at ${end} in ${end - subStart} with element ${it.first}")
        }
    }
    println(results.awaitAll() + results2.awaitAll())
}.join()

Result:

Starting parentJob in Thread: main @coroutine#1 in 1648952427825
Launching async in Thread: DefaultDispatcher-worker-3 @coroutine#3 at 1648952427838 with element name1
Launching async in Thread: DefaultDispatcher-worker-4 @coroutine#4 at 1648952427841 with element name2
Launching async in Thread: DefaultDispatcher-worker-6 @coroutine#5 at 1648952427842 with element name3
Launching async in Thread: DefaultDispatcher-worker-2 @coroutine#6 at 1648952427842 with element name4
Launching async in Thread: DefaultDispatcher-worker-3 @coroutine#7 at 1648952427843 with element name5
Launching async in Thread: DefaultDispatcher-worker-6 @coroutine#8 at 1648952427843 with element name6
Launching async in Thread: DefaultDispatcher-worker-8 @coroutine#9 at 1648952427843 with element employer1
Launching async in Thread: DefaultDispatcher-worker-12 @coroutine#12 at 1648952427844 with element employer4
Launching async in Thread: DefaultDispatcher-worker-11 @coroutine#11 at 1648952427844 with element employer3
Launching async in Thread: DefaultDispatcher-worker-6 @coroutine#10 at 1648952427843 with element employer2
Launching async in Thread: DefaultDispatcher-worker-4 @coroutine#13 at 1648952427844 with element employer5
Launching async in Thread: DefaultDispatcher-worker-8 @coroutine#14 at 1648952427844 with element employer6
name1
name2
name3
name4
name5
name6
employer1
Finishing async in Thread: DefaultDispatcher-worker-1 @coroutine#3 at 1648952428846 in 1008 with element name1
Finishing async in Thread: DefaultDispatcher-worker-4 @coroutine#6 at 1648952428846 in 1004 with element name4
Finishing async in Thread: DefaultDispatcher-worker-8 @coroutine#4 at 1648952428846 in 1005 with element name2
Finishing async in Thread: DefaultDispatcher-worker-10 @coroutine#9 at 1648952428846 in 1003 with element employer1
Finishing async in Thread: DefaultDispatcher-worker-12 @coroutine#8 at 1648952428846 in 1003 with element name6
Finishing async in Thread: DefaultDispatcher-worker-11 @coroutine#7 at 1648952428846 in 1003 with element name5
Finishing async in Thread: DefaultDispatcher-worker-6 @coroutine#5 at 1648952428846 in 1004 with element name3
employer4
employer3
Finishing async in Thread: DefaultDispatcher-worker-9 @coroutine#12 at 1648952428847 in 1003 with element employer4
employer6
employer5
employer2
Finishing async in Thread: DefaultDispatcher-worker-5 @coroutine#13 at 1648952428848 in 1004 with element employer5
Finishing async in Thread: DefaultDispatcher-worker-1 @coroutine#14 at 1648952428847 in 1003 with element employer6
Finishing async in Thread: DefaultDispatcher-worker-3 @coroutine#11 at 1648952428847 in 1003 with element employer3
Finishing async in Thread: DefaultDispatcher-worker-7 @coroutine#10 at 1648952428848 in 1005 with element employer2
[kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit]
Bao Thai
  • 533
  • 1
  • 6
  • 25