1

I want to transform my source Flowable in such a way that events only go through if they are the first item within a specified period.

That is, I want the first item to go through and then drop all subsequent items until there has been a period of, say, 10 seconds in which no upstream event arrived.

Note that this is neither

  • debounce: This would emit each item iff it was not followed by another one for 10 seconds - but this will force a 10 second delay on even the first item. I want to emit the first item right away.
  • throttleFirst: This would emit the first item and then drop all subsequent items for 10 seconds after that first one. I want to have the blocking period reset after each upstream item.
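To make the difference between the three behaviours concrete, here is a minimal plain-Kotlin model that works on a list of event timestamps in milliseconds. It does not use RxJava at all, the function names are hypothetical, and it ignores edge cases such as what debounce does when the source completes; it is only a sketch of the intended semantics.

```kotlin
// Pure-Kotlin model of the three behaviours; timestamps are in milliseconds.
// All function names are hypothetical and exist only for this illustration.

/** Desired behaviour: pass an event only if the gap since the previous upstream event is at least [quiet]. */
fun firstAfterQuiet(times: List<Long>, quiet: Long): List<Long> {
    val out = mutableListOf<Long>()
    var last: Long? = null
    for (t in times) {
        val prev = last
        if (prev == null || t - prev >= quiet) out += t
        last = t // every upstream event restarts the quiet period
    }
    return out
}

/** throttleFirst-like: the blocking window is measured from the last *emitted* event only. */
fun throttleFirstModel(times: List<Long>, window: Long): List<Long> {
    val out = mutableListOf<Long>()
    var lastEmitted: Long? = null
    for (t in times) {
        val prev = lastEmitted
        if (prev == null || t - prev >= window) {
            out += t
            lastEmitted = t
        }
    }
    return out
}

/** debounce-like: returns the times at which items would be emitted (event time + [quiet]). */
fun debounceModel(times: List<Long>, quiet: Long): List<Long> {
    val out = mutableListOf<Long>()
    for ((i, t) in times.withIndex()) {
        val next = times.getOrNull(i + 1)
        // An item survives only if nothing else arrives within the quiet period.
        if (next == null || next - t >= quiet) out += t + quiet
    }
    return out
}
```

For events at 0, 4, 8, 12 and 30 seconds with a 10 second period, firstAfterQuiet passes the events at 0 s and 30 s, throttleFirstModel passes 0 s, 12 s and 30 s, and debounceModel emits only at 22 s and 40 s, so even the first item is delayed.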

I've now solved it like this:

source
  .flatMap { Flowable.just(1).concatWith(Flowable.just(-1).delay(10, TimeUnit.SECONDS)) }
  .scan(0, { x, y -> x + y })
  .map { it > 0 }
  .distinctUntilChanged()
  .filter { it }

NOTE: I don't care about the actual items in source, only that they occur (but, of course, I could just wrap the items in a Pair along with 1 or -1).
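The counter idea in the chain above can be traced without RxJava. The following plain-Kotlin sketch replays the +1/-1 contributions in timestamp order and records when the running sum goes from zero to positive. The name counterModel is hypothetical, timestamps are in milliseconds, and the ordering of a -1 and a +1 that share a timestamp is an assumption of the model, not something the Rx chain guarantees.

```kotlin
// Plain-Kotlin trace of the +1 / -1 counter idea; no RxJava involved.
// `counterModel` is a hypothetical name used only for this illustration.
fun counterModel(times: List<Long>, quiet: Long): List<Long> {
    // Each upstream event contributes +1 at its own time and -1 once `quiet` has elapsed.
    val deltas = times
        .flatMap { listOf(it to 1, (it + quiet) to -1) }
        // Assumption: when a -1 and a +1 share a timestamp, the -1 is applied first.
        .sortedWith(compareBy({ it.first }, { it.second }))

    val out = mutableListOf<Long>()
    var sum = 0             // corresponds to scan(0) { x, y -> x + y }
    var wasActive = false   // last value that passed distinctUntilChanged()
    for ((time, delta) in deltas) {
        sum += delta
        val active = sum > 0            // map { it > 0 }
        if (active != wasActive) {      // distinctUntilChanged()
            if (active) out += time     // filter { it }
            wasActive = active
        }
    }
    return out
}
```

With events at 0, 4, 8, 12 and 30 seconds and a 10 second period, the sum first returns to zero at 22 s, so the model reports emissions at 0 s and 30 s only.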

Is there a simpler way to use built-in RxJava(2) Operators to achieve the same goal?

david.mihola
  • Sounds like a previous question: https://stackoverflow.com/questions/41964731/immediate-debounce-in-rx. There is an extension operator for that: https://github.com/akarnokd/RxJava2Extensions#flowabletransformersdebouncefirst – akarnokd Jul 07 '17 at 09:49

2 Answers

2

This is possible by relying on the fact that switchMap only stays subscribed to one inner Flowable at a time, combined with a boolean flag that decides whether the current item should be emitted:

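import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.FlowableTransformer
import org.reactivestreams.Publisher
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

class ReduceThrottle<T>(val period: Long, val unit: TimeUnit) : FlowableTransformer<T, T> {
    override fun apply(upstream: Flowable<T>): Publisher<T> {
        // defer gives every subscriber its own flag
        return Flowable.defer {
            val doEmit = AtomicBoolean(true)

            upstream.switchMap { item ->
                val ret = if (doEmit.compareAndSet(true, false)) {
                    // No upstream item arrived during the last quiet period, so emit this one
                    Flowable.just(item)
                } else {
                    Flowable.empty<T>()
                }

                // A newer upstream item makes switchMap cancel this timer, which restarts the quiet period
                ret.concatWith(Completable.timer(period, unit).andThen(Completable.fromAction {
                    // Once the timer successfully expires, reset the state
                    doEmit.set(true)
                }).toFlowable<T>())
            }
        }
    }
}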

Then it is simply a matter of applying the transformer: source.compose(ReduceThrottle(10, TimeUnit.SECONDS)).

Kiskae
-1

This might do what you need:

source.debounce(item -> Flowable.timer(10, TimeUnit.SECONDS))

JohnWowUs