I have the following observable, exposed by an object through a property:

IObservable<HumidityLevel> humidity;

However, the observable is not created until after a method on that object has been called, while subscribers have to subscribe when the object that exposes it is constructed.

One approach I could think of is to start with an empty observable so that subscribers have something to subscribe to:

 IObservable<HumidityLevel> humidity = Observable.Empty<HumidityLevel>();

Later in the object's lifetime, when the actual observable is ready, I would append it to the existing one:

 humidity = humidity.Concat(actualHumidityObservable);

This obviously does not work: the assignment points humidity at a new observable, while the existing subscribers are still subscribed to the original empty one, so they will never hear from this object.

How can I achieve this? Is there an extension in Rx that can merge a source into an existing observable so that its subscribers are preserved?

fahadash

1 Answer

Subjects are the key here: they act as both observers and observables. As well as manually invoking events on a subject via its On... methods, you can also subscribe the subject to a source observable directly, either before or after the subject's own subscribers subscribe.

Here's one way you might do this:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class HumidityLevel
{
    public int Value;
}

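public class Monitor
{
    // Exists from construction, so subscribers can attach before any source is available.
    private readonly Subject<HumidityLevel> _humidityLevel =
        new Subject<HumidityLevel>();

    public IObservable<HumidityLevel> HumidityLevel
    {
        get
        {
            // Expose the subject only as an IObservable.
            return _humidityLevel.AsObservable();
        }
    }

    public void StartMonitoring(IObservable<HumidityLevel> source)
    {
        // Forward everything the source emits to the subject's subscribers.
        source.Subscribe(_humidityLevel);
    }
}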

Note the use of AsObservable() in the getter. It hides the fact that the observable is backed by a Subject, so callers cannot cast the property back to a Subject and abuse it by pushing values in themselves.

Then use it like this:

var monitor = new Monitor();

// a subscriber is free to subscribe at any time
var subscriber = monitor.HumidityLevel
    .Subscribe(level => Console.WriteLine(level.Value));

// now create an example source
var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(i => new HumidityLevel { Value = (int)i });

// and pass it to monitor and the subscriber will start getting events
monitor.StartMonitoring(source);

Note that I am not doing any cleanup here, such as unsubscribing from the source when you are done with it. Whether you need to do this, and how you do it, will depend on your scenario. It may not matter, or you might want to track the subscription and release it when a fresh source is assigned. In that case you might be interested in storing the subscription to the source in a SerialDisposable. There are a lot of options here.
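To make the SerialDisposable option concrete, here is a minimal sketch of how Monitor might track the subscription to its source. It assumes the System.Reactive package is referenced. The _sourceSubscription field and the Dispose method are my own additions for illustration, and the two Subject sources in Main are there only to keep the demo deterministic.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class HumidityLevel
{
    public int Value;
}

public class Monitor : IDisposable
{
    private readonly Subject<HumidityLevel> _humidityLevel =
        new Subject<HumidityLevel>();

    // Hypothetical field: holds the subscription to the current source.
    // Assigning a new value to its Disposable property disposes the old one.
    private readonly SerialDisposable _sourceSubscription =
        new SerialDisposable();

    public IObservable<HumidityLevel> HumidityLevel
    {
        get { return _humidityLevel.AsObservable(); }
    }

    public void StartMonitoring(IObservable<HumidityLevel> source)
    {
        // As in the original, the subject itself is the observer, so a source
        // that completes or errors will also terminate the subject.
        _sourceSubscription.Disposable = source.Subscribe(_humidityLevel);
    }

    public void Dispose()
    {
        // Unsubscribe from whichever source is current.
        _sourceSubscription.Dispose();
    }
}

public static class Program
{
    public static void Main()
    {
        var monitor = new Monitor();
        monitor.HumidityLevel
            .Subscribe(level => Console.WriteLine(level.Value));

        var first = new Subject<HumidityLevel>();
        var second = new Subject<HumidityLevel>();

        monitor.StartMonitoring(first);
        first.OnNext(new HumidityLevel { Value = 1 });  // forwarded

        monitor.StartMonitoring(second);                // releases the subscription to first
        first.OnNext(new HumidityLevel { Value = 2 });  // no longer forwarded
        second.OnNext(new HumidityLevel { Value = 3 }); // forwarded
    }
}
```

Run as written, this should print 1 and then 3. The subscriber to monitor.HumidityLevel never has to resubscribe when the source is swapped.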

James World
  • Weren't Subjects not recommended? I remember reading that somewhere. – fahadash Jul 07 '14 at 02:20
  • This is a common misconception, caused by an unfortunately overzealous statement on the *incorrect usage* of subjects that was very context-dependent. Sadly, the context doesn't seem to travel with the catchphrase. Suffice it to say that in this case, a subject is just fine. See all of [this discussion](http://stackoverflow.com/questions/14396449/why-are-subjects-not-recommended-in-net-reactive-extensions) for more. – James World Jul 07 '14 at 04:59