47

Consider the following use case:

  • need to deliver first item as soon as possible
  • need to debounce following events with 1 second timeout

I ended up implementing a custom operator based on OperatorDebounceWithTime and using it like this:

.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))

CustomOperatorDebounceWithTime delivers the first item immediately then uses OperatorDebounceWithTime operator's logic to debounce later items.

Is there an easier way to achieve the described behavior? Let's skip the compose operator; it doesn't solve the problem. I'm looking for a way to achieve this without implementing custom operators.
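To pin down the requirement, here is a plain-Java model of the desired output. It is not RxJava code and not the custom operator mentioned above. It assumes events are given as sorted arrival times in milliseconds, that a follower cancels a pending event only if it arrives strictly before the timeout elapses, and that the last pending event is flushed when the stream ends. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of "first item immediately, then debounce"; not an RxJava operator. */
final class FirstThenDebounceModel {

    /**
     * @param arrivalsMs sorted arrival times of the events, in milliseconds
     * @param timeoutMs  debounce timeout in milliseconds
     * @return arrival times of the events that would be delivered downstream
     */
    static List<Long> delivered(long[] arrivalsMs, long timeoutMs) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < arrivalsMs.length; i++) {
            boolean isFirst = i == 0;
            boolean isLast = i == arrivalsMs.length - 1;
            // Assumption: the last event is flushed on completion; any other event
            // survives only if the next one arrives after the timeout has elapsed.
            boolean survivesDebounce = isLast || arrivalsMs[i + 1] - arrivalsMs[i] >= timeoutMs;
            if (isFirst || survivesDebounce) {
                out.add(arrivalsMs[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 100, 200, 1500, 1600, 1800, 2000, 10000};
        System.out.println(delivered(arrivals, 1000)); // [0, 200, 2000, 10000]
    }
}
```

The event at 0 ms is delivered only because it is first; a plain debounce would drop it, since the next event arrives 100 ms later.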

tomrozb

15 Answers

43

Update:
Based on @lopar's comments, a better way would be:

Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))

Would something like this work:

String[] items = {"one", "two", "three", "four", "five"};
Observable<String> myObservable = Observable.from(items);
Observable.concat(
  myObservable.first(), 
  myObservable.skip(1).debounce(1, TimeUnit.SECONDS)
).subscribe(s -> 
  System.out.println(s));
Peter Nixey
LordRaydenMK
  • 6
    You could also prevent making dual subscriptions (which in the cold observable case is doing the work twice, and in the hot case could potentially be out of sync) with `publish`: `Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))` Also note that `first` will blow up if the `Observable` is empty, so unless that is desired behavior, you'll probably want `limit(1)`. – lopar May 10 '15 at 17:55
  • 1
    @lopar thanks for the tip. I'm a beginner with RxJava and I'm trying to learn by helping others :). – LordRaydenMK May 10 '15 at 18:25
  • 2
    I was waiting for the solution proposed by @lopar. I think the second one may not work in some situations. `myObservable.skip(1)` may subscribe after the first item has been published so it will skip the second element. – tomrozb May 10 '15 at 20:02
  • 2
    @LordRaydenMK, your original answer was good, no worries about being a beginner! :) Glad I could help tweak it a bit. – lopar May 10 '15 at 21:44
  • 3
    When using the publish form, you should not .skip(1). Because the observable is using the same subscription it will continue from the same spot. It should just be ```Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.debounce(1, TimeUnit.SECONDS)))``` – TrevorSStone Feb 22 '17 at 02:57
  • 5
    use .take(1) instead of .limit(1) in version 2 – Ajay George Jul 05 '17 at 13:08
  • 2
    I used this answer some time ago and just realized it is wrong. The problem is that the debounce hides the issue. Don't write skip(1), or you will lose the second item every time. [Full answer here](https://stackoverflow.com/a/49497411/842697) – Brais Gabin Mar 26 '18 at 17:39
18

The answers by @LordRaydenMK and @lopar are best, but I wanted to suggest something else in case it happens to work better for you or for someone in a similar situation.

There's a variant of debounce() that takes a function that decides how long to debounce this particular item for. It specifies this by returning an observable that completes after some amount of time. Your function could return empty() for the first item and timer() for the rest. Something like (untested):

String[] items = {"one", "two", "three", "four", "five", "six"};
Observable.from(items)
    .debounce(item -> item.equals("one")
            ? Observable.empty()
            : Observable.timer(1, TimeUnit.SECONDS));

The trick is that this function would have to know which item is the first. Your sequence might know that. If it doesn't, you might have to zip() with range() or something. Better in that case to use the solution in the other answer.

Lawrence Kesteloot
  • It should be TimeUnit.SECONDS (you missed the "S"). However, this is the only usage example of this function that I found. It really helped me. Thank you very much! – Warcello May 04 '16 at 13:14
11

A simple solution using RxJava 2.0, translated from the answer for the same question for RxJS, which combines throttleFirst and debounce, then removes duplicates.

private <T> ObservableTransformer<T, T> debounceImmediate() {
    return observable -> observable.publish(p ->
        Observable.merge(p.throttleFirst(1, TimeUnit.SECONDS),
            p.debounce(1, TimeUnit.SECONDS)).distinctUntilChanged());
}

@Test
public void testDebounceImmediate() {
    Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000)
        .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
        .doOnNext(v -> System.out.println(LocalDateTime.now() + " T=" + v))
        .compose(debounceImmediate())
        .blockingSubscribe(v -> System.out.println(LocalDateTime.now() + " Debounced: " + v));
}

The approach using limit() or take() doesn't seem to handle long-lived data flows, where I want to keep observing but still act immediately on the first event seen after a quiet period.
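For readers who want to know what to expect from the test above, here is an idealized, single-threaded model in plain Java. It is not RxJava and ignores scheduler jitter. It assumes the source completes together with its last item, so debounce flushes that item at once, and that on a tie the throttleFirst emission comes first. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Idealized model of merge(throttleFirst, debounce).distinctUntilChanged(); not RxJava. */
final class DebounceImmediateModel {

    /** One emission: when it happens and which value it carries. */
    static final class Emission {
        final long atMs;
        final long value;
        Emission(long atMs, long value) { this.atMs = atMs; this.value = value; }
    }

    /** values[i] arrives at arrivalsMs[i] (sorted); the source completes with the last arrival. */
    static List<Long> simulate(long[] arrivalsMs, long[] values, long windowMs) {
        List<Emission> merged = new ArrayList<>();

        // throttleFirst: pass an item, then ignore everything until the window has elapsed.
        long gateOpensAt = Long.MIN_VALUE;
        for (int i = 0; i < arrivalsMs.length; i++) {
            if (arrivalsMs[i] >= gateOpensAt) {
                merged.add(new Emission(arrivalsMs[i], values[i]));
                gateOpensAt = arrivalsMs[i] + windowMs;
            }
        }

        // debounce: an item is emitted windowMs after it arrived unless a newer item
        // arrives first; the last item is flushed when the source completes.
        for (int i = 0; i < arrivalsMs.length; i++) {
            boolean isLast = i == arrivalsMs.length - 1;
            if (isLast) {
                merged.add(new Emission(arrivalsMs[i], values[i]));
            } else if (arrivalsMs[i + 1] - arrivalsMs[i] >= windowMs) {
                merged.add(new Emission(arrivalsMs[i] + windowMs, values[i]));
            }
        }

        // merge: interleave by emission time; the stable sort keeps throttleFirst first on ties.
        merged.sort(Comparator.comparingLong((Emission e) -> e.atMs));

        // distinctUntilChanged: drop a value equal to the one emitted just before it.
        List<Long> out = new ArrayList<>();
        for (Emission e : merged) {
            if (out.isEmpty() || out.get(out.size() - 1).longValue() != e.value) {
                out.add(e.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] t = {0, 100, 200, 1500, 1600, 1800, 2000, 10000};
        System.out.println(simulate(t, t, 1000)); // [0, 200, 1500, 2000, 10000]
    }
}
```

In this model the isolated event at 10000 is produced by both branches, and distinctUntilChanged is what removes the second copy.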

Adrian Baker
8

The answers by LordRaydenMK and lopar have a problem: you always lose the second item. I suppose no one noticed this before because, if you are debouncing, you normally have a lot of events and the second one is discarded by the debounce anyway. The correct way to never lose an event is:

observable
    .publish(published ->
        published
            .limit(1)
            .concatWith(published.debounce(1, TimeUnit.SECONDS)));

Don't worry, you won't get any duplicated events. If you aren't sure, you can run this code and check for yourself:

Observable.just(1, 2, 3, 4)
    .publish(published ->
        published
            .limit(1)
            .concatWith(published))
    .subscribe(System.out::println);
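
The reasoning can also be modeled without RxJava. The sketch below is a plain-Java illustration of the claim, assuming that both branches share one subscription and that the second branch is attached only once the first branch has taken its single item. The class name and the skipOnSecondBranch parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of take(1).concatWith(rest) over one shared subscription; not RxJava. */
final class SharedSubscriptionModel {

    /**
     * @param items              items pushed once by the shared source
     * @param skipOnSecondBranch how many items the second branch skips (0 models no skip, 1 models skip(1))
     * @return the items that reach the downstream subscriber
     */
    static List<Integer> concatOnShared(List<Integer> items, int skipOnSecondBranch) {
        List<Integer> out = new ArrayList<>();
        boolean firstBranchDone = false;
        int skipped = 0;
        for (int item : items) {
            if (!firstBranchDone) {
                out.add(item);          // the first branch forwards one item...
                firstBranchDone = true; // ...and completes, so the second branch takes over
            } else if (skipped < skipOnSecondBranch) {
                skipped++;              // the second branch never saw item 1, so this drops item 2
            } else {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4);
        System.out.println(concatOnShared(items, 0)); // [1, 2, 3, 4]
        System.out.println(concatOnShared(items, 1)); // [1, 3, 4]
    }
}
```

Because the source is consumed only once, the second branch starts at item 2, so there is nothing to skip and nothing is duplicated.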
Brais Gabin
7

Use the version of debounce that takes a function and implement the function in this way:

    .debounce(new Func1<String, Observable<String>>() {
        private AtomicBoolean isFirstEmission = new AtomicBoolean(true);
        @Override
        public Observable<String> call(String s) {
             // note: standard debounce causes the first item to be
             // delayed by 1 second unnecessarily, this is a workaround
             if (isFirstEmission.getAndSet(false)) {
                 return Observable.just(s);
             } else {
                 return Observable.just(s).delay(1, TimeUnit.SECONDS);
             }
        }
    })

The first item emits immediately. Subsequent items are delayed by a second. If a delayed observable doesn't terminate before the following item arrives, it's cancelled, so the expected debounce behavior is fulfilled.

Rich Ehmer
7

Kotlin extension functions based on @lopar's comment:

fun <T> Flowable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Flowable<T> {
    return publish {
        it.take(1).concatWith(it.debounce(timeout, unit))
    }
}

fun <T> Observable<T>.debounceImmediate(timeout: Long, unit: TimeUnit): Observable<T> {
    return publish {
        it.take(1).concatWith(it.debounce(timeout, unit))
    }
}
gswierczynski
2

An NgRx/RxJS solution: split the pipe in two.

onMyAction$ = this.actions$
    .pipe(ofType<any>(ActionTypes.MY_ACTION));

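lastTime = Date.now();

@Effect()
onMyActionWithAbort$ = this.onMyAction$
    .pipe(
        filter((data) => {
          // Compare epoch milliseconds; subtracting Date objects does not type-check in TypeScript.
          const now = Date.now();
          const result = now - this.lastTime > 200;
          this.lastTime = now;
          return result;
        }),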
        switchMap(this.DoTheJob.bind(this))
    );

@Effect()
onMyActionWithDebounce$ = this.onMyAction$
    .pipe(
        debounceTime(200),
        filter(this.preventDuplicateFilter.bind(this)),
        switchMap(this.DoTheJob.bind(this))
    );
Asaf
2

My solution for Dart:

extension StreamExt<T> on Stream<T> {
  Stream<T> immediateDebounce(Duration duration) {
    var lastEmit = 0;
    return debounce((event) {
      if (_now - lastEmit < duration.inMilliseconds) {
        lastEmit = _now;
        return Stream.value(event).delay(duration);
      } else {
        lastEmit = _now;
        return Stream.value(event);
      }
    });
  }
}

int get _now =>  DateTime.now().millisecondsSinceEpoch;
Nickolay Savchenko
1

To avoid making dual subscriptions, use this:

    const debouncedSkipFirstStream$ = stream$.pipe(
        map((it, index) => ({ it, index })),
        debounce(({ index }) => (
            index ? new Promise(res => setTimeout(res, 1000)) // 1 second, in milliseconds
                : Rx.of(true))),
        map(({ it }) => it),
    );

With the split solution, you will see 'run' printed twice:

x = rxjs.Observable.create(o=>{
    console.info('run');
    o.next(1);
    o.next(2);
});
a = x.pipe(rxjs.operators.take(1));
b = x.pipe(rxjs.operators.skip(1), rxjs.operators.debounceTime(60));
rxjs.concat(a, b).subscribe(console.log);
JerryRo
1

I went with

Flowable.concat(

    flowable // emits immediately
        .take(1)
        .skipWhile { it.isEmpty() },

    flowable // same flowable, but emits with delay and debounce
        .debounce(2, TimeUnit.SECONDS)
)
    .distinctUntilChanged()
ivan8m8
1

If someone is looking for this in 2021:

@OptIn(FlowPreview::class)
fun <T> Flow<T>.debounceImmediate(timeMillis: Long): Flow<T> =
    withIndex()
        .onEach { if (it.index != 0) delay(timeMillis) }
        .map { it.value }

Usage:

authRepository.login(loginDto)
                    .debounceImmediate(10000)
Nestor Perez
1

After reading this article I ended up using the throttleLatest operator to get very similar behaviour to the immediate debounce I was looking for.

throttleLatest marble diagram

The following code will instantly emit the first item and then check for new items every 500ms. Only the latest event received within that 500ms window will be sent out.

observable.throttleLatest(500, TimeUnit.MILLISECONDS)
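
To make the description above concrete, here is an idealized plain-Java model of that behavior. It is not the RxJava implementation. It assumes sorted arrival times in milliseconds (which double as the item values), and it simply drops an item that is still pending when the input ends. The names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Idealized model of throttleLatest-style behavior; not the RxJava implementation. */
final class ThrottleLatestModel {

    /**
     * @param arrivalsMs sorted arrival times in milliseconds (also used as the item values)
     * @param windowMs   window length in milliseconds
     * @return the items that are emitted, in order
     */
    static List<Long> emitted(long[] arrivalsMs, long windowMs) {
        List<Long> out = new ArrayList<>();
        boolean windowOpen = false;
        long windowEndsAt = 0;
        Long pending = null; // latest item seen during the current window, if any

        for (long t : arrivalsMs) {
            // Process every window boundary that has passed before this arrival.
            while (windowOpen && t >= windowEndsAt) {
                if (pending != null) {
                    out.add(pending);         // emit the latest item of the window...
                    pending = null;
                    windowEndsAt += windowMs; // ...and start the next window right away
                } else {
                    windowOpen = false;       // quiet window: go back to idle
                }
            }
            if (windowOpen) {
                pending = t;                  // overwrite: only the latest item is kept
            } else {
                out.add(t);                   // idle: emit immediately and open a window
                windowOpen = true;
                windowEndsAt = t + windowMs;
            }
        }
        // Assumption: an item still pending when the input ends is dropped in this model.
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emitted(new long[] {0, 100, 200, 600, 2000}, 500)); // [0, 200, 600, 2000]
    }
}
```

Unlike a debounce, a steady stream of events here still produces one emission per window instead of waiting for a quiet period.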
Simon Raes
0

   view.clicks()
            .throttleFirst(2, TimeUnit.SECONDS)
            .subscribe {
                println("Clicked button")
            }

I found this the simplest way to do it. clicks() comes from RxBinding. Add this dependency to get view observables:

 implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
  • While this code may answer the question, providing additional context regarding how and/or why it solves the problem would improve the answer's long-term value. – Abhishek Dutt Nov 12 '21 at 04:07
0

For those who are trying to solve the same problem using Kotlin Flow:

fun <T> Flow<T>.throttleFirst(timeout: Duration): Flow<T> {
    var job = Job().apply { complete() }
    return onCompletion { job.cancel() }.run {
        flow {
            coroutineScope {
                collect { value ->
                    if (!job.isActive) {
                        emit(value)
                        job = launch { delay(timeout.inWholeMilliseconds) }
                    }
                }
            }
        }
    }
}

Example:

flow {
    emit(1)
    delay(90)
    emit(2)
    delay(90)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
    emit(5)
}.throttleFirst(1.seconds).collect { ... }
// 1, 4, 5
David Miguel
0
const changes = this.form.valueChanges; // For example

const changesDebounced = changes.pipe(
    exhaustMap(change => merge(
        of(change),
        changes.pipe(startWith(change), debounceTime(500), take(1)),
    )),
    distinctUntilChanged(),
);

changesDebounced will emit immediately on changes. If subsequent changes occur, they are debounced (by 500ms). After no further values have been emitted for 500ms, changesDebounced will again emit immediately on the next change.

Using exhaustMap on the changes observable will ignore incoming changes while we execute the debounce logic.

Then we merge two observables: of(change) which will emit the change immediately, and also the debounced changes. Use take(1) so that we return control to the outer observable after the first debounced value is emitted. This lets us listen for changes again without debouncing.

Also notice that the debounced observable uses startWith(change). This covers the case where only one change is emitted within the debounce interval, meaning there are no other changes to debounce.

Lastly, we need distinctUntilChanged(). This is again in case only one change is emitted during the debounce interval. In that scenario, the debounced observable will emit the same value as the of(change) observable, so we can ignore the redundant emitted value.

Aaron Eads