
I have an observable data stream that I am applying operations to, splitting into two separate streams, applying more (distinct) operations to each of the two streams, and merging together again. I am trying to share the observable between two subscribers using Publish and Connect, but each subscriber seems to get its own stream. That is, in the example below, I see "Doing an expensive operation" printed once per item for each of the subscribers, so twice per item in total. (Imagine that the expensive operation should happen only once per item across all subscribers; that is why I am trying to reuse the stream.) I have used Publish and Connect to try to share the merged observable with both subscribers, but it doesn't have the effect I expected.

Example with the issue:

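using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });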
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var connectable = Observable.Merge(a, b).Publish();
connectable.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
connectable.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();

I see the following output:

Doing an expensive operation
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }

(Output continues, truncated for brevity.)

How can I share the observable with both subscribers?

Whymarrh

2 Answers


You have published the wrong observable.

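With the current code you are merging and then publishing, as in Observable.Merge(a, b).Publish(). Because a and b are both defined against expensive, connecting subscribes the merge to a and to b, and each of those subscribes to expensive on its own, so you still get two subscriptions to expensive.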

The subscriptions create these pipelines:

[Diagram: Original]

You can see this if you take out the .Publish() (and the .Connect() call) and subscribe directly to Observable.Merge(a, b). The output becomes:

Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }

This creates these pipelines:

[Diagram: No Publish]
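A minimal sketch for checking those subscription counts yourself, assuming the System.Reactive NuGet package. It swaps the ten-second timer for a synchronous Observable.Range(0, 4) and wraps the source in Observable.Defer with a counter, so every subscription to the stand-in for expensive is recorded. SubscriptionDemo and CountSubscriptions are hypothetical names used only for illustration.

```csharp
using System;
using System.Reactive.Linq;

public static class SubscriptionDemo
{
    // Returns how many times the stand-in for "expensive" was subscribed to.
    public static int CountSubscriptions(bool publishMerged)
    {
        int subscriptions = 0;

        // Defer runs its factory once per subscription, so the counter records each one.
        var expensive = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(0, 4).Select(i => string.Format("#{0}", i));
        });

        var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0);
        var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0);
        var merged = Observable.Merge(a, b);

        if (publishMerged)
        {
            // The question's layout: Publish is applied after the merge.
            var connectable = merged.Publish();
            connectable.Subscribe(_ => { });
            connectable.Subscribe(_ => { });
            connectable.Connect();
        }
        else
        {
            // No Publish at all: each subscriber builds its own pipeline.
            merged.Subscribe(_ => { });
            merged.Subscribe(_ => { });
        }

        return subscriptions;
    }

    public static void Main()
    {
        Console.WriteLine(CountSubscriptions(publishMerged: true));  // 2
        Console.WriteLine(CountSubscriptions(publishMerged: false)); // 4
    }
}
```

Publishing after the merge still leaves two subscriptions to expensive (one through a, one through b); removing Publish doubles that, which matches the four "Doing an expensive operation" lines per item in the output above.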

So, by shifting the .Publish() back up to expensive you eliminate the problem. That's where you really needed it because it is the expensive operation after all.

This is the code you needed:

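using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });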
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var connectable = expensive.Publish();

var a = connectable.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = connectable.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var merged = Observable.Merge(a, b);

merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));

connectable.Connect();

That nicely produces the following:

Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
Doing an expensive operation
Subscriber A got: { Source = A, Value = #2 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #3 }

And this gives you these pipelines:

[Diagram: Expensive Publish]

You can see from this image that there is still duplication. That's fine because these parts aren't expensive.
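To make the "duplication is fine" point concrete, here is a sketch under the same assumptions (System.Reactive, Observable.Range(0, 4) standing in for the timer, hypothetical counters and names) that counts how often the expensive projection and the cheap Where filters run once Publish sits on expensive.

```csharp
using System;
using System.Reactive.Linq;

public static class EvaluationDemo
{
    // Counts runs of the expensive projection and of the cheap filters
    // for four source items when Publish is applied to expensive.
    public static (int Expensive, int Cheap) CountEvaluations()
    {
        int expensiveRuns = 0;
        int cheapRuns = 0;

        var expensive = Observable.Range(0, 4).Select(i =>
        {
            expensiveRuns++;
            return string.Format("#{0}", i);
        });

        var connectable = expensive.Publish();

        var a = connectable.Where(s => { cheapRuns++; return int.Parse(s.Substring(1)) % 2 == 0; });
        var b = connectable.Where(s => { cheapRuns++; return int.Parse(s.Substring(1)) % 2 != 0; });
        var merged = Observable.Merge(a, b);

        // Two subscribers, as in the answer; each one gets its own copies of the filters.
        merged.Subscribe(_ => { });
        merged.Subscribe(_ => { });

        // Range runs synchronously here, so all four items are processed inside Connect.
        connectable.Connect();
        return (expensiveRuns, cheapRuns);
    }

    public static void Main()
    {
        var (expensiveRuns, cheapRuns) = CountEvaluations();
        Console.WriteLine($"expensive: {expensiveRuns}, cheap: {cheapRuns}"); // expensive: 4, cheap: 16
    }
}
```

The expensive projection runs once per item, while each filter runs once per item per subscription, because merged is subscribed twice and each subscription builds its own a and b.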

The duplication is actually important. Shared parts of the pipelines make their endpoints vulnerable to errors and thus to early termination. The less sharing, the better for the robustness of the code. It's only when you have an expensive operation that you should worry about publishing. Otherwise, just let each subscription build its own pipeline.

Here's an example to show it. If you don't have a published source, an error in one pipeline doesn't pull down all of the pipelines.

[Diagram: Separate]

But once you introduce a shared observable then a single error will bring down all of the pipelines.

[Diagram: Shared]
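The error behaviour can be sketched the same way, again assuming System.Reactive, a synchronous Observable.Range, and a hypothetical InvalidOperationException thrown on item 3. The first method shows an error raised upstream of Publish reaching every subscriber. The second shows that an error raised in one branch downstream of Publish only ends that branch, which matches the clarification in the comments that errors cause upstream observables to unsubscribe rather than propagating backwards.

```csharp
using System;
using System.Reactive.Linq;

public static class ErrorDemo
{
    // An error in the shared part (upstream of Publish) reaches every subscriber.
    public static int FailuresWhenSourceShared()
    {
        int failures = 0;
        var shared = Observable.Range(0, 5)
            .Select(i => i == 3 ? throw new InvalidOperationException("boom") : i)
            .Publish();

        shared.Subscribe(_ => { }, ex => failures++);
        shared.Subscribe(_ => { }, ex => failures++);
        shared.Connect();
        return failures;
    }

    // An error in one branch after Publish ends only that branch;
    // the other subscriber still receives every item.
    public static (bool BranchFailed, int OtherReceived) BranchErrorAfterPublish()
    {
        bool branchFailed = false;
        int otherReceived = 0;
        var shared = Observable.Range(0, 5).Publish();

        shared.Select(i => i == 3 ? throw new InvalidOperationException("boom") : i)
              .Subscribe(_ => { }, ex => branchFailed = true);
        shared.Subscribe(_ => otherReceived++);
        shared.Connect();
        return (branchFailed, otherReceived);
    }

    public static void Main()
    {
        Console.WriteLine(FailuresWhenSourceShared()); // 2
        var (branchFailed, otherReceived) = BranchErrorAfterPublish();
        Console.WriteLine($"{branchFailed}, {otherReceived}"); // True, 5
    }
}
```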

Whymarrh
Enigmativity
  • Could you elaborate on "since a & b are defined against expensive you still get two subscriptions to expensive"? I don't understand that part exactly. – Whymarrh Aug 28 '15 at 09:47
  • @Whymarrh - I've added further explanation. – Enigmativity Aug 28 '15 at 11:33
  • "The duplication is actually important. Shared parts of the pipelines make their endpoints vulnerable to errors and thus to early termination. The less sharing the better for the robustness of the code." I don't understand this. Could you explain more? – Timothy Shields Aug 28 '15 at 13:55
  • @TimothyShields - I've added some more detail. – Enigmativity Aug 29 '15 at 02:03
  • @Enigmativity: I don't understand your statement about propagation of error along the pipeline. Does it mean that errors are propagated also backwards, i.e. if an error happens in the middle of a particular path of your graph, all nodes on that path will be terminated? I was thinking errors would only propagate forwards, i.e. the error-generating stream and subsequent streams would terminate. – user3743222 Sep 06 '15 at 23:51
  • @user3743222 - This is where you need to understand the difference between the query definition and the query execution. When we code something like `var o = Observable.Range(0, 10).Where(n => n % 2 == 0);` the variable `o` represents the definition of the observable. When we do this `var s1 = o.Subscribe(...)` we instantiate the pipeline. This is the execution of the observable, and a new set of objects is created. When we do `var s2 = o.Subscribe(...)` another set of execution objects is created. When an error occurs, all of the objects in that set of execution objects terminate. – Enigmativity Sep 07 '15 at 02:32
  • @Enigmativity - I do understand the difference between the two. I still don't understand why the error is propagated all the way back to the source. Take a mouse click source (hot `shared` stream). If there is an error at that level, all subscribers are in error. Fair enough. Map that click to a number. If your map function produces an error, that does not change the fact that the mouse click source can still emit values. So if there is another observer, it should receive those mouse clicks. And both observers are sharing the same source. That contradicts your last diagram. Where did I go wrong? – user3743222 Sep 07 '15 at 13:38
  • @Enigmativity : does it make sense that I create another question? I think this is of interest for many people, and an answer might take more than what is allowed in the comment field – user3743222 Sep 07 '15 at 13:39
  • @user3743222 - sorry, I wasn't clear enough. Errors cause upstream observables to unsubscribe - not propagating the error as such. But two subscriptions to a mouse click observable create two separate observable pipelines, so only one is affected. – Enigmativity Sep 07 '15 at 22:25
  • @Enigmativity What did you use to create those awesome diagrams? – Matt Klein Oct 03 '16 at 16:04
  • @MattKlein - Most likely Visio. It was a year ago, so I can't recall for sure. – Enigmativity Oct 03 '16 at 21:12

One possible fix:

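using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });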
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var subj = new ReplaySubject<string>();
expensive.Subscribe(subj);

var a = subj.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = subj.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var merged = Observable.Merge(a, b);
merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));

The above example essentially creates a new intermediate observable that emits the results of the expensive operation. This allows you to subscribe to the results of the expensive operation, not to an expensive transformation applied to a timer.

With this you'll see:

Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }

(Output continues, truncated for brevity.)

Alternatively, you could move the calls to Publish and Connect:

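using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });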
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
}).Publish();

var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var merged = Observable.Merge(a, b);
merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));

expensive.Connect();

Why ReplaySubject, not just Subject or some other subject?

A Subject in the .NET Rx implementation is what the ReactiveX documentation calls a PublishSubject: it emits to an observer only those items that the source Observable emits after the observer subscribes. A ReplaySubject, on the other hand, emits to any observer all of the items that were emitted by the source Observable, regardless of when the observer subscribes. In the first example, expensive.Subscribe(subj) starts the timer immediately, so if we used a plain Subject there, subscribers to subj would miss anything emitted between the moment subj subscribes to expensive and the moment they subscribe to subj.
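The difference is easy to see in isolation. This sketch (assuming System.Reactive; SubjectDemo and LateSubscriberSees are hypothetical names) pushes two items into a subject before subscribing and one item after, then reports what the late subscriber received.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectDemo
{
    // Pushes 1 and 2 before subscribing, then 3 after, and returns what the late subscriber saw.
    public static List<int> LateSubscriberSees(ISubject<int> subject)
    {
        var seen = new List<int>();
        subject.OnNext(1);
        subject.OnNext(2);
        subject.Subscribe(x => seen.Add(x)); // subscribes after two items were already emitted
        subject.OnNext(3);
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", LateSubscriberSees(new Subject<int>())));        // 3
        Console.WriteLine(string.Join(",", LateSubscriberSees(new ReplaySubject<int>())));  // 1,2,3
        Console.WriteLine(string.Join(",", LateSubscriberSees(new ReplaySubject<int>(1)))); // 2,3
    }
}
```

An unbounded ReplaySubject keeps every item it has seen, so with a timer that never completes its buffer grows without limit; the ReplaySubject(int bufferSize) constructor bounds it when only recent items need replaying.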

Whymarrh