I'm trying to test the following RxKotlin/RxJava 2 code:

validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }

I'm attempting to override the schedulers as follows:

// Runs before each test suite
RxJavaPlugins.setInitIoSchedulerHandler { Schedulers.trampoline() }
RxAndroidPlugins.setInitMainThreadSchedulerHandler { Schedulers.trampoline() }

However, I get the following error when running the test:

java.lang.ExceptionInInitializerError
...
Caused by: java.lang.NullPointerException: Scheduler Callable result can't be null
    at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
    at io.reactivex.plugins.RxJavaPlugins.applyRequireNonNull(RxJavaPlugins.java:1317)
    at io.reactivex.plugins.RxJavaPlugins.initIoScheduler(RxJavaPlugins.java:306)
    at io.reactivex.schedulers.Schedulers.<clinit>(Schedulers.java:84)

Has anyone experienced this problem?


The test worked fine when using RxKotlin/RxJava 1 and the following scheduler overrides:

RxAndroidPlugins.getInstance().registerSchedulersHook(object : RxAndroidSchedulersHook() {
    override fun getMainThreadScheduler() = Schedulers.immediate()
})

RxJavaPlugins.getInstance().registerSchedulersHook(object : RxJavaSchedulersHook() {
    override fun getIOScheduler() = Schedulers.immediate()
})
Vadim Kotov
Alex
  • See the updated Javadoc for 2.0.8: http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/Schedulers.html#io() – akarnokd Apr 07 '17 at 18:58
  • Specifically "Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in a NullPointerException." – Kiskae Apr 07 '17 at 19:01
  • "Once the Schedulers class has been initialized, you can override the returned Scheduler instance via the RxJavaPlugins.setIoSchedulerHandler(io.reactivex.functions.Function) method." – akarnokd Apr 07 '17 at 19:04
  • Thanks! I tried previously using `setIoSchedulerHandler`, but `flatMap` was not getting called. Finally figured out why: The `validate` method was returning an observable that did `emitter.onNext(null)` :/ Since nulls are no longer accepted in RxJava 2, I changed that to a `Completable` and tests now pass! – Alex Apr 07 '17 at 19:14

4 Answers

I suggest you take a different approach and add a layer of abstraction to your schedulers. This guy has a nice article about it.

In Kotlin, it would look something like this:

interface SchedulerProvider {
    fun ui(): Scheduler
    fun computation(): Scheduler
    fun trampoline(): Scheduler
    fun newThread(): Scheduler
    fun io(): Scheduler 
}

And then you override that with your own implementation of SchedulerProvider:

class AppSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return AndroidSchedulers.mainThread()
    }

    override fun computation(): Scheduler {
        return Schedulers.computation()
    }

    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun newThread(): Scheduler {
        return Schedulers.newThread()
    }

    override fun io(): Scheduler {
        return Schedulers.io()
    }
}

And one for testing classes:

class TestSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun computation(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun newThread(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun io(): Scheduler {
        return Schedulers.trampoline()
    }
}

Your code would look like this where you call RxJava:

mCompositeDisposable.add(mDataManager.getQuote()
        .subscribeOn(mSchedulerProvider.io())
        .observeOn(mSchedulerProvider.ui())
        .subscribe(Consumer<Quote> {
...

You then swap in whichever SchedulerProvider implementation fits the context: AppSchedulerProvider in production code and TestSchedulerProvider in tests. Here's a sample project for reference; the link points to the test file that uses the test version of SchedulerProvider: https://github.com/Obaied/DingerQuotes/blob/master/app/src/test/java/com/obaied/dingerquotes/QuotePresenterTest.kt#L31
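As a rough illustration of how the injected provider makes a class testable, here is a minimal sketch assuming RxJava 2 is on the classpath. The `QuotePresenter` class, its `loadQuote` parameter, and the `shownQuote` property are hypothetical names made up for this example, and the interface is a trimmed copy of the one above so the snippet compiles on its own.

```kotlin
import io.reactivex.Scheduler
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

// Trimmed copy of the SchedulerProvider interface, limited to what this sketch needs.
interface SchedulerProvider {
    fun ui(): Scheduler
    fun io(): Scheduler
}

// Test implementation: every scheduler runs work on the calling thread.
class TestSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler = Schedulers.trampoline()
    override fun io(): Scheduler = Schedulers.trampoline()
}

// Hypothetical presenter: schedulers are injected instead of being
// looked up through Schedulers.io() / AndroidSchedulers.mainThread().
class QuotePresenter(
    private val loadQuote: () -> Single<String>,
    private val schedulers: SchedulerProvider
) {
    var shownQuote: String? = null
        private set

    fun load() {
        loadQuote()
            .subscribeOn(schedulers.io())
            .observeOn(schedulers.ui())
            .subscribe({ quote -> shownQuote = quote }, { /* error handling omitted */ })
    }
}

fun main() {
    val presenter = QuotePresenter({ Single.just("hello") }, TestSchedulerProvider())
    presenter.load()
    // With trampoline schedulers the chain has already finished by the time load() returns.
    check(presenter.shownQuote == "hello")
}
```

Because the test provider returns `Schedulers.trampoline()` everywhere, the assertion can run immediately after `load()` without any waiting or plugin overrides.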

Natig Babayev
solidak
  • Thanks for sharing "trampoline". I used "immediate", but it's not available in Rx2 anymore – Leo DroidCoder Sep 19 '17 at 10:19
  • I don't think we need this layer of abstraction. It feels like we are modifying production code significantly just for the sake of testing, when we already have the means to test it without such an abstraction. The answer provided by @Alex works perfectly. – Henry Apr 01 '18 at 04:49
  • I'd have to agree with @Henry here. It seems much more practical to just modify the schedulers during testing with `RxJavaPlugin...` than have to modify production code just for the sake of tests – William Reed Jan 07 '19 at 18:57

Figured it out! It had to do with the fact that in this code:

validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }

validate(data) was returning an Observable, which was emitting the following: emitter.onNext(null). Since RxJava 2 no longer accepts null values, flatMap was not getting called. I changed validate to return a Completable and updated the scheduler override to the following:

RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }

Now the tests pass!
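For reference, a minimal sketch of how the two changes might fit together in a JUnit 4 test, assuming RxJava 2 and JUnit 4 are on the test classpath. The `validate` body and its blank-input rule are hypothetical stand-ins, since the real implementation isn't shown here, and the `observeOn(AndroidSchedulers.mainThread())` step is left out so the sketch runs without Android dependencies.

```kotlin
import io.reactivex.Completable
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
import org.junit.After
import org.junit.Before
import org.junit.Test

// Hypothetical stand-in for validate(): signals completion or an error
// instead of emitting null through onNext.
fun validate(data: String): Completable =
    if (data.isNotBlank()) Completable.complete()
    else Completable.error(IllegalArgumentException("data must not be blank"))

class ValidateTest {

    @Before
    fun setUp() {
        // Runtime handler rather than the "init" variant: it is only invoked
        // after the Schedulers class has been initialized, so calling
        // Schedulers.trampoline() inside it does not hit the initialization cycle.
        RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
    }

    @After
    fun tearDown() {
        // Remove the override so it does not leak into other tests.
        RxJavaPlugins.reset()
    }

    @Test
    fun validDataCompletes() {
        validate("some data")
            .subscribeOn(Schedulers.io())
            .test()
            .assertComplete()
    }
}
```

Calling `RxJavaPlugins.reset()` after each test is an addition in this sketch; it keeps the trampoline override from affecting unrelated tests in the same JVM.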

Alex

As an alternative to proposed solutions, this has been working fine for a while in my projects. You can use it in your test classes like this:

@get:Rule
val immediateSchedulersRule = ImmediateSchedulersRule()

And the class looks like this:

class ImmediateSchedulersRule : ExternalResource() {

    val immediateScheduler: Scheduler = object : Scheduler() {

        override fun createWorker() = ExecutorScheduler.ExecutorWorker(Executor { it.run() })

        // This prevents errors when scheduling a delay
        override fun scheduleDirect(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            return super.scheduleDirect(run, 0, unit)
        }

    }

    override fun before() {
        RxJavaPlugins.setIoSchedulerHandler { immediateScheduler }
        RxJavaPlugins.setComputationSchedulerHandler { immediateScheduler }
        RxJavaPlugins.setNewThreadSchedulerHandler { immediateScheduler }

        RxAndroidPlugins.setInitMainThreadSchedulerHandler { immediateScheduler }
        RxAndroidPlugins.setMainThreadSchedulerHandler { immediateScheduler }
    }

    override fun after() {
        RxJavaPlugins.reset()
    }

}

You can find a way to migrate from TestRule to ExternalResource here and get more info on testing RxJava 2 here.

Sebastian

This is the exact syntax that worked for me:

RxJavaPlugins.setIoSchedulerHandler(scheduler -> Schedulers.trampoline())
Vadim Kotov
Efi G