
I'm new to Kotlin coroutines, and one thing I have failed to figure out is how a coroutine knows when to yield to others when making network calls.

If I understand it right, a coroutine works preemptively, which means it knows when to yield to other coroutines when it has a time-consuming task (typically an I/O operation) to perform.

For example, let's say we want to paint some UI that displays data from a remote server, and we have only one thread to schedule our coroutines. We could launch one coroutine to make REST API calls to get the data, while having another coroutine paint the rest of the UI, which has no dependency on the data. However, since we have only one thread, only one coroutine can run at a time. Unless the coroutine that fetches the data preemptively yields while it is waiting for the data to arrive, the two coroutines will be executed sequentially.

As far as I know, Kotlin's coroutine implementation does not patch the JVM or any of the JDK network libraries. So if a coroutine calls a REST API, it should block just as a Java thread would. I'm saying this because I've seen a similar concept in Python, called green threads. For it to work with Python's built-in network library, one must first "monkey-patch" that library. To me this makes sense, because only the network library itself knows when to yield.

So could anyone explain how a Kotlin coroutine knows when to yield when calling blocking Java network APIs? Or, if it does not, does that mean the tasks in the example above cannot be performed concurrently given a single thread?

Thanks!

Marko Topolnik
Rafoul
  • Kotlin is using non-blocking io for network operations. Also no one prevents libraries from creating as many threads as needed. Wiki it: Non-blocking I/O (Java) – Alexander Anikin Sep 19 '18 at 09:26

2 Answers


a coroutine works preemptively

Nope. With coroutines you can only implement cooperative multithreading, where you suspend and resume coroutines with explicit method calls. The coroutine singles out just the concern of suspending and resuming on demand, whereas the coroutine dispatcher is in charge of ensuring it starts and resumes on the appropriate thread.

Studying this code will help you see the essence of Kotlin coroutines:

// The *.experimental packages were dropped when coroutines became stable (Kotlin 1.3)
import kotlinx.coroutines.*
import kotlin.coroutines.*

fun main(args: Array<String>) {
    var continuation: Continuation<Unit>? = null
    println("main(): launch")
    GlobalScope.launch(Dispatchers.Unconfined) {
        println("Coroutine: started")
        suspendCoroutine<Unit> {
            println("Coroutine: suspended")
            continuation = it
        }
        println("Coroutine: resumed")
    }
    println("main(): resume continuation")
    continuation!!.resume(Unit)
    println("main(): back after resume")
}

Here we use the most trivial dispatcher, Unconfined, which doesn't do any dispatching: it runs the coroutine right where you call launch { ... } and continuation.resume(). The coroutine suspends itself by calling suspendCoroutine. This function runs the block you supply, passing it the object you can later use to resume the coroutine. Our code saves that object to the var continuation. Control then returns to the code after launch, where we use the continuation object to resume the coroutine.

The entire program executes on the main thread and prints this:

main(): launch
Coroutine: started
Coroutine: suspended
main(): resume continuation
Coroutine: resumed
main(): back after resume

We could launch one coroutine to make REST API calls to get the data, while having another coroutine paint the rest of the UI which have no dependency on the data.

This actually describes what you'd do with plain threads. The advantage of coroutines is that you can make a "blocking" call in the middle of GUI-bound code and it won't freeze the GUI. In your example you'd write a single coroutine that makes the network call and then updates the GUI. While the network request is in progress, the coroutine is suspended and other event handlers run, keeping the GUI live. The handlers aren't coroutines, they are just regular GUI callbacks.
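To make the single-thread case concrete, here is a minimal sketch that uses only the Kotlin standard library. The `EventLoop` class and `fakeRequest` function are hypothetical stand-ins for a GUI event queue and a non-blocking network call; they are not part of any real framework. The point is only that a suspended coroutine leaves the thread free for plain handlers.

```kotlin
import kotlin.coroutines.*

// Toy single-threaded event loop (hypothetical, for illustration only).
class EventLoop {
    private val queue = ArrayDeque<() -> Unit>()
    fun post(task: () -> Unit) { queue.addLast(task) }
    fun run() { while (queue.isNotEmpty()) queue.removeFirst().invoke() }
}

// Hypothetical non-blocking "request": suspends the caller and arranges
// for a later event to resume it with the result.
suspend fun fakeRequest(loop: EventLoop): String = suspendCoroutine { cont ->
    loop.post { cont.resume("data") }
}

fun runEventLoopDemo(): List<String> {
    val log = mutableListOf<String>()
    val loop = EventLoop()
    val handler: suspend () -> Unit = {
        log += "coroutine: request sent"
        val data = fakeRequest(loop)   // suspension point, the thread is released here
        log += "coroutine: received $data"
    }
    // First event: start the coroutine. It runs until it suspends in fakeRequest.
    loop.post { handler.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() }) }
    // Second event: an ordinary callback that is not a coroutine.
    loop.post { log += "plain handler: repaint" }
    loop.run()
    return log
}

fun main() {
    runEventLoopDemo().forEach(::println)
}
```

Everything runs on the calling thread. The repaint handler executes between "request sent" and "received data" because the coroutine is suspended during that interval, not blocked.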

In the simplest terms, you can write this Android code:

activity.launch(Dispatchers.Main) {
    textView.text = requestStringFromNetwork()
}

...

suspend fun requestStringFromNetwork() = suspendCancellableCoroutine<String> {
    ...
}

requestStringFromNetwork is the equivalent of "patching the IO layer", but you don't actually patch anything, you just write wrappers around the IO library's public API. Pretty much all Kotlin IO libraries are adding these wrappers and there are extension libs for Java IO libs as well. It's also very straightforward to write your own if you follow these instructions.
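As a rough sketch of what such a wrapper can look like, the code below adapts a callback-style API into a suspending function. `FakeHttpClient`, `getSuspending`, and `fetchOnce` are made up for this example and do not belong to any real library. To keep the sketch free of external dependencies it uses `suspendCoroutine` from the standard library; with kotlinx.coroutines you would normally use `suspendCancellableCoroutine`, as in the snippet above, so that cancellation can be forwarded to the IO library.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import kotlin.coroutines.*

// Hypothetical callback-based client standing in for a real async IO library.
class FakeHttpClient {
    private val io = Executors.newSingleThreadExecutor()

    fun get(url: String, onSuccess: (String) -> Unit, onError: (Throwable) -> Unit) {
        if (url.isBlank()) { onError(IllegalArgumentException("empty URL")); return }
        io.execute { onSuccess("response from $url") }
    }

    fun shutdown() { io.shutdown() }
}

// The wrapper: suspend, hand the continuation to the callbacks, and let
// whichever callback fires resume the coroutine.
suspend fun FakeHttpClient.getSuspending(url: String): String =
    suspendCoroutine { cont ->
        get(url,
            onSuccess = { body -> cont.resume(body) },
            onError = { error -> cont.resumeWithException(error) })
    }

// Demo driver. The CompletableFuture exists only so this plain function can
// wait for the coroutine to finish; GUI code would not block like this.
fun fetchOnce(url: String): String {
    val client = FakeHttpClient()
    val result = CompletableFuture<String>()
    val request: suspend () -> String = { client.getSuspending(url) }
    request.startCoroutine(Continuation(EmptyCoroutineContext) { r ->
        r.fold({ result.complete(it) }, { result.completeExceptionally(it) })
    })
    try {
        return result.get()
    } finally {
        client.shutdown()
    }
}

fun main() {
    println(fetchOnce("https://example.com/data"))
}
```

The wrapper touches only the client's public `get` method. Nothing in the IO layer is patched.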

Marko Topolnik
  • Thank you for clarifying. Your answer made me rethink my question. I also found this link helpful [link](https://stackoverflow.com/questions/22177722/java-nio-non-blocking-channels-vs-asynchronouschannels). – Rafoul Sep 20 '18 at 07:40
  • So here is my understanding now: 1. Using Kotlin coroutines does not automatically turn blocking calls into non-blocking ones; it just provides an easier and more natural way of writing asynchronous code (by providing a mechanism to suspend and resume coroutines). 2. To get the most out of it, we'd better use some kind of async library, such as Java NIO, or CompletableFuture, which has callbacks. 3. On the other hand, if blocking calls are used, we'd still get blocked unless we use more threads. @Marko, could you please verify whether my new understanding is correct? Thanks! – Rafoul Sep 20 '18 at 07:40
  • Yes, pretty much on the target. Blocking calls always end up in `native` methods, there's no way you can patch them from the outside to turn them into non-blocking ones. Python is very different here because its IO has always been based on non-blocking IO and the IO event loop. That's something you _can_ patch to get coroutine-like behavior. – Marko Topolnik Sep 20 '18 at 09:47
  • However, I wouldn't recommend you to get involved with Java NIO directly, it's very low-level and quite awkward APIs. You should use a library like Netty that turns it into something more convenient. – Marko Topolnik Sep 20 '18 at 09:52
  • thanks. I read your first answer again and now I understand it much better. Actually I believe it answered my question in the first place, just that I failed to get it at that time. – Rafoul Sep 20 '18 at 13:47

The answer is: a coroutine does not know about network calls or I/O operations. You have to structure the code according to what you want, putting heavy work into separate coroutines so that they can be executed concurrently, because the default behavior is sequential.

For example:

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here (maybe I/O)
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here (maybe I/O), too
    return 29
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        // Plain calls: the second starts only after the first has returned
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}

will produce something like this:

The answer is 42
Completed in 2017 ms

and doSomethingUsefulOne() and doSomethingUsefulTwo() will be executed sequentially. If you want concurrent execution you must write instead:

fun main(args: Array<String>) = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

that will produce:

The answer is 42
Completed in 1017 ms

as doSomethingUsefulOne() and doSomethingUsefulTwo() will be executed concurrently.

Source: https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#composing-suspending-functions

UPDATE: About where the coroutines are executed we can read in the github project guide https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#thread-local-data:

Sometimes it is convenient to have an ability to pass some thread-local data, but, for coroutines, which are not bound to any particular thread, it is hard to achieve it manually without writing a lot of boilerplate.

For ThreadLocal, the asContextElement extension function is here for the rescue. It creates an additional context element, which keeps the value of the given ThreadLocal and restores it every time the coroutine switches its context.

It is easy to demonstrate it in action:

val threadLocal = ThreadLocal<String?>() // declare thread-local variable
fun main(args: Array<String>) = runBlocking<Unit> {
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}

In this example we launch a new coroutine in a background thread pool using Dispatchers.Default, so it runs on different threads from the thread pool, but it still has the value of the thread-local variable that we specified using threadLocal.asContextElement(value = "launch"), no matter which thread the coroutine is executed on. Thus, the output (with debug) is:

Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
Launch start, current thread: Thread[CommonPool-worker-1 @coroutine#2,5,main], thread local value: 'launch'
After yield, current thread: Thread[CommonPool-worker-2 @coroutine#2,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
Raymond Arteaga
  • Hi Raymond, thank you for your reply. Could you please elaborate on the differences between the examples you gave above? I checked the documentation, and it seems to me that the 'await' call means waiting for the coroutine to complete without blocking the current thread (I did not find any doc for 'bg', though). However, as I put it in the question, how is it possible for two coroutines to run concurrently if they do not know when to yield, without introducing multiple threads? – Rafoul Sep 19 '18 at 08:44
  • Specifically in your example, even though 'downloadBigFileUsingNetwork' is explicitly executed as a coroutine, I would expect it to keep consuming the current thread unless it explicitly yields to the parent coroutine, which is responsible for rendering the UI. Ideally, it should yield while it is waiting for the download to complete. But as you mentioned, it doesn't know it is performing I/O operations, which means it will not yield automatically. In that case, I suppose it still blocks the current thread, doesn't it? – Rafoul Sep 19 '18 at 08:47
  • Your question can be simplified to the following: does `downloadBigFileUsingNetwork` use blocking or non-blocking network calls? All Android network libraries are async, so you just have to use their callbacks to resume the coroutine. If you somehow still use a blocking API, then you must use `withContext(Default) { blockingCall() }`, which turns the blocking call into a suspending one. – Marko Topolnik Sep 19 '18 at 16:28
  • Nothing in your answer demonstrates the suspension of coroutines. Your code has exactly the same form and behavior as if all calls were blocking. You can implement your `async { task() }` with something like `val future = threadPool.submit { task() }` and `task.await()` with `future.get()` – Marko Topolnik Sep 20 '18 at 06:15
  • @MarkoTopolnik sorry, maybe I'm misunderstanding the point of this question, interlingual barriers can be hard to overcome. – Raymond Arteaga Sep 20 '18 at 13:35
  • I was commenting the point of your answer, though. The code you wrote doesn't demonstrate anything interesting about coroutines. It doesn't show suspension/resumption or how some other code executes on the same thread while a coroutine is suspended. – Marko Topolnik Sep 20 '18 at 13:39
  • Previously I thought one must explicitly call something like 'yield' to suspend Kotlin coroutines, mostly because of my previous experience with Python's similar library. But as it turns out, that is done automatically when calling a function marked as 'suspend'. The only task left is to define where to resume, which is why a callback is needed. Of course, here I'm assuming an asynchronous call is used. That's the way I see it now. – Rafoul Sep 20 '18 at 13:52
  • No, it's not done automatically. `suspend` just means the function is _allowed_ to suspend itself, but it must explicitly request suspension by calling `suspendCoroutine`. – Marko Topolnik Sep 20 '18 at 14:05
  • @Marko, thank you for pointing this out. I've been crazy busy at work in the past few weeks and I didn't want to reply without further investigation. Now I have had time to look into this again, and yes, you are absolutely right about how a coroutine is suspended. This call is under the hood of many library functions. – Rafoul Oct 17 '18 at 09:55
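
The point settled in the last few comments can be checked with a small standard-library sketch. The function names below are invented for the illustration. A function marked `suspend` that never calls `suspendCoroutine` (directly or through a library function) runs straight through like an ordinary call, while one that does call it hands control back to the caller until someone resumes the saved continuation.

```kotlin
import kotlin.coroutines.*

// Marked suspend, but never requests suspension: behaves like a normal call.
suspend fun noSuspension(): Int = 42

// Requests suspension explicitly and stores the continuation for later.
suspend fun realSuspension(saved: MutableList<Continuation<Int>>): Int =
    suspendCoroutine { cont -> saved += cont }

fun demoSuspendKeyword(): List<String> {
    val log = mutableListOf<String>()
    val saved = mutableListOf<Continuation<Int>>()

    val a: suspend () -> Int = { noSuspension() }
    a.startCoroutine(Continuation(EmptyCoroutineContext) { log += "a done: ${it.getOrThrow()}" })
    log += "after starting a"   // a has already completed by this point

    val b: suspend () -> Int = { realSuspension(saved) }
    b.startCoroutine(Continuation(EmptyCoroutineContext) { log += "b done: ${it.getOrThrow()}" })
    log += "after starting b"   // b is suspended, not finished

    saved.single().resume(7)    // resuming is just a call on the saved object
    return log
}

fun main() {
    demoSuspendKeyword().forEach(::println)
}
```

Here "a done: 42" is logged before "after starting a", whereas "b done: 7" appears only after the explicit `resume(7)` call.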