
This question is an extension of my previous question that you can find here:

How to use RxJS to display a "user is typing" indicator?

After having successfully been able to track whether or not the user was typing, I needed to be able to use that particular state as a trigger for a clock.

The logic is simple: I want a clock to run while the user is typing. When the user stops typing, the clock should pause. When the user starts typing again, the clock should resume accumulating.

I have already been able to get it to work, but it looks like a mess and I need help refactoring it so it isn't a ball of spaghetti. Here is what the code looks like:

/*** Render Functions ***/

const showTyping = () =>
  $('.typing').text('User is typing...');

const showIdle = () =>
  $('.typing').text('');

const updateTimer = (x) =>
  $('.timer').text(x);

/*** Program Logic ***/

const typing$ = Rx.Observable
  .fromEvent($('#input'), 'input')
  .switchMapTo(Rx.Observable
               .never()
               .startWith('TYPING')
               .merge(Rx.Observable.timer(1000).mapTo('IDLE')))
  .do(e => e === 'TYPING' ? showTyping() : showIdle());

const timer$ = Rx.Observable
  .interval(1000)
  .withLatestFrom(typing$)
  .map(x => x[1] === 'TYPING' ? 1 : 0)
  .scan((a, b) => a + b)
  .do(console.log)
  .subscribe(updateTimer)

And here is the link to the live JSBin: http://jsbin.com/lazeyov/edit?js,console,output

Perhaps I will walk you through the logic of the code:

  1. I first build a stream to capture each typing event.
  2. For each of these events, I will use switchMap to: (a) fire off the original "TYPING" event so we don't lose it, and (b) fire off an "IDLE" event, 1 second later. You can see that I create these as separate streams and then merge them together. This way, I get a stream that will indicate the "typing state" of the input box.
  3. I create a second stream that sends an event every second. Using withLatestFrom, I combine this stream with the previous "typing state" stream. Now that they are combined, I can check whether or not the typing state is "IDLE" or "TYPING". If they are typing, I give the event a value of 1, otherwise a 0.
  4. Now that I have a stream of 1s and 0s, all I have to do is add them up with .scan() and render the result to the DOM.
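
To make the sampling behaviour of steps 1 to 4 concrete, here is a minimal sketch of the same arithmetic in plain JavaScript, without RxJS. The helper name `countTypingTicks` and the keystroke timestamps are made up for illustration, and ties (a keystroke or an idle transition landing exactly on a tick) are resolved arbitrarily here, whereas in the real streams they depend on scheduling order.

```javascript
// Hypothetical model of steps 1-4: not RxJS, just the same arithmetic.
// keystrokes:  sorted timestamps (ms) of the input events
// totalMs:     how long we observe the input
// tickMs:      sampling period of the clock
// idleAfterMs: silence after which the state flips from TYPING to IDLE
function countTypingTicks(keystrokes, totalMs, tickMs = 1000, idleAfterMs = 1000) {
  let count = 0;
  for (let t = tickMs; t <= totalMs; t += tickMs) {
    // Latest keystroke at or before this tick (the withLatestFrom analogue).
    const last = keystrokes.filter(k => k <= t).pop();
    // No keystroke yet: withLatestFrom has nothing to pair with, so skip the tick.
    if (last === undefined) continue;
    // TYPING contributes 1 and IDLE contributes 0 (the map + scan steps).
    if (t - last < idleAfterMs) count += 1;
  }
  return count;
}

console.log(countTypingTicks([200, 700, 1300, 1900], 5000)); // 2
```

Only the ticks at 1000 ms and 2000 ms find the state at TYPING, so the clock shows 2 even though the TYPING state lasted from 200 ms to 2900 ms. The approach counts sampled ticks rather than measuring elapsed typing time.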

What is the RxJS way to write this functionality?

EDIT: Method 1 — Build a stream of change-events

Based on @olsn's answer.

/*** Helper Functions ***/

const showTyping = () => $('.typing').text('User is typing...');
const showIdle = () => $('.typing').text('');
const updateTimer = (x) => $('.timer').text(x);
const handleTypingStateChange = state =>
  state === 1 ? showTyping() : showIdle();

/*** Program Logic ***/

const inputEvents$ = Rx.Observable.fromEvent($('#input'), 'input').share();

// streams to indicate when user is typing or has become idle
const typing$ = inputEvents$.mapTo(1);
const isIdle$ = inputEvents$.debounceTime(1000).mapTo(0);

// stream to emit "typing state" change-events
const typingState$ = Rx.Observable.merge(typing$, isIdle$)
  .distinctUntilChanged()
  .share();

// every second, sample from typingState$
// if user is typing, add 1, otherwise 0
const timer$ = Rx.Observable
  .interval(1000)
  .withLatestFrom(typingState$, (tick, typingState) => typingState)
  .scan((a, b) => a + b, 0)

// subscribe to streams
timer$.subscribe(updateTimer);
typingState$.subscribe(handleTypingStateChange);

JSBin Live Demo
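
As a rough illustration of what the merged and de-duplicated "typing state" stream in Method 1 emits, the sketch below derives the same change-events from a plain array of keystroke timestamps. It is a dependency-free model, not RxJS; the function name `typingStateChanges` and the `{ state, at }` event shape are assumptions made for this example.

```javascript
// Hypothetical model of merge(typing$, isIdle$).distinctUntilChanged().
// keystrokes:  sorted timestamps (ms) of the input events
// idleAfterMs: the debounce window (1000 ms in Method 1)
// Returns change-events: state 1 = typing started, state 0 = became idle.
function typingStateChanges(keystrokes, idleAfterMs = 1000) {
  const changes = [];
  keystrokes.forEach((at, i) => {
    const prev = keystrokes[i - 1];
    // First keystroke, or first keystroke after an idle gap: state becomes 1.
    if (i === 0 || at - prev >= idleAfterMs) changes.push({ state: 1, at });
    const next = keystrokes[i + 1];
    // No follow-up keystroke inside the window: the debounce fires, state becomes 0.
    if (next === undefined || next - at >= idleAfterMs) {
      changes.push({ state: 0, at: at + idleAfterMs });
    }
  });
  return changes;
}
```

For keystrokes at 0, 300, 800 and 3000 ms this yields typing at 0, idle at 1800, typing at 3000 and idle at 4000. The idle event always trails the last keystroke by the debounce window.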

EDIT: Method 2 — Using exhaustMap to start counter when user starts typing

Based on Dorus' answer.

/*** Helper Functions ***/

const showTyping = () => $('.typing').text('User is typing...');
const showIdle = () => $('.typing').text('');
const updateTimer = (x) => $('.timer').text(x);

/*** Program Logic ***/

// declare shared streams
const inputEvents$ = Rx.Observable.fromEvent($('#input'), 'input').share();
const idle$ = inputEvents$.debounceTime(1000).share();

// intermediate stream for counting until idle
const countUntilIdle$ = Rx.Observable
  .interval(1000)
  .startWith('start counter') // first tick required so we start watching for idle events right away
  .takeUntil(idle$);

// build clock stream
const clock$ = inputEvents$
  .exhaustMap(() => countUntilIdle$)
  .scan((acc) => acc + 1, 0)

/*** Subscribe to Streams ***/
idle$.subscribe(showIdle);
inputEvents$.subscribe(showTyping);
clock$.subscribe(updateTimer);

JSBin Live Demo

adrianmcli

2 Answers


If you want to continuously update the UI, I don't think there's any way around using a timer. I might have written the stream a little differently, starting the timer from change-events, but your current stream also seems fine as it is:

const inputEvents$ = Rx.Observable
  .fromEvent($('#input'), 'input');

const typing$ = Rx.Observable.merge(
  inputEvents$.mapTo('TYPING'),
  inputEvents$.debounceTime(1000).mapTo('IDLE')
)
  .distinctUntilChanged()
  .do(e => e === 'TYPING' ? showTyping() : showIdle())
  .publishReplay(1)
  .refCount();

const isTyping$ = typing$
  .map(e => e === "TYPING");

const timer$ = isTyping$
  .switchMap(isTyping => isTyping ? Rx.Observable.interval(100) : Rx.Observable.never())
  .scan(totalMs => (totalMs + 100), 0)
  .subscribe(updateTimer);

Live here.


If you don't need to update the UI and just want to capture how long the user typed, you could map the start- and stop-events to timestamps, for example:

const isTyping$ = typing$
  .map(e => e === "TYPING");

const exactTimer$ = isTyping$
  .map(() => +new Date())
  .bufferCount(2)
  .map((times) => times[1] - times[0])
  .do(updateTimer)
  .do(typedTime => console.log("User typed " + typedTime + "ms"))
  .subscribe();

Live here.
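
The timestamp approach can be sketched without RxJS as well. The snippet below is only a stand-in for the map / bufferCount(2) / map chain: the helper name `typedDurations`, the `{ state, at }` event shape, and the sample timestamps are assumptions for illustration.

```javascript
// Hypothetical stand-in for the map / bufferCount(2) / map chain above.
// changes: typing-state change events in order, alternating TYPING and IDLE,
//          each carrying the timestamp (ms) at which it was emitted.
function typedDurations(changes) {
  const durations = [];
  // Walk the events in non-overlapping pairs, like bufferCount(2).
  // A trailing unpaired TYPING event is ignored until its IDLE arrives.
  for (let i = 0; i + 1 < changes.length; i += 2) {
    durations.push(changes[i + 1].at - changes[i].at);
  }
  return durations;
}

const sampleChanges = [
  { state: 'TYPING', at: 0 },
  { state: 'IDLE', at: 3200 },
  { state: 'TYPING', at: 5000 },
  { state: 'IDLE', at: 6500 },
];
console.log(typedDurations(sampleChanges)); // [ 3200, 1500 ]
```

Note that each duration includes the debounce delay, because the IDLE event is only emitted 1000 ms after the last keystroke. Subtract that delay if you want the time between the first and last keystroke.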

olsn
  • Thanks for the awesome response. Is there a better way to write: `Rx.Observable .never() .startWith('TYPING') .merge(Rx.Observable.timer(1000).mapTo('IDLE')))` ? Why aren't these chainable? Why do I have to create two streams? Why couldn't I have: `Rx.Observable.startWith('TYPING').timer(1000).mapTo('IDLE')`? – adrianmcli Jan 07 '17 at 18:51
  • you can surely write that differently, I didn't think that was part of your question - I have updated the answer with how I would write it – olsn Jan 07 '17 at 19:05
  • interesting, why is the `share()` necessary? And also, what exactly does `publishReplay()` and `refCount()` do? – adrianmcli Jan 07 '17 at 20:19
  • the share is not required, I just didn't update the bins ;) - the `publishReplay(1).refCount()` is also not mandatory, but it helps when a subscriber subscribes late: it will still receive the current state. And `publish` because the stream is used in more than one place and we don't need a completely separate stream for each subscription – olsn Jan 07 '17 at 20:58
  • Thanks for your help. Sounds like I need to read up more on multicasting! – adrianmcli Jan 07 '17 at 21:02
  • Does it add any value to use `share()` on an already hot stream? – Dorus Jan 07 '17 at 21:27
  • Nope, as I mentioned above it's not required, I removed it - thanks :) – olsn Jan 07 '17 at 21:31
  • Yeah i'm a bit torn up between share or not share myself. It's good practice when you reuse streams, but it looks weird on `.fromEvent` that is inherently hot :) – Dorus Jan 07 '17 at 21:35
  • And on the subject of `share`, the way you use it is not safe at all on cold streams, you subscribe twice but with a small delay between the subscriptions, if the source fires in that time, you miss one event, that might be problematic in some circumstances. That's why i would use `.publish()` with `.connect()` instead of `.refCount()`, or use `.publish(selector)` like i did in my answer. – Dorus Jan 07 '17 at 21:37
  • Maybe i'm missing something, but I don't see where it would be subscribed twice - sorry, it's late here ;) – olsn Jan 07 '17 at 22:21
  • @olsn As `inputEvents$` is used twice, it is subscribed twice. That's why we use `share()`, an operator specialized in multi-casting. However `fromEvent` will also multi-cast so adding `share()` has little value. – Dorus Jan 07 '17 at 23:11
  • Yes, that i know - i was asking about the refcount vs. connect – olsn Jan 07 '17 at 23:34
  • @olsn Oh, I misunderstood your question. With `refCount`, you will subscribe as soon as the first subscriber arrives. Perhaps your source used `.startWith(value)`, and this value is emitted right away. The second subscription, arriving tenths of a millisecond later, will not receive the startWith value. With `connect()` you would delay the backing subscription to the source and you can safely subscribe twice before you activate the source. – Dorus Jan 08 '17 at 20:09
  • yes...i know that and its correct..., but where in this answer are two subscriptions? – olsn Jan 08 '17 at 20:10

I notice a few problems with your code. The gist of it is good, but with different operators you can do the same thing more easily.

First, you use switchMap. It is a nice operator for starting a new stream every time a new input arrives, but what you really want is to keep the current timer running for as long as the user is typing. A better operator here is exhaustMap, because it ignores new inputs while the inner timer is still active and keeps that timer running until it completes. We can then stop the active timer once the user has not typed for 1 second, which is easily done with .takeUntil(input.debounceTime(1000)). That results in this very short query:

input.exhaustMap(() => Rx.Observable.interval(1000).takeUntil(input.debounceTime(1000)))

To this query we can hook the display calls you want (showTyping, showIdle, etc.). We also need to fix the timer's index, as it resets every time the user stops typing. This can be done with the second parameter of map's project function, which is the index of the value in the current stream.

Rx.Observable.fromEvent($('#input'), 'input')
  .publish(input => input
    .exhaustMap(() => {
        showTyping();
        return Rx.Observable.interval(1000)
          .takeUntil(input.startWith(0).debounceTime(1001))
          .finally(showIdle);
    })
  ).map((_, index) => index + 1) // zero based index, so we add one.
  .subscribe(updateTimer);

Notice that I used publish here. It is not strictly needed because the source is hot, but I recommend it: we use input twice, and with publish we do not have to think about whether the source is hot or cold.
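
To see how many seconds the exhaustMap version would count for a given typing pattern, here is a rough, dependency-free model of it. It is not RxJS: the function name `countSessionTicks` and the sample timestamps are assumptions for illustration. The default idle window of 1001 ms mirrors the `debounceTime(1001)` above, which I read as a tie-breaker so that a tick landing exactly one second after the last keystroke is still counted.

```javascript
// Hypothetical model of exhaustMap(() => interval(tickMs).takeUntil(debounce)).
// keystrokes:  sorted timestamps (ms) of the input events
// tickMs:      period of the inner interval
// idleAfterMs: debounce window that ends a typing session
function countSessionTicks(keystrokes, tickMs = 1000, idleAfterMs = 1001) {
  let ticks = 0;
  let i = 0;
  while (i < keystrokes.length) {
    const start = keystrokes[i]; // first keystroke opens a session (exhaustMap)
    let last = start;
    // Later keystrokes inside the idle window do not restart the timer,
    // but they keep pushing the debounce deadline back.
    while (i + 1 < keystrokes.length && keystrokes[i + 1] - last < idleAfterMs) {
      i += 1;
      last = keystrokes[i];
    }
    const end = last + idleAfterMs; // debounce fires and the session closes
    // The interval ticks at start + tickMs, start + 2 * tickMs, ... before the end.
    for (let t = start + tickMs; t < end; t += tickMs) ticks += 1;
    i += 1; // the next keystroke, if any, opens a new session
  }
  return ticks;
}

console.log(countSessionTicks([0, 400, 900, 1500, 2100])); // 3
```

With an idle window of exactly 1000 ms, a single keystroke would produce no tick at all in this model, since the tick and the idle deadline would coincide.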

Live demo

/*** Helper Functions ***/

const showTyping = () =>
  $('.typing').text('User is typing...');

const showIdle = () =>
  $('.typing').text('');

const updateTimer = (x) =>
  $('.timer').text(x);

/*** Program Logic ***/

Rx.Observable.fromEvent($('#input'), 'input')
  .publish(input => input
    .exhaustMap(() => {
        showTyping();
        return Rx.Observable.interval(1000)
          .takeUntil(input.startWith(0).debounceTime(1001))
          .finally(showIdle);
    })
  ).map((_, index) => index + 1) // zero based index, so we add one.
  .subscribe(updateTimer);
<head>
  <script src="https://code.jquery.com/jquery-3.1.0.js"></script>
  <script src="https://unpkg.com/@reactivex/rxjs@5.0.0-beta.12/dist/global/Rx.js"></script>
</head>
<body>
  <div>
    <div>Seconds spent typing: <span class="timer">0</span></div>
    <input type="text" id="input">
    <div class="typing"></div>
  </div>
</body>
Dorus
  • Hi, I've made an implementation based off of yours. Can you tell me what is wrong with my implementation since it is missing some of the stuff that you are doing: http://jsbin.com/lazeyov/edit?js,console,output – adrianmcli Jan 08 '17 at 04:26
  • @adrianmc Oops, i just realised a mistake here. I used `timer` instead of `interval`, that means the timer stops every second. In your example, if you type slowly, and measure the time between counted seconds, you'll notice it'll go up a little slower than real seconds. – Dorus Jan 08 '17 at 09:58
  • Do you really need the `.startWith(0)` ? – olsn Jan 09 '17 at 23:19
  • @olsn yeah, surprisingly you do. Because `input` triggers `exhaustMap` and then `exhaustMap` subscribes to `input` again inside `takeUntil`, the event that triggered `exhaustMap` is missed by `takeUntil`. I discovered this when I made a code snippet out of it: if the user presses only 1 key, the counter keeps going without the `.startWith(0)`. – Dorus Jan 11 '17 at 09:45
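
The missed-event effect described in the last comment can be reproduced with a tiny hand-rolled hot source. This is only an illustration of the general mechanism (a listener that subscribes while an event is being delivered does not receive that event); `createHotSource` is a made-up helper, not the RxJS Subject, and the assumption is that the real operators behave analogously here.

```javascript
// Hand-rolled hot source, for illustration only (not the RxJS Subject).
function createHotSource() {
  const listeners = [];
  return {
    subscribe(fn) { listeners.push(fn); },
    // Deliver only to the listeners that were registered when the emit started.
    emit(value) { listeners.slice().forEach(fn => fn(value)); },
  };
}

const source = createHotSource();
const seenByInner = [];

// The outer listener plays the role of exhaustMap: on the first event it
// subscribes a second, inner listener (the takeUntil side) to the same source.
let innerAttached = false;
source.subscribe(() => {
  if (!innerAttached) {
    innerAttached = true;
    source.subscribe(v => seenByInner.push(v));
  }
});

source.emit('key 1'); // triggers the inner subscription, which misses 'key 1'
source.emit('key 2'); // the inner listener only sees this one
console.log(seenByInner); // [ 'key 2' ]
```

If 'key 1' were the only keystroke, the inner listener would never see any event, so a debounce attached to it would never fire. Seeding it with an initial value, as `.startWith(0)` does, gives the debounce something to start from.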