I'm trying to implement parallel versions of map and forEach for both Iterable and Sequence in Kotlin. I have a small file with four extension functions, but the third one gives me a compiler error:

suspend fun <T, R> Iterable<T>.parallelMap(block: suspend(T) -> R) =
    coroutineScope { map { async { block(it) } }.map { it.await() } }

suspend fun <T> Iterable<T>.parallelForEach(block: suspend (T) -> Unit) =
    coroutineScope { map { async { block(it) } }.forEach { it.await() } }

suspend fun <T, R> Sequence<T>.parallelMap(block: suspend(T) -> R) =
    coroutineScope { map { async { block(it) } }.map { it.await() } }

suspend fun <T> Sequence<T>.parallelForEach(block: suspend (T) -> Unit) =
    coroutineScope { map { async { block(it) } }.forEach { it.await() } }

The compiler says that suspending functions can only be called from inside other suspending functions. Is there a way to implement this?

Edit: fixed bad copy/paste

Edit2: I thought of an implementation:

suspend fun <T, R> Sequence<T>.parrallelMap(block: suspend (T) -> R) =
        asIterable().map { coroutineScope { async { block(it) } } }
              .asSequence().map { runBlocking { it.await() } }

I was hoping that this would start all the suspending functions up front and await them lazily. I'm just not sure whether this is safe, or whether it actually saves any time.

Typhaon

1 Answer

There is a problem with the core semantics of parallel execution for lazy sequences. Your current implementation does not start block(it) until the resulting sequence is iterated:

suspend fun <T, R> Sequence<T>.parallelMap(block: suspend(T) -> R) =
    coroutineScope { map { async { block(it) } }.map { it.await() } }

Consider the following example:

sequenceOf(1, 2, 3).parallelMap { it * it }.forEach { println(it) }

For this sample the order of execution will be

val p1 = async { 1 * 1 } 
val r1 = p1.await()
println(r1)
val p2 = async { 2 * 2 } 
val r2 = p2.await()
println(r2)
val p3 = async { 3 * 3 } 
val r3 = p3.await()
println(r3)

Note that the mapping operations execute sequentially, not in parallel.
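The interleaving above can be reproduced without coroutines at all, since it comes from how lazy sequences pull one element at a time through every stage. The following is a minimal sketch using only the standard library; `lazyOrder` is a hypothetical helper, and the two `map` stages merely stand in for `async { block(it) }` and `it.await()`:

```kotlin
// Hypothetical helper: records the order in which a lazy two-stage pipeline runs.
fun lazyOrder(): List<String> {
    val log = mutableListOf<String>()
    sequenceOf(1, 2, 3)
        .map { log += "start $it"; it * it }  // stands in for async { block(it) }
        .map { log += "await $it"; it }       // stands in for it.await()
        .forEach { log += "print $it" }       // terminal operation drives the pipeline
    return log
}

fun main() {
    lazyOrder().forEach { println(it) }
}
```

Each element goes through "start", "await" and "print" before the next element is even started, which is why nothing overlaps.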

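What the compiler is telling you is that the lambda passed to Sequence<T>.map {} is run lazily, on demand, outside the context of the call (read: outside of your coroutine), so you can't use the coroutine you are currently in. Unlike Iterable<T>.map, Sequence<T>.map is not an inline function, so its lambda is an ordinary non-suspending lambda and cannot call await().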

Frankly, I am not sure how one can both perform lazy computation and do it in parallel.
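If giving up laziness is acceptable, one possible workaround is to force the sequence into a list first and then start everything eagerly. This is only a sketch under that assumption: `parallelMapEager` is a hypothetical name (not from the question or from kotlinx.coroutines), and it returns a List rather than a Sequence because all results are computed up front:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical name; an eager (non-lazy) variant for sequences.
suspend fun <T, R> Sequence<T>.parallelMapEager(block: suspend (T) -> R): List<R> =
    coroutineScope {
        toList()                          // force the lazy sequence first
            .map { async { block(it) } }  // List.map is inline; every async is started here
            .awaitAll()                   // suspend until all results are ready, in input order
    }

fun main(): Unit = runBlocking {
    val squares = sequenceOf(1, 2, 3).parallelMapEager {
        delay(100) // simulated work
        it * it
    }
    println(squares)
}
```

The trade-off is that the whole input is consumed before any result is available, so this would not suit very large or infinite sequences. As for the Edit2 idea in the question, wrapping each `async` in its own `coroutineScope { }` is likely to run the blocks one after another, because `coroutineScope` does not return until its child coroutines have completed.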

voddan
  • Fair point. That might be a problem. What if I want to start all the async blocks but await them lazily? Do you think something like `suspend fun <T, R> Sequence<T>.parrallelMap(block: suspend (T) -> R) = asIterable().mapAsync(block).asSequence().map { runBlocking { it.await() }}` would work? Edit: mapAsync is `suspend fun <T, R> Iterable<T>.mapAsync(block: suspend(T) -> R) = coroutineScope { map { async { block(it) } } }` – Typhaon Apr 24 '19 at 13:51
  • Thinking about this: does this also mean that parallelForEach doesn't do anything in parallel at all? – Typhaon Apr 24 '19 at 14:27
  • Yes, `parallelForEach` is exactly the same as `parrallelMap` here – voddan Apr 25 '19 at 10:39