21

I've got a Sequence (from File.walkTopDown) and I need to run a long-running operation on each element. I'd like to use Kotlin best practices / coroutines, but I either get no parallelism, or way too much parallelism and hit a "too many open files" IO error.

File("/Users/me/Pictures/").walkTopDown()
    .onFail { file, ex -> println("ERROR: $file caused $ex") }
    .filter { ... only big images... }
    .map { file ->
        async { // I *think* I want async and not "launch"...
            ImageProcessor.fromFile(file)
        }
    }

This doesn't seem to run it in parallel, and my multi-core CPU never goes above 1 CPU's worth. Is there a way with coroutines to run "NumberOfCores parallel operations" worth of Deferred jobs?

I looked at Multithreading using Kotlin Coroutines, which first creates ALL the jobs and then joins them, but that means completing the Sequence/file tree walk completely before the heavy processing join step, and that seems... iffy! Splitting it into a collect step and a process step means the collection could run way ahead of the processing.

val jobs = ... the Sequence above...
    .toSet()
println("Found ${jobs.size}")
jobs.forEach { it.await() }
Sergio
Benjamin H

7 Answers

13

This isn't specific to your problem, but it does answer the question of, "how to cap kotlin coroutines maximum concurrency".

EDIT: As of kotlinx.coroutines 1.6.0 (https://github.com/Kotlin/kotlinx.coroutines/issues/2919), you can use limitedParallelism, e.g. Dispatchers.IO.limitedParallelism(123).

Old solution: I thought of using newFixedThreadPoolContext at first, but 1) it's marked as an obsolete API and 2) it would dedicate threads to the task, which I don't think is necessary or desirable (the same goes for Executors.newFixedThreadPool().asCoroutineDispatcher()). This solution, which uses a Semaphore, might have flaws I'm not aware of, but it's very simple:

import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

/**
 * Maps the inputs using [transform] at most [maxConcurrency] at a time until all Jobs are done.
 */
suspend fun <TInput, TOutput> Iterable<TInput>.mapConcurrently(
    maxConcurrency: Int,
    transform: suspend (TInput) -> TOutput,
) = coroutineScope {
    val gate = Semaphore(maxConcurrency)
    this@mapConcurrently.map {
        async {
            gate.withPermit {
                transform(it)
            }
        }
    }.awaitAll()
}
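
As a rough sketch of how the helper above could be applied to a file walk like the one in the question: the names `fileSize` and `sizesOf` are made up for illustration, and `fileSize` is only a stand-in for the real per-file work. The helper is repeated (with an explicit return type) so the sketch compiles on its own. Note that the tree walk still completes before the results are awaited, but at most `maxConcurrency` files are being processed at any moment.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import java.io.File

// Same helper as above, repeated so this sketch is self-contained.
suspend fun <TInput, TOutput> Iterable<TInput>.mapConcurrently(
    maxConcurrency: Int,
    transform: suspend (TInput) -> TOutput,
): List<TOutput> = coroutineScope {
    val gate = Semaphore(maxConcurrency)
    this@mapConcurrently.map {
        async { gate.withPermit { transform(it) } }
    }.awaitAll()
}

// Hypothetical stand-in for the question's per-file work (blocking IO).
fun fileSize(file: File): Long = file.length()

// Hypothetical wrapper: walks the tree, then processes at most
// maxConcurrency files at a time on the IO dispatcher.
suspend fun sizesOf(root: File, maxConcurrency: Int): List<Long> =
    root.walkTopDown()
        .filter { it.isFile }
        .asIterable()
        .mapConcurrently(maxConcurrency) { file ->
            withContext(Dispatchers.IO) { fileSize(file) }
        }

fun main(): Unit = runBlocking {
    val cores = Runtime.getRuntime().availableProcessors()
    val sizes = sizesOf(File("."), cores)
    println("Processed ${sizes.size} files")
}
```

Because a file is only touched while its coroutine holds a permit, the number of simultaneously open files should stay at or below `maxConcurrency`, even though one coroutine is created per file up front.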

Tests (apologies, they use Spek, Hamcrest, and kotlin.test):

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestCoroutineDispatcher
import org.hamcrest.MatcherAssert.assertThat
import org.hamcrest.Matchers.greaterThanOrEqualTo
import org.hamcrest.Matchers.lessThanOrEqualTo
import org.spekframework.spek2.Spek
import org.spekframework.spek2.style.specification.describe
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
object AsyncHelpersKtTest : Spek({
    val actionDelay: Long = 1_000 // arbitrary; obvious if a non-test dispatcher is used by accident
    val testDispatcher = TestCoroutineDispatcher()

    afterEachTest {
        // Clean up the TestCoroutineDispatcher to make sure no other work is running.
        testDispatcher.cleanupTestCoroutines()
    }

    describe("mapConcurrently") {
        it("should run all inputs concurrently if maxConcurrency >= size") {
            val concurrentJobCounter = AtomicInteger(0)
            val inputs = IntRange(1, 2).toList()
            val maxConcurrency = inputs.size

            // https://github.com/Kotlin/kotlinx.coroutines/issues/1266 has useful info & examples
            runBlocking(testDispatcher) {
                print("start runBlocking $coroutineContext\n")

                // We have to run this async so that the code afterwards can advance the virtual clock
                val job = launch {
                    testDispatcher.pauseDispatcher {
                        val result = inputs.mapConcurrently(maxConcurrency) {
                            print("action $it $coroutineContext\n")

                            // Sanity check that we never run more in parallel than max
                            assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))

                            // Allow for virtual clock adjustment
                            delay(actionDelay)

                            // Sanity check that we never run more in parallel than max
                            assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
                            print("action $it after delay $coroutineContext\n")

                            it
                        }

                        // awaitAll() returns results in input order; comparing as Sets keeps this check order-independent
                        assertEquals(inputs.toSet(), result.toSet())
                        print("end mapConcurrently $coroutineContext\n")
                    }
                }
                print("before advanceTime $coroutineContext\n")

                // Start the coroutines
                testDispatcher.advanceTimeBy(0)
                assertEquals(inputs.size, concurrentJobCounter.get(), "All jobs should have been started")

                testDispatcher.advanceTimeBy(actionDelay)
                print("after advanceTime $coroutineContext\n")
                assertEquals(0, concurrentJobCounter.get(), "All jobs should have finished")
                job.join()
            }
        }

        it("should run one at a time if maxConcurrency = 1") {
            val concurrentJobCounter = AtomicInteger(0)
            val inputs = IntRange(1, 2).toList()
            val maxConcurrency = 1

            runBlocking(testDispatcher) {
                val job = launch {
                    testDispatcher.pauseDispatcher {
                        inputs.mapConcurrently(maxConcurrency) {
                            assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))
                            delay(actionDelay)
                            assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
                            it
                        }
                    }
                }

                testDispatcher.advanceTimeBy(0)
                assertEquals(1, concurrentJobCounter.get(), "Only one job should have started")

                val elapsedTime = testDispatcher.advanceUntilIdle()
                print("elapsedTime=$elapsedTime")
                assertThat(
                    "Virtual time should be at least as long as if all jobs ran sequentially",
                    elapsedTime,
                    greaterThanOrEqualTo(actionDelay * inputs.size)
                )
                job.join()
            }
        }

        it("should handle cancellation") {
            val jobCounter = AtomicInteger(0)
            val inputs = IntRange(1, 2).toList()
            val maxConcurrency = 1

            runBlocking(testDispatcher) {
                val job = launch {
                    testDispatcher.pauseDispatcher {
                        inputs.mapConcurrently(maxConcurrency) {
                            jobCounter.addAndGet(1)
                            delay(actionDelay)
                            it
                        }
                    }
                }

                testDispatcher.advanceTimeBy(0)
                assertEquals(1, jobCounter.get(), "Only one job should have started")

                job.cancel()
                testDispatcher.advanceUntilIdle()
                assertEquals(1, jobCounter.get(), "Only one job should have run")
                job.join()
            }
        }
    }
})

Per https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/09_Testing, you may also need to adjust compiler args for the tests to run:

compileTestKotlin {
    kotlinOptions {
        // Needed for runBlocking test coroutine dispatcher?
        freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
        freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn"
    }
}

dependencies {
    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.1'
}
Pat
  • Saved my day! Thanks for sharing, and especially big thanks for the tests. Got to learn quite a bit from there – Sam May 08 '21 at 08:08
  • 3
    Beware that suspended jobs do not count against the `limitedParallelism`, so if you're working with non-blocking (suspending) APIs then `limitedParallelism` will not limit the concurrency of your operation. The key word in this function is parallelism, it does not limit concurrency. `limitedParallelism` is most likely to be useful along with `Dispatchers.IO`, which is designed for blocking IO tasks. – morsecoder Feb 22 '23 at 19:21
7

The problem with your first snippet is that it doesn't run at all: remember, a Sequence is lazy, so you have to apply a terminal operation such as toSet() or forEach(). Additionally, you need to limit the number of threads used for the task by constructing a newFixedThreadPoolContext and passing it to async:

val pictureContext = newFixedThreadPoolContext(nThreads = 10, name = "reading pictures in parallel")

File("/Users/me/Pictures/").walkTopDown()
    .onFail { file, ex -> println("ERROR: $file caused $ex") }
    .filter { ... only big images... }
    .map { file ->
        async(pictureContext) {
            ImageProcessor.fromFile(file)
        }
    }
    .toList()
    .forEach { it.await() }

Edit: You have to use a terminal operator (toList) before awaiting the results.
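
The laziness point can be checked without any coroutines. The sketch below uses only the standard library; the helper name `countEvaluations` and the file names are made up for illustration. It counts how many times the `map` lambda has run before and after a terminal operation.

```kotlin
// Hypothetical helper (not from the question): returns how many times the
// map lambda has run before and after a terminal operation is applied.
fun countEvaluations(): Pair<Int, Int> {
    var evaluated = 0
    val lazySeq = sequenceOf("a.jpg", "b.jpg", "c.jpg")
        .map { name ->
            evaluated++          // side effect so we can observe evaluation
            name.length
        }

    val beforeTerminal = evaluated   // nothing has been pulled through yet
    lazySeq.toList()                 // terminal operation: forces every element
    val afterTerminal = evaluated

    return beforeTerminal to afterTerminal
}

fun main() {
    val (before, after) = countEvaluations()
    println("before toList(): $before, after toList(): $after")
    // prints "before toList(): 0, after toList(): 3"
}
```

The same applies to a `map { async { ... } }` step: no coroutine is started until something pulls elements through the sequence.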

voddan
  • I thought that would work, but it still seems to process the final forEach sequentially, e.g. `.map { file -> async(CommonPool) { println("start"); val img = ImageFile.fromFile(file); println("end"); img } }.forEach { imageFiles.add(it.await()); if (Math.random() > 0.999) { imageFiles.save() } }` – Benjamin H Dec 08 '17 at 04:33
  • Oh, snap, you are right. Now I think there is no way to do it with Sequences. Edited the answer – voddan Dec 08 '17 at 06:01
  • 5
    It's worth noting that using a limited thread pool limits parallelism but not concurrency, meaning that if `ImageProcessor.fromFile` is a suspending function (that doesn't block), you can still process multiple files at once, which may not be what you want. – Nicklas A. Feb 07 '19 at 16:11
5

I got it working with a Channel. But maybe I'm being redundant with your way?

val pipe = ArrayChannel<Deferred<ImageFile>>(20)
launch {
    while (!(pipe.isEmpty && pipe.isClosedForSend)) {
        imageFiles.add(pipe.receive().await())
    }
    println("pipe closed")
}
File("/Users/me/").walkTopDown()
        .onFail { file, ex -> println("ERROR: $file caused $ex") }
        .forEach { pipe.send(async { ImageFile.fromFile(it) }) }
pipe.close()
Benjamin H
5

To limit parallelism to some value, use the limitedParallelism function, available since version 1.6.0 of the kotlinx.coroutines library. It can be called on a CoroutineDispatcher object. So, to limit the number of threads used for parallel execution, we can write something like:

val parallelismLimit = Runtime.getRuntime().availableProcessors()
val limitedDispatcher = Dispatchers.Default.limitedParallelism(parallelismLimit)
val scope = CoroutineScope(limitedDispatcher) // we can set limitedDispatcher for the whole scope

scope.launch { // or we can set limitedDispatcher for a coroutine launch(limitedDispatcher)
    File("/Users/me/Pictures/").walkTopDown()
        .onFail { file, ex -> println("ERROR: $file caused $ex") }
        .filter { ... only big images... }
        .map { file ->
            async {
                ImageProcessor.fromFile(file)
            }
        }.toList().awaitAll()
}

ImageProcessor.fromFile(file) will be executed in parallel on at most parallelismLimit threads.
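
One caveat, also raised in the comments on another answer: limitedParallelism bounds how many coroutines execute at the same instant, not how many are in flight. The sketch below is an illustration only; `maxInFlight` is a made-up probe, and `Thread.sleep` / `delay` stand in for blocking and suspending work respectively. It records the peak number of tasks that had started but not finished on a dispatcher limited to a parallelism of 1.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical probe: runs `tasks` coroutines on a dispatcher limited to a
// parallelism of 1 and returns the highest number that were in flight at once.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun maxInFlight(tasks: Int, blocking: Boolean): Int {
    val dispatcher = Dispatchers.IO.limitedParallelism(1)
    val inFlight = AtomicInteger(0)
    val peak = AtomicInteger(0)
    // withContext returns only after all launched children have completed.
    withContext(dispatcher) {
        repeat(tasks) {
            launch {
                val now = inFlight.incrementAndGet()
                peak.updateAndGet { prev -> maxOf(prev, now) }
                // Blocking work holds the single slot; suspending work releases it.
                if (blocking) Thread.sleep(50) else delay(50)
                inFlight.decrementAndGet()
            }
        }
    }
    return peak.get()
}

fun main(): Unit = runBlocking {
    println("suspending work: peak in flight = ${maxInFlight(5, blocking = false)}")
    println("blocking work:   peak in flight = ${maxInFlight(5, blocking = true)}")
}
```

With blocking work the peak stays at 1, because each task occupies the only slot until it finishes. With suspending work, several tasks can be in flight at once, so if the goal is to bound open files or in-flight requests for a suspending API, something like a Semaphore is the more fitting tool.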

Sergio
4

This doesn't preserve the order of the projection but otherwise limits the throughput to at most maxDegreeOfParallelism. Expand and extend as you see fit.

suspend fun <TInput, TOutput> (Collection<TInput>).inParallel(
        maxDegreeOfParallelism: Int,
        action: suspend CoroutineScope.(input: TInput) -> TOutput
): Iterable<TOutput> = coroutineScope {

    val list = this@inParallel

    if (list.isEmpty())
        return@coroutineScope listOf<TOutput>()

    val brake = Channel<Unit>(maxDegreeOfParallelism)
    val output = Channel<TOutput>()
    val counter = AtomicInteger(0)

    this.launch {

        repeat(maxDegreeOfParallelism) {
            brake.send(Unit)
        }

        for (input in list) {

            val task = this.async {
                action(input)
            }

            this.launch {
                val result = task.await()
                output.send(result)
                val completed = counter.incrementAndGet()
                if (completed == list.size) {
                    output.close()
                } else brake.send(Unit)
            }

            brake.receive()
        }
    }

    val results = mutableListOf<TOutput>()
    for (item in output) {
        results.add(item)
    }

    return@coroutineScope results
}

Example usage:

val output = listOf(1, 2, 3).inParallel(2) {
    it + 1
} // Note that output may not be in same order as list.
Gleno
  • This works fairly well, and has the added benefit that the elements are initiated in the same order as the original collection, which the semaphore solution does not provide. However, there seems to be an off-by-one bug in this solution, since it allows one more unit of parallelism than specified. Moving the `brake.receive` to the top of the `for (input in list)` fixes the issue. – morsecoder Feb 22 '23 at 14:35
3

Why not use the asFlow() operator and then use flatMapMerge?

someCoroutineScope.launch(Dispatchers.Default) {
    File("/Users/me/Pictures/").walkTopDown()
        .asFlow()
        .filter { ... only big images... }
        .flatMapMerge(concurrencyLimit) { file ->
            flow {
                emit(runInterruptible { ImageProcessor.fromFile(file) })
            }
        }.catch { ... }
        .collect()
}

Then you can limit the simultaneous open files while still processing them concurrently.
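
To see the limit in action, here is a small self-contained sketch that swaps the file walk for a range of integers. `mergeWithLimit` is a made-up probe and `delay(20)` stands in for the real per-file work; both opt-in annotations are listed because the marker on flatMapMerge has differed between library versions.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical probe: pushes `items` elements through flatMapMerge and
// returns the results together with the peak number of inner flows that
// were being collected at the same time.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
suspend fun mergeWithLimit(items: Int, concurrencyLimit: Int): Pair<List<Int>, Int> {
    val active = AtomicInteger(0)
    val peak = AtomicInteger(0)
    val results = (1..items).asSequence()
        .asFlow()
        .flatMapMerge(concurrencyLimit) { n ->
            flow {
                val now = active.incrementAndGet()
                peak.updateAndGet { prev -> maxOf(prev, now) }
                delay(20)                 // stand-in for the real per-item work
                active.decrementAndGet()
                emit(n * 2)
            }
        }
        .toList()                         // terminal operator: runs the pipeline
    return results to peak.get()
}

fun main(): Unit = runBlocking {
    val (results, peak) = mergeWithLimit(items = 10, concurrencyLimit = 3)
    println("results=${results.sorted()} peak=$peak")
}
```

The results may arrive in a different order from the input, which is why the sketch sorts them before printing.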

Greg
  • 2
    Nice! I didn't think that would work because I read `controls the number of in-flight flows` as "how many flows it could merge" (in my case, I'm working off of just one), but you make me now think it might mean "how many emits it can be chewing on at once" – Benjamin H Dec 17 '21 at 20:10
2

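This caps the number of coroutines doing work at the number of workers: a fixed set of worker coroutines all receive from the same channel. I'd recommend watching https://www.youtube.com/watch?v=3WGM-_MnPQA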

package com.example.workers

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlin.system.measureTimeMillis

class ChannellibgradleApplication

fun main(args: Array<String>) {
    // Each value is a simulated job duration in milliseconds.
    val myList = listOf(3000, 1200, 1400, 3000, 1200, 1400, 3000)
    runBlocking {
        // Producer: sends every job into a channel, which closes when the block ends.
        val myChannel = produce(CoroutineName("MyInts")) {
            myList.forEach { send(it) }
        }

        println("Starting coroutineScope  ")
        val time = measureTimeMillis {
            coroutineScope {
                // The number of workers is the concurrency cap.
                val workers = 2
                repeat(workers) {
                    launch(CoroutineName("Sleep 1")) { theHardWork(myChannel) }
                }
            }
        }
        println("Ending coroutineScope  $time ms")
    }
}

// Worker: pulls jobs from the shared channel until it is closed and drained.
suspend fun theHardWork(channel: ReceiveChannel<Int>) {
    for (m in channel) {
        println("Starting Sleep $m")
        delay(m.toLong())
        println("Ending Sleep $m")
    }
}