I have a scenario where I need to call an API periodically to check for a result. I'm using Flowable.interval to trigger the API call on each tick.

However, I'm having trouble with backpressure. In my example below, a new Single is created on every tick of the interval. The desired effect is to call the API only if a call is not already in progress.

Flowable.interval(1, 1, TimeUnit.SECONDS).flatMap {
        System.out.println("Delay $it")

        //simulates API call
        Single.just(1L).doAfterSuccess {
            System.out.println("NEW SINGLE!!!")
        }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
            System.out.println("SINGLE SUCCESS!!!")
        }.toFlowable()
    }.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()

I can solve this using a filter variable like so:

var filter = true

Flowable.interval(1, 1, TimeUnit.SECONDS).filter {
    filter
}.flatMap {

    System.out.println("Delay $it")

    Single.just(1L).doOnSubscribe {
        filter = false  // block further ticks while this call is in flight
    }.doAfterSuccess {
        System.out.println("NEW SINGLE!!!")
    }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
        System.out.println("SINGLE!!!")
        filter = true
    }.toFlowable()
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).blockingFirst()

But it seems like a hacky solution. I've tried applying onBackpressureDrop after the interval, but it has no effect.

Any suggestions?

Richard
    Hmm, I thought `onBackpressureXXX` should handle this, since it's also described in the docs: "The operator generates values based on time and ignores downstream backpressure which may lead to `MissingBackpressureException` at some point in the chain. Consumers should consider applying one of the `onBackpressureXXX` operators as well." – Christopher Apr 13 '18 at 06:33

1 Answer

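You have to constrain flatMap as well. onBackpressureDrop only drops items when the downstream has no outstanding demand, and by default flatMap allows many inner sources to run concurrently, so it keeps requesting ticks and nothing is ever dropped. Limiting the maximum concurrency to 1 means flatMapSingle requests the next tick only after the current Single has finished: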

Flowable.interval(1, 1, TimeUnit.SECONDS)
.onBackpressureDrop()
.flatMapSingle({
    System.out.println("Delay $it")

    //simulates API call
    Single.just(1L).doAfterSuccess {
        System.out.println("NEW SINGLE!!!")
    }.delay(4, TimeUnit.SECONDS).doAfterSuccess {
        System.out.println("SINGLE SUCCESS!!!")
    }
}, false, 1)  // delayErrors = false, maxConcurrency = 1: at most one call in flight
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe()
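
To see the effect of the concurrency limit without waiting in real time, here is a minimal sketch that runs the same chain on a TestScheduler. It assumes RxJava 2 (the `io.reactivex` packages) is on the classpath; `startedTicks` and its parameters are hypothetical names for illustration, and the 3500 ms call duration is an assumption chosen so that a call never finishes at exactly the same virtual instant as a tick.

```kotlin
import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the ticks for which a simulated API call was started.
fun startedTicks(maxConcurrency: Int, callMillis: Long, runSeconds: Long): List<Long> {
    val scheduler = TestScheduler()        // virtual time, nothing actually sleeps
    val started = mutableListOf<Long>()

    Flowable.interval(1, 1, TimeUnit.SECONDS, scheduler)
        .onBackpressureDrop()              // drops ticks that arrive while there is no demand
        .flatMapSingle({ tick ->
            started.add(tick)              // record that a call began for this tick
            // simulated API call that takes callMillis of virtual time
            Single.just(tick).delay(callMillis, TimeUnit.MILLISECONDS, scheduler)
        }, false, maxConcurrency)
        .subscribe()

    scheduler.advanceTimeBy(runSeconds, TimeUnit.SECONDS)
    return started
}

fun main() {
    println(startedTicks(1, 3500, 12))              // one call in flight at a time
    println(startedTicks(Int.MAX_VALUE, 3500, 12))  // effectively unconstrained
}
```

With the limit set to 1, only the ticks that arrive after the previous call has finished should start a new call; the ticks in between are dropped. With an effectively unbounded limit, every tick should start its own call, which is the behaviour described in the question.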
akarnokd