4

I want to update a list with the result of different threads.

mainFunction(): List<A> {
  var x: List<A> = listOf<A>()
  val job = ArrayList<Job>()
  val ans = mainScope.async {
            var i = 0
            for (j in (0..5)) {
                job.add(
                    launch {
                        val res = async {
                            func1()
                        }
                        x += res.await()
                    }
                )
            }
         job.joinAll()
        }
  ans.await()
  return x
}
fun func1(): List<A> {
 //Perform some operation to get list abc
 var abc: List<A> = listOf<A>()
 delay(1000)
 return abc
}

The list "x" is not getting updated properly. Sometimes, it appends the "res".. sometimes it does not. Is there a thread-safe way to modify lists like this?

New implementation:

mainFunction(): List<A> {
 var x: List<A> = listOf<A>()
 val ans = mainScope.async {
   List(6) {
     async{
      func1()
     }
   }.awaitAll()
 }
 print(ans)
 for (item in ans) {
  x+= item as List<A> // item is of type Kotlin.Unit
 }
}
Saakshi
  • 95
  • 8

2 Answers2

6

Short answer

Here is a simpler version of what you're doing, which avoids the synchronization problems you might be running into:

suspend fun mainFunction(): List<A> {
    return coroutineScope {
        List(6) { async { func1() } }.awaitAll()
    }
}

You can read the long answer if you want to unpack this. I will explain the different things in the original code that are not really idiomatic and could be replaced.

Long answer

There are multiple non-idiomatic things in the code in the question, so I'll try to address each of them.

Indexed for loop for 0-based range

If you just want to repeat an operation several times, it's simpler to just use repeat(6) instead of for (j in 0..5). It's easier to read, especially when you don't need the index variable:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val job = ArrayList<Job>()
    val ans = mainScope.async {
        repeat(6) {
            job.add(
                launch {
                    val res = async {
                        func1()
                    }
                    x += res.await()
                }
            )
        }
        job.joinAll()
    }
    ans.await()
    return x
}

Creating lists with a loop

If what you want is to create a list out of that loop, you can also use List(size) { computeElement() } instead of repeat (or for), which makes use of the List factory function:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val ans = mainScope.async {
        val jobs = List(6) {
            launch {
                val res = async {
                    func1()
                }
                x += res.await()
            }
        }
        jobs.joinAll()
    }
    ans.await()
    return x
}

Extra async

There is no need to wrap your launches here with an extra async, you can just use your scope on the launches directly:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val jobs = List(6) {
        mainScope.launch {
            val res = async {
                func1()
            }
            x += res.await()
        }
    }
    jobs.joinAll()
    return x
}

async + immediate await

Using async { someFun() } and then immediately await-ing this Deferred result is equivalent to just calling someFun() directly (unless you're using a different scope or context, which you aren't doing here for the inner most logic).

So you can replace the inner-most part:

val res = async {
    func1()
}
x += res.await()

By just x += func1(), which gives:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val jobs = List(6) {
        mainScope.launch {
            x += func1()
        }
    }
    jobs.joinAll()
    return x
}

launch vs async

If you want results, it is usually more practical to use async instead of launch. When you use launch, you have to store the result somewhere manually (which makes you run into synchronization problems like you have now). With async, you get a Deferred<T> value which you can then await(), and when you have a list of Deferred there is no synchronization issues when you await them all.

So the general idea of the previous code is bad practice and might bite you because it requires manual synchronization. You can replace it by:

suspend fun mainFunction(): List<A> {
    val deferredValues = List(6) {
        mainScope.async {
            func1()
        }
    }
    val x = deferredValues.awaitAll()
    return x
}

Or simpler:

suspend fun mainFunction(): List<A> {
    return List(6) {
        mainScope.async {
            func1()
        }
    }.awaitAll()
}

Manual joins vs coroutineScope

It is usually a smell to join() jobs manually. If you want to wait for some coroutines to finish, it is more idiomatic to launch all those coroutines within a coroutineScope { ... } block, which will suspend until all child coroutines finish.

Here we have already replaced all launch that we join() with async calls that we await, so this doesn't really apply anymore, because we still need to await() the deferred values in order to get the results. However, since we are in a suspend function already, we can still use coroutineScope instead of an external scope like mainScope to ensure that we don't leak any coroutines:

suspend fun mainFunction(): List<A> {
    return coroutineScope {
        List(6) { async { func1() } }.awaitAll()
    }
}
Joffrey
  • 32,348
  • 6
  • 68
  • 100
  • That last point on the join call is new to me. Thanks for the info. – Laurence Oct 03 '21 at 15:52
  • @Joffrey Your solution seems to be returning an aggregate list to me. For information, `func1()` returns a list ( List). `x` is supposed to be an aggregate list of all the lists returned by the function (List). In you solution, `x` is a list of `` : `[kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit]` when I am printing it as `print(x.toString())` How do I convert them to a readable format of List? – Saakshi Oct 04 '21 at 04:31
  • @Saakshi no, `x` is not a list of `Unit`. `x` is a list of whatever is returned by `func1()`. For instance, if `func1()` returns 42, you get `[42, 42, 42, 42, 42, 42]` (https://pl.kotl.in/kfI4f7zpX). Now you had not mentioned that `func1()` returned lists itself, and that you wanted a flattened list (there is no way to tell from the initial code). In this case, my current solution would return a list of lists (still not a list of `Unit`). You can simply use `.flatten()` on it to have a flat list of elements (https://pl.kotl.in/0xWyoddnb). – Joffrey Oct 04 '21 at 08:01
  • @Saakshi could you please share the exact code you're using when you get `Unit`? If you add stuff within the `async` after the `func1()` call, you will get that stuff instead of `func1()`'s value. For instance, if you add a log call (returning `Unit`), you will get a list of `Unit`. – Joffrey Oct 04 '21 at 08:44
  • @Joffrey I appended the list returned `func1()` within `async` and that worked. Thank you for the solution. Regarding, Kotlin. Util: ```suspend fun mainFunction(): List { val x = coroutineScope { List(6) { async { func1() } }.awaitAll() } print(x.toString()) }``` Here, `[kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit, kotlin.Unit]` is printed. – Saakshi Oct 04 '21 at 11:31
  • I'm glad you found something that works, but I'd like to understand why/how you're getting those `Unit` results. Did you check my playground links? They do print stuff that are not `Unit`, so could you please provide a playground showing the `Unit` result while using a `func1` that returns a `List` as you said it does? – Joffrey Oct 04 '21 at 13:15
  • I am facing another issue now. func1() performs some activity that takes around 1-2 seconds. To replicate that, I have set a `delay(1200)` in `func1()`. As I had mentioned, I am appending the list returned by func1() to my main list within async. After applying this delay, the list is not getting appended. Can you help me understand why this may be happening? – Saakshi Oct 04 '21 at 14:52
  • Please edit the question with the actual code of `func1`, or the delay you use to mimic it. Also, I'm not sure I understand what solution you picked eventually, but it feels like you're still using the shared mutable state approach we're trying to avoid by using `async`'s return value. Please share the code of the new solution (either in a new question, or an edit to this question. – Joffrey Oct 04 '21 at 15:20
  • @Joffrey I have edited the question. Please have a look – Saakshi Oct 05 '21 at 09:09
  • @Saakshi Please provide your implementation of `func1` as well (even your fake one calling `delay`). The only possibility for your list to contain `Unit`, is if that function is returning unit. – marstran Oct 06 '21 at 18:46
  • I was storing the value outside of coroutine scope, which was returning Unit (which holds "nothing"). On straight away using the value of `list`, i could capture the value i needed. Thank you for the solution – Saakshi Oct 14 '21 at 09:13
  • Be careful that some functions like `async` and `coroutineScope` return a value based on whatever is the last expression in the lambda, so if you add `println()` or logging at the end of such blocks you will get `Unit`. It's important to share exactly the code you're using so we can help you with this stuff. It would be interesting if you shared your final successful attempt (maybe with a playground link), because I have a feeling you're still manually copying stuff into another list, which is unnecessary – Joffrey Oct 14 '21 at 13:27
  • @Joffrey Just wondering about the performance aspect of this process of making a list by executing `someFunction` asynchronously. I have tested a simple operation like making a list of a particular size by calling for example `List(100) { 42 } by using coroutines(https://pl.kotl.in/ZA3HA8b1S) and also just a normal synchronous way(https://pl.kotl.in/kgyte4DsQ). It turns out that the latter is much faster. So Would the coroutine way be suitable for other complicated cases rather than what OP is trying to do? – roksui May 25 '22 at 05:23
  • @roksui 1) [microbenchmarks like this have no value](https://stackoverflow.com/q/504103/1540818), please use JMH to measure the correct thing. 2) making things concurrent has a non-zero cost (task management, thread synchronization if parallel, etc.). It is only worth parallelizing if what you're doing in parallel takes significant time. This is not only for coroutines, but in general. 3) Here you're using `runBlocking` so things are only concurrent but not parallel. Also, every operation is literally just reading a constant, so you'll have no benefit at all, and only additional cost. – Joffrey May 25 '22 at 11:34
4

If you just want to perform some tasks concurrently and get a list of all the finished results at the end, you could do this:

val jobs = (0..5).map { mainScope.async { func1() } }
val results = jobs.awaitAll()
marstran
  • 26,413
  • 5
  • 61
  • 67
  • 1
    SInce this requires to be in a `suspend` function anyway due to `awaitAll()`, I would suggest using `coroutineScope` instead of an external scope. – Joffrey Oct 02 '21 at 18:27
  • @Joffrey Good point. – marstran Oct 02 '21 at 19:01
  • Thank you. How do I parse "results"? Unable to read my custom object values that are returned by func1(). – Saakshi Oct 02 '21 at 19:29
  • @Saakshi what do you mean by "parse"? `results` is a list of 6 values, all of which are the results of repeated calls to `func1()` – Joffrey Oct 02 '21 at 19:31
  • @Joffrey results is a list of Kotlin.Unit objects since coroutines return Unit objects. I want to parse them/case them to my custom object but i am unable to. – Saakshi Oct 02 '21 at 19:44
  • @Saakshi So `func1` returns `Unit`? `Unit` contains no information. The list will be filled with whatever the `func1` function returns. – marstran Oct 02 '21 at 20:07
  • @Saakshi coroutines don't necessarily return Unit objects. Using `async` like in this answer allows you to get a result from `func1`. If `func1` returns something useful, you will get it. – Joffrey Oct 02 '21 at 22:31