0

I need to launch a number of jobs which will return a result.

In the main code (which is not a coroutine), after launching the jobs I need to wait for them all to complete their task OR for a given timeout to expire, whichever comes first.

If I exit from the wait because all the jobs completed before the timeout, that's great, I will collect their results.

But if some of the jobs are taking longer that the timeout, my main function needs to wake as soon as the timeout expires, inspect which jobs did finish in time (if any) and which ones are still running, and work from there, without cancelling the jobs that are still running.

How would you code this kind of wait?

Marko Topolnik
  • 195,646
  • 29
  • 319
  • 436
Tobia
  • 17,856
  • 6
  • 74
  • 93

3 Answers3

2

The solution follows directly from the question. First, we'll design a suspending function for the task. Let's see our requirements:

if some of the jobs are taking longer that the timeout... without cancelling the jobs that are still running.

It means that the jobs we launch have to be standalone (not children), so we'll opt-out of structured concurrency and use GlobalScope to launch them, manually collecting all the jobs. We use async coroutine builder because we plan to collect their results of some type R later:

val jobs: List<Deferred<R>> = List(numberOfJobs) { 
    GlobalScope.async { /* our code that produces R */ }
}

after launching the jobs I need to wait for them all to complete their task OR for a given timeout to expire, whichever comes first.

Let's wait for all of them and do this waiting with timeout:

withTimeoutOrNull(timeoutMillis) { jobs.joinAll() }

We use joinAll (as opposed to awaitAll) to avoid exception if one of the jobs fail and withTimeoutOrNull to avoid exception on timeout.

my main function needs to wake as soon as the timeout expires, inspect which jobs did finish in time (if any) and which ones are still running

jobs.map { deferred -> /* ... inspect results */ }

In the main code (which is not a coroutine) ...

Since our main code is not a coroutine it has to wait in a blocking way, so we bridge the code we wrote using runBlocking. Putting it all together:

fun awaitResultsWithTimeoutBlocking(
    timeoutMillis: Long,
    numberOfJobs: Int
) = runBlocking {
    val jobs: List<Deferred<R>> = List(numberOfJobs) { 
        GlobalScope.async { /* our code that produces R */ }
    }    
    withTimeoutOrNull(timeoutMillis) { jobs.joinAll() }
    jobs.map { deferred -> /* ... inspect results */ }
}

P.S. I would not recommend deploying this kind of solution in any kind of a serious production environment, since letting your background jobs running (leak) after timeout will invariably badly bite you later on. Do so only if you throughly understand all the deficiencies and risks of such an approach.

Roman Elizarov
  • 27,053
  • 12
  • 64
  • 60
1

You can try to work with whileSelect and the onTimeout clause. But you still have to overcome the problem that your main code is not a coroutine. The next lines are an example of whileSelect statement. The function returns a Deferred with a list of results evaluated in the timeout period and another list of Deferreds of the unfinished results.

fun CoroutineScope.runWithTimeout(timeoutMs: Int): Deferred<Pair<List<Int>, List<Deferred<Int>>>> = async {

    val deferredList = (1..100).mapTo(mutableListOf()) {
        async {
            val random = Random.nextInt(0, 100)
            delay(random.toLong())
            random
        }
    }

    val finished = mutableListOf<Int>()
    val endTime = System.currentTimeMillis() + timeoutMs

    whileSelect {
        var waitTime = endTime - System.currentTimeMillis()
        onTimeout(waitTime) {
            false
        }
        deferredList.toList().forEach { deferred ->
            deferred.onAwait { random ->
                deferredList.remove(deferred)
                finished.add(random)
                true
            }
        }
    }

    finished.toList() to deferredList.toList()
}

In your main code you can use the discouraged method runBlocking to access the Deferrred.

fun main() = runBlocking<Unit> {
    val deferredResult = runWithTimeout(75)
    val (finished, pending) = deferredResult.await()
    println("Finished: ${finished.size} vs Pending: ${pending.size}")
}
Rene
  • 5,730
  • 17
  • 20
  • Thank you. But how does your code behave in edge cases? When `deferredList` is empty, for example, or when all jobs terminate before the timeout? – Tobia Mar 11 '19 at 14:03
  • Also, why is `runBlocking` discouraged and where did you read it? – Tobia Mar 11 '19 at 14:26
0

Here is the solution I came up with. Pairing each job with a state (among other info):

private enum class State { WAIT, DONE, ... }

private data class MyJob(
    val job: Deferred<...>,
    var state: State = State.WAIT,
    ...
)

and writing an explicit loop:

// wait until either all jobs complete, or a timeout is reached
val waitJob = launch { delay(TIMEOUT_MS) }
while (waitJob.isActive && myJobs.any { it.state == State.WAIT }) {
    select<Unit> {
        waitJob.onJoin {}
        myJobs.filter { it.state == State.WAIT }.forEach { 
            it.job.onJoin {}
        }
    }
    // mark any finished jobs as DONE to exclude them from the next loop
    myJobs.filter { !it.job.isActive }.forEach { 
        it.state = State.DONE
    }
}

The initial state is called WAIT (instead of RUN) because it doesn't necessarily mean that the job is still running, only that my loop has not yet taken it into account.

I'm interested to know if this is idiomatic enough, or if there are better ways to code this kind of behaviour.

Tobia
  • 17,856
  • 6
  • 74
  • 93