53

In Scala, one can easily do a parallel map, forEach, etc, with:

collection.par.map(..)

Is there an equivalent in Kotlin?

HRJ
  • 4
    Some of the fastest parallel collections around are from GS-Collections: https://github.com/goldmansachs/gs-collections ... which you can use from Kotlin (as any Java collection framework can be used) – Jayson Minard Jan 11 '16 at 12:20

11 Answers

66

The Kotlin standard library has no support for parallel operations. However, since Kotlin uses the standard Java collection classes, you can use the Java 8 stream API to perform parallel operations on Kotlin collections as well.

e.g.

myCollection.parallelStream()
        .map { ... }
        .filter { ... }
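
To get a Kotlin collection back from the stream, a terminal operation is needed. A minimal sketch, assuming a JVM target with Java 8 or later; `squaresOfEvens` is a hypothetical name used only for illustration:

```kotlin
import java.util.stream.Collectors

// Hypothetical helper: keeps the even numbers and squares them,
// letting the stream split the work across threads.
fun squaresOfEvens(numbers: List<Int>): List<Int> =
    numbers.parallelStream()
        .filter { it % 2 == 0 }
        .map { it * it }
        // Terminal operation: gathers the results into a list.
        // For an ordered source such as a List, encounter order is kept.
        .collect(Collectors.toList())
```

Calling `squaresOfEvens(listOf(1, 2, 3, 4, 5, 6))` should give `[4, 16, 36]`. For short lists of cheap operations the sequential `filter`/`map` is likely faster.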
Graham Russell
yole
  • 2
    how can one use Java 8 stream API in Kotlin? – LordScone Feb 20 '16 at 14:36
  • 2
    @LordScone The same way as you'd do it in Java. E.g.: `myCollection.parallelStream().map { ... }. filter { ... }` – Grzegorz D. Sep 11 '17 at 12:26
  • Just for information: parallelStream is only advisable for long lists. For smaller lists, running the jobs on different threads is pure overhead, as in the Java world. Stick to coroutines as far as possible – MozenRath Jan 13 '21 at 17:49
  • parallelStream is Java, not kotlin, that's why it requires Java 8 – Javier Delgado Sep 09 '21 at 08:44
  • 1
    @MozenRath What if the list is not long, but each item may take a long time to finish? Would you advise parallelStream? – sherelock Nov 27 '22 at 16:20
45

As of Kotlin 1.1, parallel operations can also be expressed quite elegantly in terms of coroutines. Here is a custom pmap helper function for lists:

fun <A, B> List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async(Dispatchers.Default) { f(it) } }.map { it.await() }
}
emragins
Alex Krauss
  • 4
    With Kotlin 1.3 out, is this still the best answer? I noticed @OlivierTerrien's Stream answer below, but I'd prefer to stick with Kotlin Sequences and Iterables. – Benjamin H Nov 01 '18 at 19:57
  • @BenjaminH Thanks; I have marked yole's answer as accepted, as it also refers to the stream API and posted before OlivierTerrien's answer. – HRJ Dec 31 '18 at 03:47
  • 57
    Quite elegantly? On the contrary, the code is pretty hard to read I would say. – Dzmitry Lazerka Feb 20 '19 at 22:29
  • 6
    @DzmitryLazerka I think I see where you're coming from, but this exact code isn't the elegant bit. The use of this code is what's elegant. If the above method is placed somewhere, it can be used with just `foo.pmap { v -> ... }`. I think that's fairly elegant. – Joseph Catrambone Jun 19 '19 at 21:16
  • 1
    Currently 'CommonPool' cannot be accessed - it is internal in 'kotlinx.coroutines'! – Strinder Jun 09 '20 at 19:41
  • This is in fact the best performing solution i think. It does not have any overheads like a parallelStream. Although would have loved to see a helper method from kotlin that could cater to it. – MozenRath Jan 13 '21 at 17:52
  • @Strinder You can use Dispatchers.Default instead of CommonPool, see https://discuss.kotlinlang.org/t/commonpool-default-for-coroutines/11965/3 and https://github.com/Kotlin/kotlinx.coroutines/pull/633 – BdN3504 Feb 10 '22 at 11:03
  • `.map { it.await() }` can now be replaced with `.awaitAll()`. – Tenfour04 Jul 20 '22 at 13:22
24

You can use this extension method:

suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

See Parallel Map in Kotlin for more info
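
One thing to watch: `async { }` without arguments inherits the caller's dispatcher, so under a plain `runBlocking { }` the tasks run concurrently but on a single thread. A minimal sketch of a call site that picks a multi-threaded dispatcher, assuming `kotlinx-coroutines-core` is on the classpath; `wordLengths` is a hypothetical caller:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Same extension as in the answer above.
suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
    map { async { f(it) } }.awaitAll()
}

// Hypothetical caller: running inside Dispatchers.Default lets the
// async tasks spread over the shared pool of worker threads.
fun wordLengths(words: List<String>): List<Int> =
    runBlocking(Dispatchers.Default) {
        words.pmap { it.length } // awaitAll() keeps the input order
    }
```

For blocking I/O inside `f`, `Dispatchers.IO` would probably be the better choice.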

Sharon
17

There is no official support in Kotlin's stdlib yet, but you could define an extension function to mimic par.map:

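import java.util.Collections
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

fun <T, R> Iterable<T>.pmap(
          // leave two cores free, but always use at least one thread
          numThreads: Int = (Runtime.getRuntime().availableProcessors() - 2).coerceAtLeast(1),
          exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
          transform: (T) -> R): List<R> {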

    // default size is just an inlined version of kotlin.collections.collectionSizeOrDefault
    val defaultSize = if (this is Collection<*>) this.size else 10
    val destination = Collections.synchronizedList(ArrayList<R>(defaultSize))

    for (item in this) {
        exec.submit { destination.add(transform(item)) }
    }

    exec.shutdown()
    exec.awaitTermination(1, TimeUnit.DAYS)

    return ArrayList<R>(destination)
}

(github source)

Here's a simple usage example

val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }

If needed, you can tune the threading by passing the number of threads or even a specific java.util.concurrent.ExecutorService. E.g.

listOf("foo", "bar").pmap(4, transform = { it + "!" })

Please note that this approach parallelizes only the map operation and does not affect any downstream steps. E.g. the filter in the first example would run single-threaded. However, in many cases only the data transformation (i.e. map) requires parallelization. Furthermore, it would be straightforward to extend the approach above to other parts of the Kotlin collection API.
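
If the result order matters, one option is to submit the work as `Callable` tasks and let `ExecutorService.invokeAll` return the futures in task order. This is a sketch of a variant, not the code from the linked source; the name `pmapOrdered` is made up for illustration:

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical order-preserving variant of pmap.
fun <T, R> Iterable<T>.pmapOrdered(
    numThreads: Int = Runtime.getRuntime().availableProcessors(),
    transform: (T) -> R
): List<R> {
    val exec: ExecutorService = Executors.newFixedThreadPool(numThreads)
    try {
        // One task per element, created in iteration order.
        val tasks = map { item -> Callable { transform(item) } }
        // invokeAll blocks until every task is done and returns the
        // futures in the same order as the tasks.
        return exec.invokeAll(tasks).map { it.get() }
    } finally {
        exec.shutdown()
    }
}
```

With this, `listOf("foo", "bar").pmapOrdered { it + "!" }` should yield `[foo!, bar!]` regardless of which thread finishes first. An exception thrown by `transform` surfaces from `get()` wrapped in an `ExecutionException`.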

Holger Brandl
  • I don't see how "destination.add(transform(item))" is thread safe. What's to keep two threads from calling "destination.add" at the same time, thus breaking stuff since ArrayList.add() is not a thread safe operation? – David Leppik Mar 22 '16 at 20:55
  • Thanks for the hint. Quite [some](http://stackoverflow.com/questions/2715983/concurrent-threads-adding-to-arraylist-at-same-time-what-happens) people think that when just adding elements it should be fine without synchronization. However, I've changed it to use a synchronized list to improve thread-safety. – Holger Brandl Mar 23 '16 at 10:25
  • 1
    The order in destination may not be the same as in the original list – Cuper Hector Mar 24 '16 at 12:08
  • 1
    I think many parallel collection implementations (like in [scala](http://beust.com/weblog/2011/08/15/scalas-parallel-collections/)) do not care about preserving order. Though, by changing the for-each loop above to an indexed loop along with downstream resorting, order could be preserved easily. – Holger Brandl Mar 29 '16 at 18:36
  • I'm interested in a version that returns a `Sequence` (or `Flow`). Unfortunately I can't simply have the whole code execute in an `= execute{` block and call `yield` instead of `destination.add` because `yield` can only execute in the original block, so within `exec.submit { }` is not an option. (Order need not be preserved.) – StephanS Nov 02 '19 at 02:44
  • I suspect this was an amazing answer in 2016 when it was provided, but now that Structured Concurrency exists in Kotlin we should instead be using `coroutineScope` with `async { ... }` as indicated in @Sharon 's [answer](https://stackoverflow.com/a/58735169/1672027) – Matt Klein Nov 21 '22 at 15:23
13

Since version 1.2, Kotlin has offered stream support that is compatible with JRE 8.

So iterating over a list in parallel can be done as below:

fun main(args: Array<String>) {
  val c = listOf("toto", "tata", "tutu")
  c.parallelStream().forEach { println(it) }
}
OlivierTerrien
  • I am not following kotlin very closely; isn't this the same as Yole's answer? I appreciate that your answer has sample code. Maybe we can edit Yole's answer to add the sample code. – HRJ Dec 31 '18 at 03:49
  • @HRJ, not exactly. Yole said Kotlin had no support for stream which is true until version 1.2. Since this version, Kotlin provides a way to stream collections as Java8 does. – OlivierTerrien Jan 01 '19 at 12:13
  • Yole said "Kotlin has no support for parallel operations". Please check again. – HRJ Jan 01 '19 at 13:48
  • Yes you are right. Too quickly written. Parallel operations not stream. – OlivierTerrien Jan 01 '19 at 13:53
  • Probably worth pointing out that as this requires JRE8 it is only available on Android 24 and above. – Rupert Rawnsley May 28 '20 at 14:42
  • Nothing in this answer is Kotlin per se, it is calling Java standard library. Which is not bad, just not Kotlin specific. – Michael Piefel Oct 30 '22 at 17:47
6

Kotlin aims to be idiomatic, but not so terse that it is hard to understand at first glance.

Parallel computation through coroutines is no exception. The designers want it to be easy but not hidden behind some pre-built method, so that you can branch the computation when needed.

In your case:

collection.map { 
        async{ produceWith(it) } 
    }
    .forEach { 
        consume(it.await()) 
    }

Notice that to call async and await you need to be inside a coroutine scope: you cannot make suspending calls or launch a coroutine from non-coroutine code. To enter one you can either use:

  • runBlocking { /* your code here */ }: it blocks the current thread until the lambda and all coroutines started inside it have finished.
  • GlobalScope.launch { }: it runs the lambda concurrently without waiting for it; if main returns while the coroutines are still running, they are abandoned when the process exits, so in that case prefer runBlocking.
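
Put together, the snippet above could be wrapped like this. It is only a sketch: `produceWith` is a stand-in for the answer's placeholder, `produceAndConsume` is a hypothetical wrapper, and `Dispatchers.Default` is an assumption made here so that the producers actually run on several threads.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Stand-in for the answer's placeholder: any per-item computation.
fun produceWith(n: Int): Int = n * 2

// Hypothetical wrapper: produces in parallel, consumes in input order.
fun produceAndConsume(collection: List<Int>): List<Int> {
    val consumed = mutableListOf<Int>()
    runBlocking {
        collection
            // Start one coroutine per item on the shared worker pool.
            .map { async(Dispatchers.Default) { produceWith(it) } }
            // Await each result in turn; this part runs on the calling thread.
            .forEach { consumed.add(it.await()) }
    }
    return consumed
}
```

Because all `async` calls are started before the first `await`, the items are produced in parallel while consumption stays sequential.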

Hope this helps :)

Lamberto Basti
4

At present, no. The official Kotlin comparison to Scala mentions:

Things that may be added to Kotlin later:

  • Parallel collections
Bhargav Rao
Martin Konecny
4

This solution assumes that your project is using coroutines:

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2")

The functions called parallelTransform don't retain the order of elements and return a Flow<R>, while the function parallelMap retains the order and returns a List<R>.

Create a threadpool for multiple invocations:

val numberOfCores = Runtime.getRuntime().availableProcessors()
val executorDispatcher: ExecutorCoroutineDispatcher =
    Executors.newFixedThreadPool(numberOfCores).asCoroutineDispatcher()

Use that dispatcher (and call close() when it's no longer needed):

inline fun <T, R> Iterable<T>.parallelTransform(
    dispatcher: ExecutorCoroutineDispatcher,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {

    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow

    launch(dispatcher) {
        items.forEach {item ->
            launch {
                channelFlowScope.send(transform(item))
            }
        }
    }
}
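
A flow built with channelFlow is cold, so nothing runs until it is collected. The following self-contained sketch shows one way to collect such a flow; `parallelTransformSketch` and `squaresInAnyOrder` are hypothetical names, and the sketch assumes `Dispatchers.Default` rather than a dedicated thread pool:

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Simplified, hypothetical variant of the function above.
fun <T, R> Iterable<T>.parallelTransformSketch(
    dispatcher: CoroutineDispatcher = Dispatchers.Default,
    transform: suspend (T) -> R
): Flow<R> = channelFlow {
    for (item in this@parallelTransformSketch) {
        // One coroutine per item; results are sent as they complete,
        // so the emission order is not the input order.
        launch(dispatcher) { send(transform(item)) }
    }
}

// Hypothetical caller: collecting the flow is what starts the work.
fun squaresInAnyOrder(numbers: List<Int>): List<Int> = runBlocking {
    numbers.parallelTransformSketch { it * it }.toList()
}
```

Since the order is unspecified, callers that need a stable result would sort afterwards, e.g. `squaresInAnyOrder(listOf(1, 2, 3, 4)).sorted()`.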

If threadpool reuse is of no concern (threadpools aren't cheap), you can use this version:

inline fun <T, R> Iterable<T>.parallelTransform(
    numberOfThreads: Int,
    crossinline transform: (T) -> R
): Flow<R> = channelFlow {

    val items: Iterable<T> = this@parallelTransform
    val channelFlowScope: ProducerScope<R> = this@channelFlow

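    // `use` closes the pool as soon as its block returns, so wait for the
    // launched work to finish before leaving the block
    Executors.newFixedThreadPool(numberOfThreads).asCoroutineDispatcher().use { dispatcher ->
        launch(dispatcher) {
            items.forEach { item ->
                launch {
                    channelFlowScope.send(transform(item))
                }
            }
        }.join()
    }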
}

If you need a version that retains the order of elements:

inline fun <T, R> Iterable<T>.parallelMap(
    dispatcher: ExecutorCoroutineDispatcher,
    crossinline transform: (T) -> R
): List<R> = runBlocking {

    val items: Iterable<T> = this@parallelMap
    val result = ConcurrentSkipListMap<Int, R>()

    // wait for all child coroutines to finish before reading the results
    launch(dispatcher) {
        items.withIndex().forEach { (index, item) ->
            launch {
                result[index] = transform(item)
            }
        }
    }.join()

    // ConcurrentSkipListMap is a SortedMap
    // so the values will be in the right order
    result.values.toList()
}
StephanS
2

I found this:

implementation 'com.github.cvb941:kotlin-parallel-operations:1.3'

details:

https://github.com/cvb941/kotlin-parallel-operations

Milan Jurkulak
2

I've come up with a couple of extension functions:

  1. The suspend extension function on Iterable<T> type, which does a parallel processing of items and returns some result of processing each item. By default it uses Dispatchers.IO dispatcher to offload blocking tasks to a shared pool of threads. Must be called from a coroutine (including a coroutine with Dispatchers.Main dispatcher) or another suspend function.

    suspend fun <T, R> Iterable<T>.processInParallel(
        dispatcher: CoroutineDispatcher = Dispatchers.IO,
        processBlock: suspend (v: T) -> R,
    ): List<R> = coroutineScope { // or supervisorScope
        map {
            async(dispatcher) { processBlock(it) }
        }.awaitAll()
    }
    

    Example of calling from a coroutine:

    val collection = listOf("A", "B", "C", "D", "E")
    
    someCoroutineScope.launch {
        val results = collection.processInParallel {
            process(it)
        }
        // use processing results
    }
    

where someCoroutineScope is an instance of CoroutineScope.

  2. A launch-and-forget extension function on CoroutineScope, which doesn't return any result. It also uses the Dispatchers.IO dispatcher by default. It can be called on a CoroutineScope or from another coroutine.

    fun <T> CoroutineScope.processInParallelAndForget(
        iterable: Iterable<T>,
        dispatcher: CoroutineDispatcher = Dispatchers.IO,
        processBlock: suspend (v: T) -> Unit
    ) = iterable.forEach {
        launch(dispatcher) { processBlock(it) }
    }
    

    Example of calling:

    someCoroutineScope.processInParallelAndForget(collection) {
        process(it)
    }
    
    // OR from another coroutine:
    
    someCoroutineScope.launch {
        processInParallelAndForget(collection) {
            process(it)
        }
    }
    

2a. A launch-and-forget extension function on Iterable<T>. It's almost the same as the previous one, but the receiver type is different. The CoroutineScope must be passed as an argument to the function.

fun <T> Iterable<T>.processInParallelAndForget(
    scope: CoroutineScope,
    dispatcher: CoroutineDispatcher = Dispatchers.IO,
    processBlock: suspend (v: T) -> Unit
) = forEach {
    scope.launch(dispatcher) { processBlock(it) }
}

Calling:

collection.processInParallelAndForget(someCoroutineScope) {
    process(it)
}

// OR from another coroutine:

someScope.launch {
    collection.processInParallelAndForget(this) {
        process(it)
    }
}
Sergio
2

You can mimic the Scala API by using extension properties and inline classes. Using the coroutine solution from @Sharon's answer, you can write it like this:

val <A> Iterable<A>.par get() = ParallelizedIterable(this)

@JvmInline
value class ParallelizedIterable<A>(val iter: Iterable<A>) {
    suspend fun <B> map(f: suspend (A) -> B): List<B> = coroutineScope {
        iter.map { async { f(it) } }.awaitAll()
    }
}

With this, your code can change from

anIterable.map { it.value } 

to

anIterable.par.map { it.value } 

You can also choose a different entry point instead of an extension property, e.g.

fun <A> Iterable<A>.parallel() = ParallelizedIterable(this)

anIterable.parallel().map { it.value } 

You can also use another parallel solution and implement the rest of the Iterable methods inside ParallelizedIterable, keeping the same method names for the operations.

The drawback is that this implementation parallelizes only the one operation that directly follows it. To parallelize every subsequent operation, you would need to modify ParallelizedIterable further so that its methods return its own type instead of a plain List<B>.
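
A rough sketch of that modification, assuming `kotlinx-coroutines-core`; `ParallelIterable`, `parSketch` and `evenSquares` are hypothetical names, and only `map` and `filter` are covered:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper whose operations return the wrapper again,
// so every chained step is parallelized, not just the first one.
class ParallelIterable<A>(private val items: Iterable<A>) {
    suspend fun <B> map(f: suspend (A) -> B): ParallelIterable<B> = coroutineScope {
        ParallelIterable(items.map { async { f(it) } }.awaitAll())
    }

    suspend fun filter(p: suspend (A) -> Boolean): ParallelIterable<A> = coroutineScope {
        // Evaluate the predicate in parallel, then keep the matches in order.
        val tested = items.map { async { it to p(it) } }.awaitAll()
        ParallelIterable(tested.filter { it.second }.map { it.first })
    }

    // Leaves the parallel world and hands back an ordinary list.
    fun toList(): List<A> = items.toList()
}

fun <A> Iterable<A>.parSketch() = ParallelIterable(this)

// Hypothetical caller: both map and filter run their lambdas in parallel.
fun evenSquares(numbers: List<Int>): List<Int> =
    runBlocking(Dispatchers.Default) {
        numbers.parSketch().map { it * it }.filter { it % 2 == 0 }.toList()
    }
```

Each step still waits for all of its elements before the next step starts, so this chains parallel stages rather than fusing them into one pipeline.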

keychera