I have an API which exposes an IObservable<bool> Status. But this status depends on an underlying observable source which has to be initialised via Init.

What I'd like to do is protect the users from having to do things in the right order: as it currently stands, if they try to subscribe to the Status before performing an Init, they get an exception because the source is not initialised.

So I had the genius idea of using a Subject to decouple the two: the external user subscribing to my Status is just subscribing to the Subject, then when they call Init, I subscribe to the underlying service using my Subject.

The idea in code

private ISubject<bool> _StatusSubject = new Subject<bool>();
public IObservable<bool> Status { get { return _StatusSubject; } }

public void Init() 
{
    _Connection = new Connection();
    Underlying.GetDeferredObservable(_Connection).Subscribe(_StatusSubject);
}

However, from tests on a dummy project, the problem is that the initialisation 'wakes up' my underlying Observable by subscribing the Subject to it, even if nobody has yet subscribed to the subject. That's something I'd like to avoid if possible, but I'm not sure how...

(I'm also mindful of the received wisdom that "the general rule is that if you're using a subject then you're doing something wrong")

Benjol
  • For what it's worth, "stateful" observable streams and `Subject` go hand-in-hand in my mind; yeah, it's frowned upon generally, but generally you try to set up a state-free stream. If the concept of state can't be abstracted away from the stream mechanics (like requiring a specific bootstrap/initialization), I say go with `Subject`; it's *way* easier to understand and work with. – JerKimball May 07 '13 at 16:00
  • In my mind, the primary reason to avoid `Subject` is that people usually reach for it first and end up with a complex solution (like @Benjol's solution below) without realizing that they are re-implementing something `Rx` already provides as an operator. The `Subject` should be put near the end of the list of things to try. But once you find you can't do it more simply some other way, then by all means use a `Subject`! – Brandon May 07 '13 at 23:24
  • @brandon That's a fair point; I typically will prototype stuff up with subject and then work backwards, decomposing them into straight up streams. Good answer, btw – JerKimball May 08 '13 at 14:14

3 Answers

It seems like the concept you are missing is how to know when someone starts listening, and to init your underlying source only then. Usually you use Observable.Create or one of its siblings (Defer, Using, ...) to do this.

Here's how to do it without a Subject:

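// Note: if _Connection is an instance field, assign _status in the constructor
// instead, because a C# field initializer cannot reference instance members.
private IObservable<bool> _status = Observable.Defer(() =>
{
    _Connection = new Connection();
    return Underlying.GetDeferredObservable(_Connection);
});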

public IObservable<bool> Status { get { return _status; } }

Defer will not call the init code until someone actually subscribes.
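To make the laziness concrete, here is a minimal sketch, assuming the System.Reactive package. The counter is a hypothetical stand-in for creating a Connection, and Observable.Return(true) stands in for the underlying observable:

```csharp
using System;
using System.Reactive.Linq;

public static class DeferSketch
{
    public static void Main()
    {
        // Hypothetical stand-in for "_Connection = new Connection()".
        int connectionsOpened = 0;

        IObservable<bool> status = Observable.Defer(() =>
        {
            connectionsOpened++;
            // Placeholder for Underlying.GetDeferredObservable(_Connection).
            return Observable.Return(true);
        });

        // The factory has not run yet, so this prints 0.
        Console.WriteLine(connectionsOpened);

        status.Subscribe(_ => { });
        status.Subscribe(_ => { });

        // The factory ran once per subscription, so this prints 2.
        Console.WriteLine(connectionsOpened);
    }
}
```

Nothing happens when the observable is built; the factory runs once for every Subscribe call.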

But this has a couple of potential issues:

  1. Each observer will make a new connection
  2. When the observer unsubscribes, the connection is not cleaned up.

The 2nd issue is easy to solve, so let's do that first. Let's assume your Connection is disposable, in which case you can just do:

private IObservable<bool> _status = Observable
    .Using(() => new Connection(),
           connection => Underlying.GetDeferredObservable(connection));

public IObservable<bool> Status { get { return _status; } }

With this iteration, whenever someone subscribes, a new Connection is created and passed to the second lambda to construct the observable. Whenever the observer unsubscribes, the Connection is disposed. If Connection is not an IDisposable, then you can use Disposable.Create(Action) to create an IDisposable which will run whatever action you need to run to clean up the connection.
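For the non-disposable case, something along these lines should work. This is only a sketch, assuming System.Reactive; LegacyConnection and its Close method are hypothetical names invented for illustration, and Observable.Return(true) is a placeholder for the real underlying observable:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical connection type: it has a Close method but is not IDisposable.
public sealed class LegacyConnection
{
    public void Close() { Console.WriteLine("connection closed"); }
}

public static class UsingWithDisposableCreate
{
    public static IObservable<bool> CreateStatus()
    {
        // Defer gives every subscription its own connection instance.
        return Observable.Defer(() =>
        {
            var connection = new LegacyConnection();
            return Observable.Using(
                // Wrap the cleanup action so Using has something to dispose.
                () => Disposable.Create(connection.Close),
                // Placeholder for Underlying.GetDeferredObservable(connection).
                _ => Observable.Return(true));
        });
    }

    public static void Main()
    {
        // Close runs when the sequence terminates or the observer unsubscribes.
        CreateStatus().Subscribe(s => Console.WriteLine("Status: " + s));
    }
}
```

Using disposes the resource when the sequence completes, errors, or is unsubscribed, so the cleanup action runs in all three cases.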

You still have the problem that each observer creates a new connection. We can use Publish and RefCount to solve that problem:

private IObservable<bool> _status = Observable
    .Using(() => new Connection(),
           connection => Underlying.GetDeferredObservable(connection))
    .Publish()
    .RefCount();

public IObservable<bool> Status { get { return _status; } }

Now, when the first observer subscribes, the connection will get created and the underlying observable will be subscribed. Subsequent observers will share the connection and will receive status values emitted from that point on (a plain Publish does not replay the latest value to late subscribers; Replay(1) would). When the last observer unsubscribes, the connection will be disposed and everything shut down. If another observer subscribes after that, it all starts back up again.

Underneath the hood, Publish is actually using a Subject to share the single observable source. And RefCount is tracking how many observers are currently observing.
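If you would rather control the moment of connection yourself (for example from an Init method) instead of letting RefCount do it on the first subscription, you can keep the connectable observable and call Connect explicitly. A minimal sketch, assuming System.Reactive; the source below is a hypothetical stand-in for the underlying observable:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishConnectSketch
{
    public static void Main()
    {
        // Hypothetical stand-in for Underlying.GetDeferredObservable(connection);
        // it logs when it is actually subscribed to.
        IObservable<bool> source = Observable.Defer(() =>
        {
            Console.WriteLine("source subscribed");
            return new[] { true, false, true }.ToObservable();
        });

        // Publish wraps the source in a connectable observable; nothing runs yet.
        IConnectableObservable<bool> status = source.Publish();

        // Observers can attach before the source is started.
        using (status.Subscribe(s => Console.WriteLine("Status: " + s)))
        {
            Console.WriteLine("observer attached, source still idle");

            // Connect is the explicit "start" step (what Init would do);
            // RefCount would call it on the first subscription instead.
            // Disposing the returned handle disconnects from the source.
            using (status.Connect())
            {
            }
        }
    }
}
```

The trade-off is that you lose RefCount's automatic teardown: you have to hold on to the IDisposable returned by Connect and dispose it yourself when the source should be released.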

Brandon
  • Thanks for taking the time to answer. The other thing I need to be able to do is *not* subscribe to the underlying even *after* others have subscribed to Status: that should only happen on 'Init'... – Benjol May 08 '13 at 06:23
  • Maybe I'm misunderstanding you, but that's how the final example works. `Publish` returns an `IConnectableObservable<T>` that only subscribes to the underlying source when you call `Connect` on it. So others can subscribe to it but it will wait until you call `Connect` before it subscribes to source. `RefCount` manages that for you. The first subscriber will cause `RefCount` to call `Connect`. The 2nd+ subscribers will cause `RefCount` to share that same subscription. When the subscribers all unsubscribe, `RefCount` will dispose its connection to the source. – Brandon May 08 '13 at 10:01
  • IMO this is a much neater solution than using a Subject. It's more idiomatic, more declarative and less stateful (in your own code at least). As @Brandon says, using RefCount will ensure that the underlying connection is only subscribed to the first time. – Niall Connaughton May 09 '13 at 07:36

I might be oversimplifying here, but let me take a whack at using Subject as requested:

Your Thingy:

public class Thingy
{
    private BehaviorSubject<bool> _statusSubject = new BehaviorSubject<bool>(false);    
    public IObservable<bool> Status
    {
        get
        {
            return _statusSubject;
        }
    }

    public void Init()
    {
        var c = new object();
        new Underlying().GetDeferredObservable(c).Subscribe(_statusSubject);
    }
}

A faked out Underlying:

public class Underlying
{
    public IObservable<bool> GetDeferredObservable(object connection)
    {
        return Observable.DeferAsync<bool>(token => {
            return Task.Factory.StartNew(() => {
                Console.WriteLine("UNDERLYING ENGAGED");
                Thread.Sleep(1000);
                // Let's pretend there's some static on the line...
                return Observable.Return(true)
                    .Concat(Observable.Return(false))
                    .Concat(Observable.Return(true));
            }, token);
        });
    }
}

The harness:

void Main()
{
    var thingy = new Thingy();
    using(thingy.Status.Subscribe(stat => Console.WriteLine("Status:{0}", stat)))
    {
        Console.WriteLine("Waiting three seconds to Init...");
        Thread.Sleep(3000);
        thingy.Init();
        Console.ReadLine();
    }
}

The output:

Status:False
Waiting three seconds to Init...
UNDERLYING ENGAGED
Status:True
Status:False
Status:True
JerKimball
  • Thanks. The only functional difference with what I'm trying to do (but which I admit I didn't state in the question) is that when all subscribers have left, the subject needs to be unsubscribed from the underlying... BTW, where does DeferAsync come from? – Benjol May 08 '13 at 06:13
  • Ah, yeah, I'd do something like Brandon then, with a RefCounted connectable. I think DeferAsync is in the Rx platform services assembly, but I'm going from memory. – JerKimball May 08 '13 at 14:15
  • It seems to be in the System.Reactive.Linq assembly. If you don't need the `CancellationToken` (which in this faked out example seems to be the case), there's also a `Defer` overload which takes an async factory method. I'm actually surprised `DeferAsync` isn't just another overload of `Defer`. – Brandon May 09 '13 at 14:03
  • @brandon yeah, I just threw this together in linqpad for expediency - concur on the overload, there are I think three "Async variants" that fall in the same category. Different teams, I'd guess? – JerKimball May 09 '13 at 17:05

Hm, having played with this, I don't think that I can do it just with a Subject.

I haven't finished testing yet, but here's what I've come up with so far. It seems to work, though it doesn't protect me from the problems with Subject, as I'm still using one internally.

public class ObservableRouter<T> : IObservable<T>
{
    ISubject<T> _Subject = new Subject<T>();
    Dictionary<IObserver<T>, IDisposable> _ObserverSubscriptions 
                               = new Dictionary<IObserver<T>, IDisposable>();
    IObservable<T> _ObservableSource;
    IDisposable _SourceSubscription;

    //Note that this can happen before or after SetSource
    public IDisposable Subscribe(IObserver<T> observer)
    {
        _ObserverSubscriptions.Add(observer, _Subject.Subscribe(observer));
        IfReadySubscribeToSource();
        return Disposable.Create(() => UnsubscribeObserver(observer));
    }

    //Note that this can happen before or after Subscribe
    public void SetSource(IObservable<T> observable)
    {
        if(_ObserverSubscriptions.Count > 0 && _ObservableSource != null) 
                  throw new InvalidOperationException("Already routed!");
        _ObservableSource = observable;
        IfReadySubscribeToSource();
    }

    private void IfReadySubscribeToSource()
    {
        if(_SourceSubscription == null &&
           _ObservableSource != null && 
           _ObserverSubscriptions.Count > 0)
        {
            _SourceSubscription = _ObservableSource.Subscribe(_Subject);
        }
    }

    private void UnsubscribeObserver(IObserver<T> observer)
    {
        _ObserverSubscriptions[observer].Dispose();
        _ObserverSubscriptions.Remove(observer);
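        // The source may not have been set yet, so guard against a null subscription
        if(_ObserverSubscriptions.Count == 0 && _SourceSubscription != null)
        {
            _SourceSubscription.Dispose();
            _SourceSubscription = null;
        }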
    }
}
Benjol