17

I have two BehaviorSubject streams that I'm trying to combine with forkJoin, with no luck. I expected it to give back the last value of each stream. Is it possible to implement this somehow?

The subscribe callback is never called.

let stream1 = new BehaviorSubject(2);
let stream2 = new BehaviorSubject('two');

Observable.forkJoin(stream1, stream2)
    .subscribe(r => {
        console.log(r);
    });
martin
Lajos

3 Answers

26

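You may want to use combineLatest instead of forkJoin. It's useful when you don't want to wait for a complete() call.

With combineLatest, whenever any source observable (in your case, your BehaviorSubjects) emits a value, combineLatest emits an array with the latest value from each source. Because a BehaviorSubject always holds a current value, the first combined emission arrives as soon as you subscribe: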

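import { BehaviorSubject, combineLatest } from 'rxjs';

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

// Array form; the variadic form combineLatest(stream1, stream2) is deprecated in newer RxJS versions.
combineLatest([stream1, stream2])
    .subscribe(r => {
        console.log(r);
    });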

stream1.next(3);
stream2.next('three');

Console log:

(2) [2, "two"] // initial state

(2) [3, "two"] // next triggered on stream1

(2) [3, "three"] // next triggered on stream2

Live demo: https://stackblitz.com/edit/rxjs-qzxo3n

Alisson Reinaldo Silva
24

Note what forkJoin() actually does from its documentation:

Wait for Observables to complete and then combine last values they emitted.

This means that forkJoin() emits a single value only once all input Observables have completed. A BehaviorSubject never completes on its own, so you have to call complete() explicitly on both of them:

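import { BehaviorSubject, forkJoin } from 'rxjs';

const stream1 = new BehaviorSubject(2);
const stream2 = new BehaviorSubject('two');

// Array form; the variadic form forkJoin(stream1, stream2) is deprecated in newer RxJS versions.
forkJoin([stream1, stream2])
  .subscribe(r => {
    console.log(r); // logs only after both subjects have completed
  });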

stream1.complete();
stream2.complete();

See live demo: https://stackblitz.com/edit/rxjs-9nqtx6
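
To make the "wait for completion, then combine the last values" rule concrete, here is a minimal dependency-free sketch of the idea. Everything in it (MiniBehaviorSubject, miniForkJoin, the Observer type) is a hypothetical toy model written for illustration; it is not how RxJS is implemented and it ignores errors and unsubscription.

```typescript
// Toy model for illustration only. These names are hypothetical and NOT part of RxJS.
type Observer<T> = { next: (value: T) => void; complete: () => void };

class MiniBehaviorSubject<T> {
  private observers: Observer<T>[] = [];
  private completed = false;

  constructor(private current: T) {}

  subscribe(observer: Observer<T>): void {
    if (this.completed) {
      observer.complete();
      return;
    }
    // A new subscriber immediately receives the current value.
    observer.next(this.current);
    this.observers.push(observer);
  }

  next(value: T): void {
    if (this.completed) return;
    this.current = value;
    this.observers.forEach((o) => o.next(value));
  }

  complete(): void {
    if (this.completed) return;
    this.completed = true;
    this.observers.forEach((o) => o.complete());
    this.observers = [];
  }
}

// Remembers the last value of each source and reports once, after ALL sources completed.
function miniForkJoin<T>(
  sources: MiniBehaviorSubject<T>[],
  onResult: (values: T[]) => void
): void {
  const last: T[] = [];
  const hasValue: boolean[] = sources.map(() => false);
  let remaining = sources.length;

  sources.forEach((source, i) => {
    source.subscribe({
      next: (value) => {
        last[i] = value;
        hasValue[i] = true;
      },
      complete: () => {
        remaining -= 1;
        if (remaining === 0 && hasValue.every(Boolean)) {
          onResult([...last]);
        }
      },
    });
  });
}

const stream1 = new MiniBehaviorSubject<number | string>(2);
const stream2 = new MiniBehaviorSubject<number | string>('two');
const results: (number | string)[][] = [];

miniForkJoin([stream1, stream2], (values) => results.push(values));

stream1.next(3);     // remembered, but nothing is reported yet
stream1.complete();  // still waiting for stream2
const countBeforeAllComplete = results.length;
stream2.complete();  // now the combined last values are reported

console.log(countBeforeAllComplete, results);
```

In this model nothing is reported while either subject is still open, which mirrors why the question's callback never fired: neither BehaviorSubject was ever completed.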

March 2019: Updated for RxJS 6.

martin
  • Thank you for your answer! Have you tested your code? The log is not invoked on my computer. – Lajos Sep 27 '16 at 16:48
5

You can either pipe each stream through take(1) or call complete() as mentioned above.

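// Class members. Assumes BehaviorSubject, Observable and forkJoin are imported from 'rxjs',
// take and takeUntil from 'rxjs/operators', and that the class defines this._destroyed$.
private subjectStream1 = new BehaviorSubject<any>(null);
stream1$: Observable<any> = this.subjectStream1.asObservable();

private subjectStream2 = new BehaviorSubject<any>(null);
stream2$: Observable<any> = this.subjectStream2.asObservable();

// Inside a method: take(1) completes each stream after its current value, so forkJoin can emit.
forkJoin({
  stream1: this.stream1$.pipe(take(1)),
  stream2: this.stream2$.pipe(take(1))
})
  .pipe(takeUntil(this._destroyed$))
  .subscribe((values) => console.log(values));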
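
For reference, here is a standalone sketch of the same take(1) idea outside a class, assuming RxJS 6.5 or later (where forkJoin accepts an object of sources). The names subject1, subject2 and snapshots are only illustrative.

```typescript
import { BehaviorSubject, forkJoin } from 'rxjs';
import { take } from 'rxjs/operators';

// Illustrative subjects with initial values (names are assumptions, not from the answer).
const subject1 = new BehaviorSubject<number>(2);
const subject2 = new BehaviorSubject<string>('two');

const snapshots: Array<{ stream1: number; stream2: string }> = [];

// take(1) passes the current value through and then completes,
// so forkJoin sees every source complete and emits once.
forkJoin({
  stream1: subject1.pipe(take(1)),
  stream2: subject2.pipe(take(1)),
}).subscribe((values) => snapshots.push(values));

// Emitted after the snapshot was taken, so it does not produce another result.
subject1.next(3);

console.log(snapshots);
```

Note that this yields a one-time snapshot of the values held at subscription time. If you need to keep reacting to later values, combineLatest is likely the better fit.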
sjcoder