4

Background

I've found some classes/functions on a large app I work on, that have calls that shouldn't be on the UI thread (such as accessing the storage or DB).

Such operations could cause ANRs, and indeed I can see a percentage of ANRs on the Play Console.

I'd like to change this, and hopefully by using Kotlin Coroutines to also have a bit more order in code.

So, currently I work on a class that extends BroadcastReceiver and so it needs the onReceive callbacks to be handled one after another on the UI thread, each one will have to "wait" for the previous ones to finish.

Inside the onReceive callback, there are sadly calls that should be done on the background thread, and some on the UI thread. Sometimes there are conditions that have them both.

Meaning for example :

if( someCheckOnUiThread() && someDbOperation()) {
  ...
}

The problem

I'm quite new to Kotlin Coroutines, and even though I've found how to handle this, I'm pretty sure there is a more official way to do it, as I've read some tips and comments about this from others (here).

What I've tried

What I did actually works, but it looks more like a workaround:

private val mainScope = MainScope()
private val backgroundWorkDispatcher: CoroutineDispatcher =
        java.util.concurrent.Executors.newFixedThreadPool(1).asCoroutineDispatcher()

And then use them right in the onReceive callback:

@UiThread
override fun onReceive(somcContext: Context, intent: Intent) {
    val context = somcContext.applicationContext
    //use goAsync just because I'm in BroadcastReceiver
    val pendingAsyncResult = goAsync() 
    mainScope.launch {
        runInterruptible(backgroundWorkDispatcher) {
           // <-- some code here
        }
    }.invokeOnCompletion { throwable ->
        // last operation after done with everything here: 
        pendingAsyncResult.finish()
    }
    //done right away here, and yet the handling will be done one after another, freely
}

Inside the runInterruptible, I can reach the UI thread by calling runBlocking(mainScope.coroutineContext) {} , and I can also cancel the task using cancel() even within.

Using runBlocking is important because I need to wait there for a result. Of course I could use the alternatives when it makes sense, but then I could also use a simple Handler as I don't wait for a result.

I also use backgroundWorkDispatcher to make sure all background operations will be on a single thread, to wait for next operations, one after another.

The question

What are the alternatives to this solution? Something more elegant and/or shorter? Something more official?

Note that I need to handle the operations that are queued by the UI one-after-another, each waiting for the previous one to finish. The BroadcastReceiver is only an example. I'm sure there are (sadly) much harder places to fix in code, but I want to know how to properly handle this first.

android developer
  • 114,585
  • 152
  • 739
  • 1,270
  • Coroutines are inherently concurrent. If used idiomatically, they will be interlaced on suspension points (even on a single thread) - that's what it means for them to be concurrent. The reason why you might be able to enforce sequential behaviour in your "working" code is likely because you're using blocking code inside instead of suspending code, which makes using coroutines a bit pointless. Have you considered using a `Channel` instead as a queue, and then processing events from the channel from one coroutine? – Joffrey Jul 26 '22 at 08:39
  • @Joffrey Why write both a comment and an answer, of the same solution... – android developer Jul 26 '22 at 10:41

2 Answers2

7

Since you were asking about a thread queue in your comments on the other question, here's how I would do a coroutine job queue. Keep in mind, this is if you need each submitted coroutine to run entirely sequentially (no parallel work at all), which I'm not sure is what you're describing above.

class JobQueue {
    private val scope = MainScope()
    private val queue = Channel<Job>(Channel.UNLIMITED)

    init { 
        scope.launch(Dispatchers.Default) {
            for (job in queue) job.join()
        }
    }

    fun submit(
        context: CoroutineContext = EmptyCoroutineContext,
        block: suspend CoroutineScope.() -> Unit
    ) {
        synchronized {
            val job = scope.launch(context, CoroutineStart.LAZY, block)
            queue.trySend(job)
        }
    }

    fun cancel() {
        queue.cancel()
        scope.cancel()
    }
}

You can create an instance of this class in an object or at the top level to make it last the lifetime of your app. It depends on how long you need the jobs to run for. I don't have a lot of BroadcastReceiver experience, but I know they are short-lived, so if they receive something while your app is off-screen, and the coroutine takes longer than a few seconds, I'm not sure exactly what happens. For this kind of work, I think you need to quickly pass it off to a WorkManager. But if you are doing stuff while your app is on-screen, you can use coroutines.

The following would prevent any part of the submitted job to run before any previously submitted job to the same JobQueue instance.

val jobQueue = JobQueue() // at top level so shared by all BroadcastReceivers

//...

override fun onReceive(someContext: Context, intent: Intent) {
    jobQueue.submit {
        val x = getSomething(someContext.applicationContext) // on main thread
        val y = withContext(Dispatchers.IO) {
            doSomeBlockingFetch() // not on main thread so safe to call blocking fun
        }
        doSomethingWithResult() // on main thread
    }
    // onReceive returns promptly on the main thread as required, but the JobQueue
    // prevents subsequent queue-submitted jobs from running before this one
    // is *completely* finished, including the final doSomethingWithResult() call
    // on the main thread.
}

Regarding your code in the question:

Creating a single-threaded dispatcher can prevent the code using that dispatcher from running in parallel, which might be all you want. But it doesn't create a queue, and provides no guarantee of execution order. Suppose my above example were done using your solution. Two calls to onReceive are made by the OS in quick succession. The doSomeBlockingFetch() part would not be run in parallel using your single-threaded dispatcher, but there's no guarantee of which order they would be called in, or which order the subsequent doSomethingWithResult() would be called in.

If you want a less hacky way of preventing your blocking code from running in parallel, and if you don't care about the execution order of the post-IO main thread work, I would use a mutex instead of a single-threaded dispatcher:

val receiverIOMutex = Mutex() // at top level so shared by all BroadcastReceivers

//...

override fun onReceive(someContext: Context, intent: Intent) {
    anyCoroutineScope.launch(Dispatchers.Main.immediate) {
        val x = getSomething(someContext.applicationContext) // on main thread
        val y = receiverIOMutex.withLock {
            withContext(Dispatchers.IO) {
                doSomeBlockingFetch() // not on main thread so safe to call blocking fun
            }
        }
        doSomethingWithResult() // on main thread
    }
}

Here's an example of the Job Queue class with a hot SharedFlow since you asked for it, but it would be a weird choice. The whole reason SharedFlow was added to Kotlin when there were already Channels was to provide a way for multiple subscribers to get values without consuming them, instead of each value only being allowed to be consumed once regardless of who was reading it. But for a job queue, we don't want to have multiple subscribers and we only want to consume each job once. So if you do this with a SharedFlow, it's like using a wrench to bang your nail in. It will work, but not as elegantly as with a hammer and with more risk of misuse or an accident.

class JobQueue {
    private val scope = MainScope()
    private val queue = MutableSharedFlow<Job>(extraBufferCapacity = Int.MAX_VALUE)

    init { 
        queue.onEach { it.join() }
            .flowOn(Dispatchers.Default)
            .launchIn(scope)
    }

    fun submit(
        context: CoroutineContext = EmptyCoroutineContext,
        block: suspend CoroutineScope.() -> Unit
    ) {
        synchronized {
            val job = scope.launch(context, CoroutineStart.LAZY, block)
            queue.tryEmit(job)
        }
    }

    fun cancel() {
        scope.cancel()
    }
}
Tenfour04
  • 83,111
  • 11
  • 94
  • 154
  • 1. Once the queue is empty in your solution and the class isn't used, would it auto-stop and GC-ed ? 2. Isn't there anything like the `JobQueue` class you've presented that's official? I could just create a Job list, which would take less code, no? Someone mentioned Flow of some sort. 3. Why would my solution not be reliable? It shouldn't choose a job at random, no? Shouldn't multiple calls on the UI thread to `mainScope.launch` mean that they would be queued, one after another? 4. I don't see where in the second solution there is a queue. It's a lock that works in the UI thread? Why? – android developer Jul 29 '22 at 11:36
  • 1
    1. No, that's not how Channels work. The iterator of a Channel suspends on `hasNext()` until a new value arrives or the Channel is `close()`d, so whenever the queue is empty, the for loop is suspended, waiting for the next item. A launched job is held in memory until it completes. So if you want to clean this JobQueue object from memory, you would need to call `cancel()` on it before dropping the reference to the class. 2. Not that I know of. You could also create this with SharedFlow/StateFlow, but I think that would be slightly more convoluted than using a Channel. – Tenfour04 Jul 29 '22 at 12:54
  • 1
    3. Kotlin makes no promises of execution order of coroutines based on launch order. What you're doing might work, but if it does, it's relying on a private implementation detail subject to change without warning. 4. Right, the 2nd solution isn't a queue. I put that in case all you really wanted was to prevent parallel execution. – Tenfour04 Jul 29 '22 at 12:54
  • 1. So it doesn't auto-GC ? It's stuck in memory forever, even if all references to the BroadcastReceiver class are gone and there is no pending job? 2. Yes someone mentioned it, but didn't provide more information. Can you please demonstrate it too? 3. Too bad. It is quite long already... 4. But you don't need a lock if you are already on the UI thread and have the locking only on the UI thread. – android developer Jul 29 '22 at 13:20
  • 1
    1. Yes, just like a started Thread, a launched coroutine stays in memory and keeps running until it returns or throws. If you cancel the Job or the CoroutineScope that launched it, that stops it, so it can then be reclaimed by the GC. Since this coroutine above is using a for loop on a Channel, it will not return until the Channel is closed. 2. I added an example above. 4. I was suggesting a way of preventing parallel execution of your non-main-thread code, in case that was all you really needed to achieve (not a queue). Creating a second single-threaded dispatcher for this is kinda hacky. – Tenfour04 Jul 29 '22 at 18:08
  • Is it possible to close the channel each time the last job is finished, and start it again when a new job is supposed to be enqueued? Also, someone mentioned a solution of some kind of Flow. Do you know what it is, how to use it, and why would it help? – android developer Jul 29 '22 at 21:59
  • I suppose maybe you could do that, but it would be convoluted and I can’t think of any possible reason to do it. You’d need to use thread synchronization and have to briefly block functions that submit jobs while reconstructing the channel. One suspended coroutine waiting for a channel doesn’t use any cpu and not more than a few bytes of memory. I added a way to use Flow to my answer, and why I don’t think you should use it instead of Channel. Someone might have just mentioned Flow because they’re more commonly used than Channels so that might have just been the first thing that came to mind. – Tenfour04 Jul 30 '22 at 00:44
  • Why would you need a complex synchronization for the auto-closing/creation? I could just do it on the UI thread, always (which is a simple synchronization), as it's the one that will always add to the queue and I could make it the one that finishes each job there (using invokeOnCompletion, perhaps?). Or, I could auto-close in the class you've created, but just make sure again it's on the UI thread that checks the current status. As for Flow (and your previous solution too), could I just override `finalize` to auto-close them? Or it's something that is stuck on memory? – android developer Jul 30 '22 at 08:07
  • If you enforce that jobs can only be submitted from the main thread, that would work. I was just assuming most job queues should be thread-safe. There are a million articles about why Java's `finalize()` should be avoided, but I suppose it wouldn't hurt as a backup and you can plan to manually call `cancel()` when you're nulling out the JobQueue's reference to do it "properly". I thought from your description that you're sharing a queue among multiple receivers though, so I thought you'd want the queue alive as long as your app is open. – Tenfour04 Jul 31 '22 at 15:45
  • If you're worried about the empty queue sitting there, waiting for its next job, you don't need to. It's not using CPU to sit there passively in a suspended state, and is using about as much memory as a very short String. – Tenfour04 Jul 31 '22 at 15:47
  • I see. What would happen if `cancel()` is called twice? Nothing? Also, it says `scope.cancel()` might throw an exception in case it has no job, so I wrapped it with `runCatching`. – android developer Aug 01 '22 at 09:44
1

I also use backgroundWorkDispatcher to make sure all background operations will be on a single thread, to wait for next operations, one after another.

This is not what a single thread enforces in the coroutines world. A single thread prevents parallelism, but doesn't prevent concurrency. If you launch 2 coroutines in a single-threaded dispatcher, the second may very well start before the end of the first one, assuming the first one has at least one suspension point (call to a suspend function). This interlacing of coroutines execution is what concurrency means. See this other question which stems from the same misconception.

Basically, using the launch or async coroutine builders (or, more generally, starting multiple coroutines) expresses concurrency between those snippets of code. While it could be possible to design a custom dispatcher that enforces one coroutine completes before the next one can start, it's sort of the opposite of what coroutines are expected to do.

Now, that doesn't mean you can't do anything in your case, it just means that launching one coroutine per event is probably not the right thing to do (maybe it is ok, though, see EDIT down below). Instead, I suggest creating a channel to represent the queue of events to process, and launching one single coroutine somewhere to process those events (so it correctly expresses that you don't want concurrency). Instead of launching one coroutine per event in onReceive, you would simply send to the channel (probably using sendBlocking in your case, because onReceive is not suspend).

Now about where to launch that "actor" coroutine, I'd say it depends. You can give the channel and the coroutine the scope that you want (I mean "scope" in terms of variable visibility span, not in coroutine terms here). For instance, if you only want to enforce non-concurrency between events of this specific BroadcastReceiver, I'd say declare the channel as a property of this broadcast receiver, and launch the coroutine upon initialization (e.g. in an init block) in a CoroutineScope that's scoped to the broadcast receiver's lifecycle (if you don't have lifecycleScope there, create the scope yourself and cancel it upon destruction of the BroadcastReceiver).

Inside the onReceive callback, there are sadly calls that should be done on the background thread, and some on the UI thread. Sometimes there are conditions that have them both.

This is not a big problem. Coroutines make it easy to switch between threads by using withContext(someDispatcher). Note that withContext waits for whatever code is inside its lambda before it returns. This means that the code is still sequential even if it switches between threads. Synchronization is handled for you.

Here is some sample code about how it could look:

data class MyEvent(
    ... // add properties containing the data you need for processing
    val pendingResult: PendingResult,
)

// in the broadcast receiver
class MyBroadcastReceiver : BroadcastReceiver() {

    private val eventProcessorScope = MainScope(CoroutineName("my-receiver-event-processor"))

    private val events = Channel<MyEvent>()

    init {
        eventProcessorScope.launch {
            for (e in events) {
                try {
                    process(e)
                } finally {
                    e.pendingResult.finish()
                }
            }
        }
    }

    @UiThread
    override fun onReceive(someContext: Context, intent: Intent) {
        val pendingResult = goAsync()
        // extract the necessary data from the context or intent
        val event = YourEvent(..., pendingResult)
        eventChannel.sendBlocking(event)
    }

    suspend fun process(event: MyEvent) {
        // process the event here

        // do something on UI thread

        withContext(Dispatchers.Default) {
            // do some CPU-bound work
        }

        // do something else on UI thread after CPU work

        withContext(Dispatchers.IO) {
            // do some blocking call
        }
    }

    fun close() {
        eventProcessorScope.cancel()
    }
}

EDIT: From the docs of goAsync, I realized that making the event processing async actually prevents other events from being received, which I'm assuming prevents concurrent calls to onReceive:

Keep in mind that the work you do here will block further broadcasts until it completes

This means you actually can launch as many coroutines as you want to process those events, so long as you complete the goAsync pending result at the end of the processing. But then there is no need for runBlocking/runInterruptible, just using withContext here and there should do fine.

Joffrey
  • 32,348
  • 6
  • 68
  • 100
  • I don't understand how the Channel that you've linked to could help. Why would I want it instead? Please explain how you would use it, in code. Also, as I was told, `withContext` will still go to other jobs that are created, ruining what I want to have (a queue). Just because it waits there, doesn't mean it won't go to other places to work on. So instead of A,B,C, it can be A,C,B. Can you please show code and explain how it's better/shorter? – android developer Jul 26 '22 at 10:58
  • Sorry for not providing snippets of code, I didn't have time to setup a proper solution. The channel helps because it is a queue, so it orders elements by "send time" in a way. Then, the most important is using a *single* coroutine to process elements from the channel, because it prevents any other events from being processed before the previous one was completely processed. This solves the concurrency problem (you can't start with one element, then start another one, then come back to the first one). I'll provide some code if I find some time later – Joffrey Jul 26 '22 at 11:03
  • `withContext` is a suspend function, and as such it allows other coroutines to run on the original thread because it frees that thread, indeed. BUT, if you have only one coroutine anyway to process events from your queue, you know that the other coroutines that it yields to won't be processing your events. So, considering only the coroutines processing your events, it doesn't matter. – Joffrey Jul 26 '22 at 11:06
  • Now if you have more coroutines to process your events, the trick is the `goAsync` pending result. AFAIU, as long as you don't "complete" the pending result, `onReceive` will not be called again, and no other coroutines will be enqueued to process events. So even with your original setup (but without blocking the thread) it should be fine, because only one coroutine will be running (and even created) at a time – Joffrey Jul 26 '22 at 11:08
  • "if you have only one coroutine anyway to process events from your queue, you know that the other coroutines that it yields to won't be processing your events" - Why? If the UI thread calls to create it on each call to onReceive, it should be launched right when the thread is suspended on the previous time. Also, please ignore that it's on BroadcastReciever. According to my tests it's incorrect that it always waits for goAsync to finish (it depends on the action, I think), and I want to know the solution even if it's not as such. – android developer Jul 26 '22 at 13:40
  • *If the UI thread calls to create it on each call to onReceive* - I was considering 2 approaches in my answer, the initial one with the channel and single coroutine, and the other approach in the edit (close to your initial one). If we forget about the subtlety of `goAsync` and the broadcast receiver behaviour, let's focus on the first approach. In the first approach you have one channel and one coroutine only. `onReceive` immediately sends elements to the channel using `sendBlocking`, it doesn't spawn new coroutines on each event. You only have one coroutine, started separately. – Joffrey Jul 26 '22 at 14:06
  • Generally speaking, the answer to your question is: use a single coroutine to do stuff sequentially if you want no concurrency at all. If you are ok with concurrency in most places, and only some pieces of code need to be done atomically, you can keep launching multiple coroutines, and add a `Mutex` to protect said pieces of code. – Joffrey Jul 26 '22 at 14:14
  • So you want to create something similar to a Singleton of a coroutine , that has a `while(true)` loop, which goes over a queue of operations to handle, forever ? – android developer Jul 27 '22 at 06:57
  • I added a piece of code yesterday to clarify what I meant. It wouldn't necessarily be running forever, just as long as you need events to be processed. You can then cancel the scope in which the coroutine was started, or close the channel. – Joffrey Jul 27 '22 at 11:14
  • I see. But there is a problem about deciding when to close the channel as it requires to close it only when both UI and the background work is done with this (no pending work and no currently-working work). Someone mentioned using Flow instead. Do you know about this ? – android developer Jul 28 '22 at 12:17