I have an observable data stream that I apply operations to, split into two separate streams, apply more (distinct) operations to each of the two streams, and then merge together again. I am trying to share the merged observable between two subscribers using Publish and Connect, but each subscriber seems to be using a separate stream. That is, in the example below, I see "Doing an expensive operation" printed for each item in the stream once for each of the two subscribers. (Imagine the expensive operation as something that should happen only once across all subscribers, which is why I am trying to reuse the stream.)
Example with the issue:
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Foreground threads keep the process alive while the timer is running.
var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
// Emits 0, 1, 2, ... starting immediately and then every 10 seconds.
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});
// Split into even-numbered (A) and odd-numbered (B) items.
var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });
// Merge the two branches and publish the merged stream.
var connectable = Observable.Merge(a, b).Publish();
connectable.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
connectable.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();
I see the following output:
Doing an expensive operation
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
(Output continues, truncated for brevity.)
How can I share the observable between both subscribers so that the expensive operation runs only once per item?