6

To learn RxJS, I'm playing around with it. My code:

// RxJS v6+
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs';

const source = interval(1000);

const example = source.pipe(
  map(value => value + 1),
  map(value => {
    if (value === 40) {
      finish(); // placeholder: finish() does not exist
    } else if (value % 5 === 0) {
      return 'can divide by 5 we did some magic';
    } else {
      return value;
    }
  })
);
const subscribe = example.subscribe(
  val => console.log(val), 
  error => console.log("Error handled: " , error), 
  () => console.log('resolved'));

My idea was to run it 40 times and then finish the observable. It could be another requirement, e.g. checking whether the value is 10 at 10:00; the main goal is to evaluate the value and force a finish. I'm looking for an alternative to the placeholder finish(), because finish does not exist. How can I get to the complete callback () => console.log('resolved') of the subscribe method?

I found "How can I complete Observable in RxJS", but the answer is from 2015 and I'm assuming that by now there is an answer for the current RxJS version.

Sven van den Boogaart

3 Answers

8

Actually, it is still the same; you only need to use the pipe method. Here is an example:

import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source = interval(1000);
const timer$ = timer(5000);
const example = source.pipe(takeUntil(timer$));
const subscribe = example.subscribe(val => console.log(val));
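
The notifier passed to takeUntil does not have to be a timer. As a rough sketch of how a finish()-style call could look, the notifier below is a Subject that gets triggered from inside the pipe once a condition holds. The name stop$ and the use of range in place of interval(1000) are assumptions made only to keep the example short and quick to run. Note that the value that triggers the stop is itself not delivered, because takeUntil has already completed by the time it arrives.

```typescript
import { Subject, range } from 'rxjs';
import { takeUntil, tap } from 'rxjs/operators';

// Hypothetical notifier: calling stop$.next() plays the role of finish().
const stop$ = new Subject<void>();

const received: number[] = [];
let completed = false;

// range(1, 100) stands in for interval(1000) so the sketch finishes immediately.
range(1, 100)
  .pipe(
    tap(value => {
      // Any condition on the value (or on outside state) could go here.
      if (value === 40) {
        stop$.next();
      }
    }),
    // Completes the stream as soon as stop$ emits.
    takeUntil(stop$)
  )
  .subscribe({
    next: value => received.push(value),
    complete: () => {
      completed = true; // the complete callback does run in this case
    },
  });

console.log(received.length, completed);
```

Because the condition lives in the pipe, every subscriber of the resulting observable gets the same completion logic without repeating the check.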
Tony Ngo
  • 19,166
  • 4
  • 38
  • 60
3

Both answers that mention takeUntil and take are correct, but another way is to use the subscription object to unsubscribe. It is just another option.

const subx = example.subscribe(val => {
  console.log(val);
  if (val === 40) {
    subx.unsubscribe(); // stop receiving values once 40 is reached
  }
});
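
One thing to keep in mind with this approach: unsubscribing stops the values, but it does not call the complete callback, so a handler like () => console.log('resolved') would not run. The small sketch below illustrates that. It uses a Subject that is fed by hand instead of interval, and the names source$, seen and completeCalled are made up for the illustration.

```typescript
import { Subject } from 'rxjs';

// Hand-fed source so the sketch runs synchronously (stand-in for interval).
const source$ = new Subject<number>();

const seen: number[] = [];
let completeCalled = false;

const subscription = source$.subscribe({
  next: value => {
    seen.push(value);
    if (value === 3) {
      // Stops further values for this subscriber only.
      subscription.unsubscribe();
    }
  },
  complete: () => {
    completeCalled = true; // not reached when we merely unsubscribe
  },
});

// Push five values; only the first three reach the subscriber.
for (let i = 1; i <= 5; i++) {
  source$.next(i);
}

console.log(seen, completeCalled, subscription.closed);
```

If the complete callback matters, an operator that completes the stream (take, takeUntil and similar) is probably the better fit.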


Updated

In case you have many subscribers and want to put the condition that completes the source observable in one place, the take operator can do the job:

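import { interval } from 'rxjs';
import { map, take } from 'rxjs/operators';

const source = interval(1000).pipe(take(5)); // completes after 5 values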

source.pipe(map(res => res * 10)).subscribe(val => {
  console.log("", val);
});

source.subscribe(val => {
  console.log(val);
});


Muhammed Albarmavi
  • Thanks, this solves the problem and I will accept it. It would be great if there was a way to send a complete from the observable. Let's imagine I have 1000 subscribers: now I have to add a comparison to all the observers instead of doing it once in the observable and then broadcasting a finish. – Sven van den Boogaart Dec 02 '19 at 11:04
  • @SvenvandenBoogaart In that case you can use the take operator. I will update my answer. – Muhammed Albarmavi Dec 02 '19 at 11:27
  • @malbarmawi That doesn't solve the case where I want to finish based on a state, e.g. if the value is 10 at 10:00. The first solution is the best so far. – Sven van den Boogaart Dec 02 '19 at 11:32
2

"My idea was to run it 40 times"

For that you can add take(40). In general there are several operators like take that can complete an observable. Check out https://www.learnrxjs.io/operators/filtering/take.html

// RxJS v6+
import { take, map } from 'rxjs/operators';
import { interval } from 'rxjs';

const source = interval(1000);

const example = source.pipe(
  take(40), // complete after 40 values, so no finish() call is needed
  map(value => value + 1),
  map(value => {
    if (value % 5 === 0) {
      return 'can divide by 5 we did some magic';
    } else {
      return value;
    }
  })
);
const subscribe = example.subscribe(
  val => console.log(val), 
  error => console.log("Error handled: " , error), 
  () => console.log('resolved'));
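
For the case where the stop condition depends on the value rather than on a count, takeWhile is one of those other operators. The sketch below is only an illustration under a few assumptions: range stands in for interval(1000) so it runs instantly, the names results and done are made up, and the second argument (inclusive), which also lets the value that fails the predicate through, needs a reasonably recent RxJS 6 release (6.4 or later, as far as I know).

```typescript
import { range } from 'rxjs';
import { map, takeWhile } from 'rxjs/operators';

const results: Array<number | string> = [];
let done = false;

// range(0, 100) emits 0..99 synchronously (stand-in for interval(1000)).
range(0, 100)
  .pipe(
    map(value => value + 1),
    // Keep going while the value is below 40; `true` (inclusive) also emits
    // the first value that fails the check, i.e. 40, and then completes.
    takeWhile(value => value < 40, true),
    map(value => (value % 5 === 0 ? 'divisible by 5' : value))
  )
  .subscribe({
    next: value => results.push(value),
    complete: () => {
      done = true; // the complete callback runs once takeWhile stops
    },
  });

console.log(results.length, done);
```

The predicate can test anything, so a check such as "the value is 10 at 10:00" could go there as well.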
MoxxiManagarm