
We're refactoring our project from RX to Kotlin Coroutines, but not in one go, so the project needs to use both for some time.

Right now, many of our methods return an RX Single like this, because they are heavy, long-running operations such as API calls.

fun foo(): Single<String> // Heavy, long-running operation

We want it to be like this:

suspend fun foo(): String // The same heavy, long-running operation

Where we use this method, we want to still use RX.

We have been doing this:

foo()
    .subscribeOn(Schedulers.io())
    .map { ... }
    .subscribe { ... }

Now how should I convert my suspend fun to produce a Single that I can work with?

Is this a good idea?

Single.fromCallable {
    runBlocking {
        foo() // This is now a suspend fun
    }
}
    .subscribeOn(Schedulers.io())
    .map { ... }
    .subscribe { ... }
Adam Kis

1 Answer


I haven’t reasoned it through thoroughly, but I think your approach might tie up one additional thread for the duration of the suspend function, since runBlocking blocks the thread it is called on.

There is an official kotlinx-coroutines-rx3 library that has conversions. The rxSingle function is relevant here. Under the hood, it will call the suspend function in a suspending way without blocking an additional thread.

I haven’t used this myself, so you may want to test it to be sure, but I don’t think you need subscribeOn immediately after this, because the actual work is done in a coroutine. Your foo function, since it is a suspend function, should already be using Dispatchers.IO internally if it performs blocking IO.

rxSingle { foo() }
    .map { ... }
    .subscribe { ... }
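
For a fuller picture, here is a minimal, self-contained sketch of that bridge. It assumes the kotlinx-coroutines-rx3 and RxJava 3 artifacts are on the classpath. The body of foo() and the fooSingle() wrapper are hypothetical stand-ins, not your real code; the sleep only simulates blocking IO.

```kotlin
import io.reactivex.rxjava3.core.Single
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.rx3.rxSingle
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the real heavy operation.
// The blocking part is wrapped in withContext(Dispatchers.IO),
// so callers do not need to pick a dispatcher themselves.
suspend fun foo(): String = withContext(Dispatchers.IO) {
    Thread.sleep(100) // simulated blocking IO (assumption for this sketch)
    "result"
}

// Hypothetical wrapper that keeps the old Single-based signature
// available to call sites that have not been migrated yet.
fun fooSingle(): Single<String> = rxSingle { foo() }

fun main() {
    val value = fooSingle()
        .map { it.uppercase() }
        .blockingGet() // blocking only to keep this demo short
    println(value) // prints "RESULT"
}
```

A wrapper like fooSingle() lets existing Rx chains stay unchanged while foo() itself is already a suspend function.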
Tenfour04
  • Thanks, it works! If I want to make it use Dispatchers.IO, I need to do this (The default is Dispatchers.Default): rxSingle(Dispatchers.IO) { foo() } – Adam Kis Mar 29 '22 at 08:14
  • What I mean is, since `foo()` is a suspend function, you don’t need to specify a dispatcher to call it. It’s an error to write a suspend function that blocks. Therefore you never have to specify a dispatcher for calling a suspend function. – Tenfour04 Mar 29 '22 at 11:33
  • The rxSingle method accepts a CoroutineDispatcher to specify where you want to run it (otherwise the default is Dispatchers.Default). The suspend function won't block anything if you don't call it from runBlocking {}, but it can run for a long time. I think specifying a dispatcher is justified if you don't want the suspend function to run on Dispatchers.Default, and it is often necessary for unit tests. – Adam Kis Mar 30 '22 at 09:46
  • can foo() return a flow? – aguiarroney Feb 13 '23 at 17:55
  • @aguiarroney No reason I see it can't return absolutely anything. – Tenfour04 Feb 13 '23 at 18:04
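
The dispatcher point raised in the comments could be sketched roughly as follows. This is only an illustration under assumptions: foo() here is a trivial hypothetical stand-in, and the fooSingle() wrapper with its dispatcher parameter is an invented helper, not part of any library. rxSingle takes a CoroutineContext as its first argument, so a dispatcher can be passed directly.

```kotlin
import io.reactivex.rxjava3.core.Single
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.rx3.rxSingle

// Hypothetical stand-in for the real suspend function.
suspend fun foo(): String = "result"

// Hypothetical wrapper: the dispatcher is a parameter so production code
// can default to Dispatchers.IO while tests pass something else.
fun fooSingle(dispatcher: CoroutineDispatcher = Dispatchers.IO): Single<String> =
    rxSingle(dispatcher) { foo() }

fun main() {
    // Shows which thread the coroutine body runs on for a given dispatcher.
    val threadName = rxSingle(Dispatchers.IO) { Thread.currentThread().name }
        .blockingGet()
    println(threadName)

    // Dispatchers.Unconfined is used here only as an example of overriding
    // the default, as a unit test might.
    println(fooSingle(Dispatchers.Unconfined).blockingGet()) // prints "result"
}
```

Whether the override is needed depends on foo(): if it already switches to a suitable dispatcher internally, the argument mainly matters for code that runs outside that switch and for tests.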