22

I have a few observables, and I need to know which one triggered the subscribe callback.

Observable.combineLatest(
      this.tournamentsService.getUpcoming(),
      this.favoriteService.getFavoriteTournaments(),
      this.teamsService.getTeamRanking(),
(tournament, favorite, team) => {
//what triggered combinelatest to run?
}).subscribe()
AlexZvl

6 Answers

18

A fairly clean, Rx-style way of achieving this is to use the timestamp operator: http://reactivex.io/documentation/operators/timestamp.html

Example code

import { combineLatest, timestamp } from 'rxjs/operators';

sourceObservable
  .pipe(
    timestamp(), // wraps each source item in an object of the form { value, timestamp }
    combineLatest(otherObservable.pipe(timestamp()), (source, other) => {
      if (source.timestamp > other.timestamp) {
        // source emitted last and triggered combineLatest
        return source.value;
      } else {
        // other emitted last (or in the same millisecond) and triggered combineLatest
        return other.value;
      }
    }),
  )

If more than two observables are involved in the combineLatest(), sorting the emitted items by timestamp makes it possible to detect which one triggered it: the item with the newest timestamp is the trigger.
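For the case with more than two inputs, a minimal sketch of the "newest timestamp wins" idea could look like this. It is written as a plain helper so it does not depend on any particular RxJS version; the `Timestamped` interface mirrors the `{ value, timestamp }` shape that timestamp() produces, while `latestIndex` and the sample data are hypothetical names made up for illustration.

```typescript
// Shape produced by the timestamp() operator: the value plus its emission time.
interface Timestamped<T> {
  value: T;
  timestamp: number;
}

// Returns the index of the entry with the newest timestamp.
// Ties resolve to the earliest index, since two inputs can share a millisecond.
function latestIndex(entries: Timestamped<unknown>[]): number {
  let newest = 0;
  for (let i = 1; i < entries.length; i++) {
    if (entries[i].timestamp > entries[newest].timestamp) {
      newest = i;
    }
  }
  return newest;
}

// Hypothetical combined emission: one timestamped item per input observable.
const sample: Timestamped<string>[] = [
  { value: 'tournaments', timestamp: 1000 },
  { value: 'favorites', timestamp: 1500 },
  { value: 'rankings', timestamp: 1200 },
];
const triggeredBy = latestIndex(sample); // index of the 'favorites' entry
```

Such a helper would typically be called on the array that combineLatest emits once every input has been piped through timestamp(). Because timestamps have millisecond resolution, inputs that emit within the same millisecond cannot be told apart this way.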

steinharts
13

Short answer: you don't know. You could implement a workaround, but it is really ugly, and I would recommend rethinking why you need this and whether you can change the architecture. Also keep in mind that the first execution of your function will only happen after all three observables have emitted at least one value.

Anyway - a possible workaround could be:

let trigger = "";
Observable.combineLatest(
      this.tournamentsService.getUpcoming().do(() => trigger = "tournament"),
      this.favoriteService.getFavoriteTournaments().do(() => trigger = "favTournament"),
      this.teamsService.getTeamRanking().do(() => trigger = "teamRanking"),
(tournament, favorite, team) => {
   console.log(`triggered by ${trigger}`);
}).subscribe();

If you want to execute a specific operation depending on which observable triggered, you should use each observable as an individual trigger that switches to the combined data. It might be slightly more code, but it is much cleaner, and you will not end up in an ugly if/else or switch/case mess with hacky workarounds. You will also have the opportunity to use the async pipe instead of manually subscribing to everything and updating local variables (which is a bad practice anyway).

Here is some example code showing how this could look:

let upcoming$ = this.tournamentsService.getUpcoming();
let favorite$ = this.favoriteService.getFavoriteTournaments();
let rankings$ = this.teamsService.getTeamRanking();

let allData$ = Observable.combineLatest(
    upcoming$, favorite$, rankings$,
    (tournament, favorite, team) => {
        return {tournament, favorite, team};
    }
);

// initial call -> this SHOULD be redundant,
// but since I don't know your code in detail
// I've put it in; whether you can remove it
// depends on the order in which your data comes in
allData$
    .take(1)
    .do(({tournament, favorite, team}) => {
        this.displayMatches(...);
        this.sortByFavorites(...);
        this.fillWithRanking(...);
    })
    .subscribe();

// individual update triggers
upcoming$
    .switchMapTo(allData$.take(1))
    .do(({tournament, favorite, team}) => this.displayMatches(...))
    .subscribe();

favorite$
    .switchMapTo(allData$.take(1))
    .do(({tournament, favorite, team}) => this.sortByFavorites(...))
    .subscribe();

rankings$
    .switchMapTo(allData$.take(1))
    .do(({tournament, favorite, team}) => this.fillWithRanking(...))
    .subscribe();
TmTron
olsn
  • Hey, thanks for the response. That's actually what my workaround looks like right now :) I haven't found any better solution for my case. And yes, I need the result of all three of them at the beginning. – AlexZvl Nov 26 '16 at 09:23
  • I'm curious - why do you need this? – olsn Nov 26 '16 at 09:24
  • I have 3 Firebase observables: get upcoming matches, get favorites, and get rankings. The first time it loads, I should display the matches, sorted by favorites and filled with rankings. If the matches are updated, I only want to re-sort them. If the favorites are updated, I will re-sort according to favorites. If the standings change, I will not re-sort, but just update the rankings for these matches. – AlexZvl Nov 26 '16 at 20:58
  • Based on your comment, I've extended my answer to show how you should approach this issue in a cleaner, RxJS-like way – olsn Nov 26 '16 at 21:35
  • This looks much cleaner, you are right! However, doesn't this mean that the first time I will run all 4 observables (allData$, upcoming$, favorite$, ranking$)? Is there a way to skip the first emission for (upcoming$, favorite$, ranking$), since I already have the data from allData$? – AlexZvl Nov 27 '16 at 12:15
  • You could either use `ranking$.skip(1).switchMap...` or `ranking$.skipUntil(allData$)...` - both should do the same in this case (of course you need the skip for the other two triggers as well) – olsn Nov 27 '16 at 12:19
  • You are God of observables !:D – AlexZvl Nov 27 '16 at 12:42
5

You could use the scan operator to compare the emitted values with the previously emitted values and include additional data indicating whether each component of the combined observable has actually changed. For example:

let combined = Observable
  .combineLatest(
    this.tournamentsService.getUpcoming(),
    this.favoriteService.getFavoriteTournaments(),
    this.teamsService.getTeamRanking()
  )
  .scan((acc, values) => [
    ...values,
    acc[0] !== values[0],
    acc[1] !== values[1],
    acc[2] !== values[2]
  ], []);

combined.subscribe(
  ([tournament, favorite, team, tournamentChanged, favoriteChanged, teamChanged]) => {
    console.log(`tournament = ${tournament}; changed = ${tournamentChanged}`);
    console.log(`favorite = ${favorite}; changed = ${favoriteChanged}`);
    console.log(`team = ${team}; changed = ${teamChanged}`);
  }
);
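To see what the accumulator produces, the same logic can be stepped through by hand outside of RxJS. This is only a sketch: `flagChanges` is a hypothetical name, the sample values are invented, and the function generalises the three hard-coded comparisons above to any number of inputs.

```typescript
// Same accumulator logic as the scan() callback above, generalised to any
// number of inputs: compare each new value with the previous one by identity
// and append one "changed" flag per input.
function flagChanges(acc: unknown[], values: unknown[]): unknown[] {
  return [...values, ...values.map((value, i) => acc[i] !== value)];
}

// Simulated combined emissions in which only the second input changes.
const first = flagChanges([], ['cup', 'favA', 'rank1']);
const second = flagChanges(first, ['cup', 'favB', 'rank1']);
// first  -> ['cup', 'favA', 'rank1', true, true, true]
// second -> ['cup', 'favB', 'rank1', false, true, false]
```

Two properties are worth noting. The very first emission flags every input as changed, because the seed is an empty array. The comparison is by identity (`!==`), so a source that re-emits a new array or object with the same contents will still be flagged as changed.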
cartant
4

I arrived at the following solution, which is similar to what cartant proposed but uses pairwise instead of scan, which seems more elegant to me. The pairwise operator keeps the previously emitted value in a buffer and passes it, together with the newly emitted value, to the next operator as an array. You can therefore easily check whether the values have changed and pass the results on. Note that pairwise emits nothing until the combined observable has emitted twice. In my example I simplified it to just 2 observables for clarity.

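import { combineLatest } from 'rxjs';
import { map, pairwise } from 'rxjs/operators';

combineLatest([obs1$, obs2$]).pipe(
    pairwise(),
    // one boolean per input: true if that input's value differs from the previous emission
    map(([oldValues, newValues]) => oldValues.map((value, i) => value !== newValues[i])),
).subscribe(([obs1$HasChanged, obs2$HasChanged]) => {
    // react to whichever input changed
});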
Yaroslav Larin
1
combineLatest([
    // each tap records which source emitted most recently
    this.obs1.pipe(tap(() => (this.trigger = 'obs1'))),
    this.obs2.pipe(tap(() => (this.trigger = 'obs2'))),
])
.subscribe(([obs1, obs2]) => {
    switch (this.trigger) {
        case 'obs1':
            // ...
            break;
        case 'obs2':
            // ...
            break;
    }
});
Eugene P.
0

A better way to solve this is to use a discriminated union type. If your language does not have discriminated unions built in, you can create one by making the constructor private and exposing one nullable public property and one public static factory method per observable. I am writing this in C#, as I'm more comfortable with that language, but it should be easy to translate. Notice the nullable strings. If your language does not support nullables, use some other mechanism to indicate whether a value is set or not.

private class DiscriminatedUnion
{
    private DiscriminatedUnion(string? property1, string? property2)
    {
        Property1 = property1;
        Property2 = property2;
    }

    public string? Property1 { get; }
    public string? Property2 { get; }

    public static DiscriminatedUnion FromObservable1(string property1)
    {
        return new DiscriminatedUnion(property1, null);
    }

    public static DiscriminatedUnion FromObservable2(string property2)
    {
        return new DiscriminatedUnion(null, property2);
    }

}

private IObservable<DiscriminatedUnion> CreateCombination()
{
    var firstObservable = tournamentsService
        .getUpcoming()
        .Select(x => DiscriminatedUnion.FromObservable1(x));

    var secondObservable = favoriteService
        .getFavoriteTournaments()
        .Select(x => DiscriminatedUnion.FromObservable2(x));

    return Observable
        .CombineLatest(firstObservable, secondObservable);
}

You can then inspect the discriminated union returned from CreateCombination() to see which observable emitted the value.
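Since the question is about RxJS, here is a rough TypeScript sketch of the same idea, where discriminated unions are built into the type system. The `Tagged` type, the `source` labels, the payload types, and `handle` are all assumptions made for illustration; the real return types of the services are not shown in the question.

```typescript
// Hypothetical payload types; the real services' types are not shown in the question.
type Tagged =
  | { source: 'upcoming'; value: string[] }
  | { source: 'favorite'; value: string[] }
  | { source: 'ranking'; value: number[] };

// Narrowing on `source` tells the handler which observable emitted,
// and gives `value` the matching type inside each branch.
function handle(event: Tagged): string {
  switch (event.source) {
    case 'upcoming':
      return `re-sort ${event.value.length} matches`;
    case 'favorite':
      return `re-sort by ${event.value.length} favorites`;
    case 'ranking':
      return `update ${event.value.length} rankings`;
  }
}

const message = handle({ source: 'ranking', value: [1, 2, 3] });
```

Each source observable would be mapped to its tagged shape before the streams are brought together, so the subscriber can branch on `source` instead of tracking a separate trigger variable.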

Alexander Høst