5

Is there a way or pattern to use combineLatest() (or some other operator) so that, when the combined observables depend on one another, the result emits only once for each set of paths that share the same origin in a DAG? It may be easier to explain with a diagram.

Diagram:

   A C
  /| |
 B | |
  \|/
   D

Here B is subscribed to A, and D is subscribed to A, B, and C. The default behavior is that if A emits, D emits twice: once when A emits, and once more when B emits (as a result of A emitting). I would like it to emit only once, after both have emitted. However, if C emits, then D should emit immediately as well.

Here's the code:

import * as Rx from 'rxjs';

const A = new Rx.BehaviorSubject(1);
const B = A.pipe(Rx.map((x) => x + 1));
const C = new Rx.BehaviorSubject(3);

const D = Rx.combineLatest({ A, B, C });

D.subscribe(console.log); // {A: 1, B: 2, C: 3}
A.next(2); // Two emissions: {A: 2, B: 2, C: 3}, {A: 2, B: 3, C: 3} 
// Would like the second one ONLY, i.e. {A: 2, B: 3, C: 3}
C.next(4); // Correctly emits: {A: 2, B: 3, C: 4}

One solution I've tried is to linearize it and make the entire collection of A, B, and C an observable:

 {A, C}
    |
{A, C, B}
    |
   {D}

This works but I wonder if there's perhaps a better way.

Adam B.
  • If the observable streams aren’t performing any expensive side effects, using [`debounceTime(0)`](https://stackoverflow.com/questions/75074303/how-does-combinelatest-rxjs-operator-works-when-multiple-observables-emit-values/75075776#75075776) can work well as a simple solution. – BizzyBob Jan 17 '23 at 05:17
  • The real problem here is not knowing whether B is the result of a pipe from A. If you have control over the creation of B, you could, for example, use `combineLatestWith` as the last stage of the pipe, along with A. If you have no control over the creation of B, I don't think what you're looking for is possible without design changes: I'm not familiar with a way to get the "origin" Observable from another Observable, because a pipe operator is essentially just a function. In cases such as these a redesign is usually the better route, since the problem tends to indicate a design flaw. – ethanfar Jan 28 '23 at 19:16

2 Answers

2

If I understand the question correctly, you want the final Observable D to emit any time either A (the upstream of B) or C emits but, when A emits, you also want the value emitted by B.

If this is the case, I would make sure that any time B emits, it emits not only the value of B but also the value notified by its upstream A, and then I would pass only B and C to combineLatest, like this:

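import { BehaviorSubject, combineLatest, map } from 'rxjs';

const A = new BehaviorSubject(1);
// B carries both the upstream value of A and the derived value
const B = A.pipe(map((x) => [x, x + 1]));
const C = new BehaviorSubject(3);

const D = combineLatest({ B, C }).pipe(
  map((val) => ({ A: val.B[0], B: val.B[1], C: val.C }))
);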
Picci
  • Thanks, yeah that works. However, for it to work, I do need to know that A is the parent of B beforehand. I'm trying to implement this in a situation where observables are added dynamically & I may not know whether an observable is a parent or a child of another one. Any idea what to do in that situation? – Adam B. Jan 17 '23 at 19:02
  • Downstream, i.e. in the final subscription, you want one notification whenever A emits, no matter how many "derived" Observables/streams you have. If this is the case, rather than creating Observable `B`, you should have a `map` operator attached to Observable `A` that transforms any value notified by Observable `A` into the tuple you are interested in. This means creating a chain of transformations that starts from a value emitted by A and is then composed further down the line. I think a general implementation of this chain could be very complex. It may be feasible in more constrained situations. – Picci Jan 17 '23 at 20:45
-1

You can zip A and B together, so that for every pair of emissions you get only one in D. Combine that with C, which emits on its own:

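import { BehaviorSubject, combineLatest, map, zip } from 'rxjs';

const A = new BehaviorSubject(1);
const B = A.pipe(map((x) => x + 1));
const C = new BehaviorSubject(3);

// combineLatest needs a key for each source; AB emits [a, b] pairs
const D = combineLatest({ AB: zip(A, B), C });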

This requires both A and B to emit a value, but because they depend on each other, that is not an issue in your example.

https://www.learnrxjs.io/learn-rxjs/operators/combination/zip

Mark van Straten