2

Executors.newSingleThreadExecutor queues the tasks registered to it and then executes them sequentially. The following code:

val singleThreadedExecutor = Executors.newSingleThreadExecutor()

(0..10).forEach { i ->

    singleThreadedExecutor.execute {

        if (i % 2 == 0) {
            Thread.sleep(2000)
        } else {
            Thread.sleep(1000)
        }

        println(i)
    }
}

outputs this:

I/System.out: 0
I/System.out: 1
I/System.out: 2
I/System.out: 3
I/System.out: 4
I/System.out: 5
I/System.out: 6
I/System.out: 7
I/System.out: 8
I/System.out: 9
I/System.out: 10

I want to achieve this behaviour using Kotlin's Coroutines. I have tried using limitedParallelism but it didn't work as I was expecting. See the code below:

val singleThreadedCoroutine = Dispatchers.Default.limitedParallelism(1)

(0..10).forEach { i ->

    lifecycleScope.launch(singleThreadedCoroutine) {

        if (i % 2 == 0) {
            delay(2000)
        } else {
            delay(1000)
        }

        println(i)
    }
}

But its output was:

I/System.out: 1
I/System.out: 3
I/System.out: 5
I/System.out: 7
I/System.out: 9
I/System.out: 0
I/System.out: 2
I/System.out: 4
I/System.out: 6
I/System.out: 8
I/System.out: 10

Am I missing something? How can I queue tasks in a coroutine, so that it executes them sequentially?

Thanks.

Bugs Happen
  • 2,169
  • 4
  • 33
  • 59

1 Answers1

1

The difference here is not really about the dispatcher, it's about the fact that suspending functions don't block threads, and coroutines are concurrent.

The reason why delay() is called "non-blocking" is because it doesn't block the thread it's called from. This means that the thread is free to go execute other coroutines during that time. On the other hand, using Thread.sleep() really blocks the thread, so it will prevent it from doing anything else during that time and other coroutines (or tasks in your case) will have to wait. If you used Thread.sleep() in your coroutines approach, you should see the same result, but that kinda defeats the purpose of coroutines.

Dispatching coroutines is usually done in order, but I don't believe this is guaranteed documented behaviour, and it probably depends on the dispatcher. However, in any case, when they suspend, they are allowed to interleave (in general) - this is almost the definition of concurrency.

If you don't want concurrency, you have several options:

  1. do all the work on-the-spot in one coroutine: run your loop inside a single launch:
lifecycleScope.launch {
    repeat(11) { i ->

        if (i % 2 == 0) {
            delay(2000)
        } else {
            delay(1000)
        }

        println(i)
    }
}
  1. do all the work in one coroutine, but elsewhere: use a Channel as a queue to send the events to, and instead of using launch on each item, just send the items through the channel. Then, spawn a single coroutine that polls elements from the channel for processing.

  2. if what you really want is to protect only certain parts from running in parallel, but you're ok with concurrency otherwise, you can also use a Mutex. You don't even have to use a single-threaded dispatcher in that case

val mutex = Mutex()

repeat(11) { i ->

    lifecycleScope.launch {

        mutex.withLock {

            if (i % 2 == 0) {
                delay(2000)
            } else {
                delay(1000)
            }

            println(i)
        }
    }
}
Bugs Happen
  • 2,169
  • 4
  • 33
  • 59
Joffrey
  • 32,348
  • 6
  • 68
  • 100
  • So I have to do it myself, there is no default way of achieving that? – Bugs Happen Jul 19 '22 at 09:52
  • I added option 1, which is IMO the simplest way of doing what you want. It would help if you gave information about your real-life use case, because here it seems obvious but the solution might not be appropriate in your case. It's unclear why you did this with the executor and the sleep in the first place – Joffrey Jul 19 '22 at 09:53
  • Thanks but that's not really my point. What if I don't have a loop? What if I have a function that is being called from multiple places and but I don't want it to run in parallel? – Bugs Happen Jul 19 '22 at 09:56
  • Parallelism and concurrency are 2 different things. Single-threaded dispatchers, or `limitedParallelism(1)` only prevent parallelism, not concurrency. Why are you trying to do by preventing concurrency? – Joffrey Jul 19 '22 at 09:57
  • Okay, let me give you the use case example. I have a function that logs whatever string is passed to it in a file. I'd like the logs to be in order in which the log function was called. And because writing is an IO operation, it is in a coroutine, where I have to make sure that it doesn't run in parallel. – Bugs Happen Jul 19 '22 at 09:59
  • But where is the concurrency problem here? Is your function doing several things intertwined with suspending calls? If not, there will be no problem, you would just need `limitedParallelism(1)` if writing to the file is not thread-safe. If it is in fact doing several operations (like computing a timestamp), and using suspending calls between those operations, then you might have a concurrency problem, and in this case you just need a mutex around your function to make it "atomic" in a way – Joffrey Jul 19 '22 at 10:20
  • Alright. So the bottom line is, I have to manually control it. Thanks. – Bugs Happen Jul 19 '22 at 10:22