2

I have below stream of data coming

1 > 4 > 0 > 3 > 1 > 0 > 0 > 0 > 1 > 2 > 0 > 0 > 0 > 0 > 0 > 0

I want 0 be emitted only if it repeated 3 times sequentially or more, so the expected result will be something like

1 > 4 > 3 > 1 > 0 > 1 > 2 > 0

How can I do that in rxjs?

Reza
  • 18,865
  • 13
  • 88
  • 163
  • I've never used rxjs, but I'd just have a counter for zeroes. Incrementing it whenever the value in a sequence is a zero and set the counter to 0 when it's not. If it's 3, make it 0 again and emit `0`. That is, if you want to emit `0 > 0 > 0 > 0 > 0 > 0` as `0 > 0`, not just `0`. – Robo Robok Oct 05 '20 at 00:39
  • @RoboRobok thanks, yeah that is working in regular programming, I need to find rxjs operators – Reza Oct 05 '20 at 01:14
  • Isn't rxjs "regular programming" though? You subscribe to events and read them as they come, right? – Robo Robok Oct 05 '20 at 01:19
  • I mean using operators like , distinctUntilChanged , ... – Reza Oct 05 '20 at 01:21
  • 1
    Okay, I see there's entire boilerplate of abstraction in this library. I hope someone experienced in rxjs helps then! – Robo Robok Oct 05 '20 at 01:24
  • The question is ambiguous in the timing of more than three subsequent zeros: should the zero be emitted at the third source zero and then ignore further ones or do you first want to wait for a non-zero or completion before emitting the zero on the output? – Ingo Bürk Oct 05 '20 at 06:36
  • 1
    @IngoBürk sorry for confusion, the zero should be emitted at the third source zero and then ignore further ones until a non zero comes – Reza Oct 05 '20 at 14:37

3 Answers3

3

You might want to implement it as in the following example:

const { from } = rxjs;
const { scan, filter, map} = rxjs.operators;


const input = [
    1, 4, 0, 3, 1, 0, 0, 0, 
    1, 2, 0, 0, 0, 0, 0, 0,
    2, 0, 2, 4, 0, 0, 3, 9,
    4, 3, 0, 0, 0, 0, 0, 0,
];

from(input).pipe(
    scan(({ counter }, current) => {

        if (current === 0) return { current, counter: ++counter, emit: counter == 3 }
        else return { current, counter: 0, emit: true };

    }, { emit: false, counter: 0, current: undefined }),

    filter(x => x.emit), 

    map(x => x.current) 
)
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>
Rafi Henig
  • 5,950
  • 2
  • 16
  • 36
  • 2
    Side note: this emits a zero as soon as there's three source zeros, then ignoring further zeros until a non-zero comes along. This is totally fine as the question didn't clarify the desired timing, just a nuance to be aware of. – Ingo Bürk Oct 05 '20 at 06:37
  • `@Ingo Bürk` looking at the question you can see that the expected output of `0 > 0 > 0 > 0 > 0 > 0` is a single zero – Rafi Henig Oct 05 '20 at 07:13
  • 1
    A single zero, yes, but it doesn't talk about *when* that zero is to be emitted. As soon as the third consecutive zero arrives (as in your answer) or only once the streak of consecutive zeros ends? Observables have a time dimension. – Ingo Bürk Oct 05 '20 at 10:14
2

Indeed, you could use scan/filter/map. Another option is to use your own custom operator for it.

const {
  Observable,
  from
} = rxjs;

function groupRepeatedNumbers(opts = {
  minLimit: 3,
  desiredNumber: 0
}) {
  return function(source) {
    let numberCounter = 0;
    return new Observable(subscriber => {
      source.subscribe({
        next(value) {
          if (value !== opts.desiredNumber) {
            numberCounter = 0;
            subscriber.next(value);
          } else if (++numberCounter === opts.minLimit) {
            subscriber.next(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        }
      })
    });
  }
}

const input = [
  1, 4, 0, 3, 1, 0, 0, 0,
  1, 2, 0, 0, 0, 0, 0, 0,
  2, 0, 2, 4, 0, 0, 3, 9,
  4, 3, 0, 0, 0, 0, 0, 0,
];

from(input).pipe(
    groupRepeatedNumbers()
  )
  .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.2/rxjs.umd.min.js"></script>

You can see that you can even configure your operator to act as you want, like:

groupRepeatedNumbers({
  minLimit: 5,
  desiredNumber: 3
}) // Will group the number 3, and only emit when it gets 5 or more occasions
Guilhermevrs
  • 2,094
  • 15
  • 15
1

You can use defer to keep track of a variable (like a counter) in a custom operator while providing each subscriber with its own new variable (which is important if you have multiple subscribers).

import { defer, MonoTypeOperatorFunction, Observable } from 'rxjs'; 
import { filter } from 'rxjs/operators';

function sequentialLimit<T>(
  condition: (value: T) => boolean, 
  limit: number
): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => defer(() => {
    let counter = 0;
    return source.pipe(
      filter(v => {
        condition(v) ? counter++ : counter = 0;
        return counter === 0 || counter === limit;
      })
    );
  });
}

Usage:

const input = [
  1, 4, 0, 3, 1, 0, 0, 0, 1, 2, 0, 0, 0, 0, 0, 0,
  2, 0, 2, 4, 0, 0, 3, 9, 4, 3, 0, 0, 0, 0, 0, 0,
];

from(input).pipe(
  sequentialLimit(v => v === 0, 3)
).subscribe(console.log)

https://stackblitz.com/edit/rxjs-zpwnxq?file=index.ts

frido
  • 13,065
  • 5
  • 42
  • 56
  • I suspect `defer` is unnecessary here as the following would give you the same results: `function sequentialLimit( condition: (value: T) => boolean, limit: number ): MonoTypeOperatorFunction { let counter = 0; return source => source.pipe( filter(v => { condition(v) ? counter++ : counter = 0; return counter === 0 || counter === limit; }) ); }` – Rafi Henig Oct 05 '20 at 17:04
  • 1
    @RafiHenig If you don't use `defer` the `counter` variable will be shared by multiple subscribers. This would lead to incorrect results when you subscribe multiple times to the same observable. – frido Oct 05 '20 at 17:13
  • why? as far as I understand each subscription will have its own counter – Rafi Henig Oct 05 '20 at 17:17
  • 1
    @RafiHenig If you don't use `defer` and call the function `sequentialLimit` once then one `counter` variable will be created. If you then subscribe to the created observable twice those two subscribers will share this same `counter` variable. – frido Oct 05 '20 at 17:29