8

I'm using a stream which is throttled when I scroll the window.
While throttling (as long as scrolling), it emits values to the console.

However , when stream is idle (user is not scrolling the window) - I want a timer to kick in. However - if the user starts scrolling again - I don't want that timer to emit values.

Currently I'm doing this :

  const observable = Rx.Observable.fromEvent(window, 'scroll');

  const subscriber = observable
      .throttleTime(300 )
      .map(() => 'throttle')
      .merge(Rx.Observable.interval(1000).map(() => 'tick') )
      .subscribe(
          (x) => {
            console.log('Next: event!', x);
          },
          (err) => {
            console.log('Error: %s', err);
          },
          () => {
            console.log('Completed');
          });

The problem is that , while scrolling - I see both "throttle" AND "tick" ( I should only see "throttle")

Think of this from another POV. A job always has to run. If I scroll - that throttled scroll - should invoke the job. If I don't scroll - a timer should kick in and start doing the job . (and stops if user start scrolling again).

Question:
How can I start a timer after an idle time of not scrolling ?

PLNKR

Royi Namir
  • 144,742
  • 138
  • 468
  • 792
  • 2
    This is how to make a good question on SO. +1 – kind user Apr 20 '17 at 17:38
  • Re [your other comment](http://stackoverflow.com/questions/41522222/making-a-typing-timer-in-rxjs-tracking-time-spent-typing/41526650?noredirect=1#comment74109988_41526650): You could solve this using the trick there using `exhaustMap` + `debounceTime` to detect no scroll. What i did here with `switchMap` works because we do nothing during the burst of events and the work happens when the stream is idle. The other way around, `switchMap` would restart the job(timer) each time a new scroll event is emitted. – Dorus Apr 20 '17 at 20:13

2 Answers2

3

You can use debounceTime to detect periods without scrolling.

const scroll = Rx.Observable.fromEvent(window, 'scroll')
  .throttleTime(300)
  .mapTo(false);
const noscroll = Rx.Observable.fromEvent(window, 'scroll')
  .startWith(0) // init with no scroll.
  .debounceTime(300) // detect no scroll after 300 ms.
  .mapTo(true);
scroll.merge(noscroll)
  .switchMap(e => e ? Rx.Observable.interval(1000).mapTo("Tick!") : Rx.Observable.of("Scroll!"))  
  // start the interval if there was no scroll. Stop the interval if there was a scroll.
  .subscribe(updateTimer)

Another problem with your code is using merge that will keep both sources subscribed, instead i use switchMap (a sibling of mergeMap) that will subscribe to the inner observable each time a new event is emitted, but also unsubscribe the previous inner source if another event is emitted from the source.

Re: "another POV" part of the question: You can replace Rx.Observable.interval(1000) in switchMap with the job. Scrolling will cancel/unsubscribe the job (as empty is emitted), if there is no more scrolling, the job will start again.

Live demo

Dorus
  • 7,276
  • 1
  • 30
  • 36
2

I'd do it like this:

const scroll$ = Rx.Observable.fromEvent(window, 'scroll')
    .throttleTime(300 /* ms */)
    .publish();

scroll$.connect();

const subscriber = scroll$
    .map(() => 'throttle')
    .race(Rx.Observable.interval(1000).map(() => 'tick'))
    .take(1)
    .repeat()
    .subscribe(
        (x) => {
          console.log('Next: event!', x);
        },
        (err) => {
          console.log('Error: %s', err);
        },
        () => {
          console.log('Completed');
        });

This uses the race() operator to subscribe only to the Observable that emits first which is the 1s interval or the scroll event. Right after that I want to start this again with another interval so I use take(1).repeat().

I also had to turn the scroll$ Observable into a hot Observable to keep the throttleTime() running among the repeated subscriptions.

Your updated demo: https://plnkr.co/edit/sWzSm32uoOQ1hOKigo4s?p=preview

martin
  • 93,354
  • 25
  • 191
  • 226
  • May I ask what is the difference between `publish().Connect()` - like you did here VS `publish().refcount()` {which is a `share()`} – Royi Namir Apr 20 '17 at 20:40
  • @Royi `connect()` is not an operator per say (it doesn't return an Observable). It's just a method on `ConnectableObservable` class that returns a `Subscription` that you can use to "disconnect". It's similar to `subscribe()` method that you can chain with an operator but it returns `Subscription` as well. This means it can be used only at the end of the operator chain. `refCount()` on the other hand returns another Observable so you can chain it with other operators. – martin Apr 20 '17 at 20:47
  • Martin , You've made this observable a hot one via connect. but If I change this to `.share()` ( which is also another way of making it hot ) - Then - while scroll - it seems that it doesn't respect the `throttle` delay. why is that ? https://plnkr.co/edit/jjWUxbk9fSItLT1U7uiq?p=preview – Royi Namir Apr 21 '17 at 07:52
  • @Royi `share()` doesn't turn it into a *hot* Observable. It's just a shortcut for `publish().refCount()` where the `refCount()` subscribes/unsubscribes to its source Observable based on the number of observers. Since we're making use of repeatedly subscribing to use the `race()` operator we need to make sure we're always subscribed. That's why we called `connect()` manually. – martin Apr 21 '17 at 09:00