I'm trying to understand how Subject<T>, ReplaySubject<T> and the other subjects work. Here is an example:

(A Subject is both an observable and an observer.)

public IObservable<int> CreateObservable()
{
     Subject<int> subj = new Subject<int>();                  // case 1
     // ReplaySubject<int> subj = new ReplaySubject<int>();   // case 2 (use instead of the line above)

     Random rnd = new Random();
     int maxValue = rnd.Next(20);
     Trace.TraceInformation("Max value is: " + maxValue.ToString());

     subj.OnNext(-1);           // specific value

     for(int iCounter = 0; iCounter < maxValue; iCounter++)
     {
          Trace.TraceInformation("Value: " + iCounter.ToString() + " is about to publish");
          subj.OnNext(iCounter);
     }

     Trace.TraceInformation("Publish complete");
     subj.OnCompleted();

     return subj;
}

public void Main()
{
     // First subscription
     CreateObservable()
         .Subscribe(
               onNext: (x) => {
                  Trace.TraceInformation("X is: " + x.ToString());
     });

     // Second subscription
     CreateObservable()
         .Subscribe(
               onNext: (x2) => {
                  Trace.TraceInformation("X2 is: " + x2.ToString());
     });
}

Case 1: The strange thing is that when I use Subject<T>, no subscription seems to be made (???). I never see the "X is: " text; I only see the "Value: " and "Max value is: " lines. Why does Subject<T> not push values to the subscription?

Case 2: If I use ReplaySubject<T>, I do see the values in the subscription, but I could not apply the Defer option to anything, neither to the Subject nor to the Observable. So every subscription will receive different values because the CreateObservable function is a cold observable. Where is Defer?

Alex F

2 Answers

Whenever you need to create an observable out of thin air, Observable.Create should be the first thing to think of. Subjects enter the picture in two cases:

  • You need some kind of "addressable endpoint" to feed data to in order for all subscribers to receive it. Compare this to a .NET event, which has both an invocation side (through delegate invocation) and a subscription side (through delegate combine and remove with the += and -= syntax). You'll find that in a lot of cases you can achieve the same effect using Observable.Create.

  • You need multicasting of messages in a query pipeline, effectively sharing an observable sequence among many forks in your query logic without triggering multiple subscriptions. (Think of subscribing to your favorite magazine once for your dorm and putting a photocopier right behind the letter box. You still pay for one subscription, though all of your friends can read the magazine delivered through OnNext on the letter box.)

Also, in a lot of cases, there's already a built-in primitive in Rx that does exactly what you need. For example, there are From* factory methods to bridge with existing concepts (such as events, tasks, asynchronous methods, enumerable sequences), some of which use a subject under the covers. For the second case of multicasting logic, there's the Publish, Replay, etc. family of operators.
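
As a rough illustration of the multicasting case, the sketch below shares a single subscription to a cold source between two observers using Publish and Connect. It assumes the System.Reactive package is referenced; the class name and the console messages are made up for the example.

```csharp
using System;
using System.Reactive.Linq;

class PublishSketch
{
    static void Main()
    {
        // Cold source: the message below is printed each time something subscribes to it.
        var source = Observable.Defer(() =>
        {
            Console.WriteLine("Subscribed to source");
            return Observable.Range(0, 3);
        });

        // Publish wraps the source in a connectable observable (a subject is used internally).
        var shared = source.Publish();

        // Both observers attach to the shared sequence, not to the source itself.
        shared.Subscribe(x => Console.WriteLine("A: " + x));
        shared.Subscribe(x => Console.WriteLine("B: " + x));

        // Nothing flows until Connect; it makes the one and only subscription to the source.
        shared.Connect();
    }
}
```

With this arrangement "Subscribed to source" should appear once, even though two observers receive the values.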

Bart De Smet

You need to be mindful of when code is executed.

In "Case 1", when you use a Subject<T>, you'll notice that all of the calls to OnNext and OnCompleted finish before the observable is returned by the CreateObservable method. Since you are using a Subject<T>, this means that any subsequent subscription will have missed all of the values, so you should expect to get what you got: nothing.

You have to delay the operations on the subject until you have the observer subscribed. To do that, use the Create method. Here's how:
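
A minimal sketch of that timing problem, assuming the System.Reactive package (the class name and messages are illustrative only): values are pushed before anyone subscribes, first into a Subject<int> and then into a ReplaySubject<int>.

```csharp
using System;
using System.Reactive.Subjects;

class LateSubscriberSketch
{
    static void Main()
    {
        // Values are pushed and the sequence is completed before anyone subscribes.
        var subject = new Subject<int>();
        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnCompleted();

        // A late observer on a plain Subject gets no values, only the completion.
        subject.Subscribe(
            x => Console.WriteLine("Subject value: " + x),
            () => Console.WriteLine("Subject completed"));

        // A ReplaySubject buffers what it was given and replays it to late observers.
        var replay = new ReplaySubject<int>();
        replay.OnNext(1);
        replay.OnNext(2);
        replay.OnCompleted();

        replay.Subscribe(
            x => Console.WriteLine("Replay value: " + x),
            () => Console.WriteLine("Replay completed"));
    }
}
```

The plain subject's observer only sees the completion callback, while the replay subject's observer sees both values and then the completion.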

public IObservable<int> CreateObservable()
{
    return Observable.Create<int>(o =>
    {
        var subj = new Subject<int>();
        var disposable = subj.Subscribe(o);

        var rnd = new Random();
        var maxValue = rnd.Next(20);
        subj.OnNext(-1);
        for(int iCounter = 0; iCounter < maxValue; iCounter++)
        {
            subj.OnNext(iCounter);
        }
        subj.OnCompleted();

        return disposable;
    });
}

I've removed all the trace code for succinctness.

So now, for every subscriber, you get a new execution of the code inside the Create method and you would now get the values from the internal Subject<T>.
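
To see the per-subscriber execution in isolation, here is a small sketch, assuming System.Reactive; the counter and the class name are hypothetical and exist only to make each run visible.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

class CreatePerSubscriberSketch
{
    static void Main()
    {
        var runs = 0;

        // The delegate passed to Create runs once for every Subscribe call.
        var source = Observable.Create<int>(o =>
        {
            runs++;
            o.OnNext(runs);
            o.OnCompleted();
            return Disposable.Empty; // nothing to clean up in this sketch
        });

        source.Subscribe(x => Console.WriteLine("First: " + x));
        source.Subscribe(x => Console.WriteLine("Second: " + x));
    }
}
```

Each subscription triggers its own run of the delegate, so the two observers receive different counter values.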

The use of the Create method is generally the correct way to create observables that you return from methods.

Alternatively, you could use a ReplaySubject<T> and avoid the use of the Create method. However, this is unattractive for a number of reasons. It forces the computation of the entire sequence at creation time. This gives you a cold observable that you could have produced more efficiently without using a replay subject.

Now, as an aside, you should try to avoid using subjects at all. The general rule is that if you're using a subject then you're doing something wrong. The CreateObservable method would be better written as this:

public IObservable<int> CreateObservable()
{
    return Observable.Create<int>(o =>
    {
        var rnd = new Random();
        var maxValue = rnd.Next(20);
        return Observable.Range(-1, maxValue + 1).Subscribe(o);
    });
}

No need for a subject at all.
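
Since the question asks where Defer fits in, here is one possible way to write the same method with Observable.Defer instead of Create. This is a sketch assuming System.Reactive, not something taken from the answer above; the class name and console output are illustrative.

```csharp
using System;
using System.Reactive.Linq;

class DeferSketch
{
    static IObservable<int> CreateObservable()
    {
        var rnd = new Random();

        // The factory delegate is invoked on every subscription,
        // so each subscriber gets its own maxValue and its own Range.
        return Observable.Defer(() =>
        {
            var maxValue = rnd.Next(20);
            return Observable.Range(-1, maxValue + 1);
        });
    }

    static void Main()
    {
        var source = CreateObservable();
        source.Subscribe(x => Console.WriteLine("X is: " + x));
        source.Subscribe(x2 => Console.WriteLine("X2 is: " + x2));
    }
}
```

As with Create, nothing is computed until an observer subscribes, so no values can be missed.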

Let me know if this helps.

Enigmativity
  • var disposable = subj.Subscribe(o); Does this mean that every `OnNext()` from `o` will raise `OnNext()` in `subj`??? – Alex F Aug 20 '12 at 08:28
  • No, the other way around. `o` is subscribing to `subj` so every `OnNext()` on `subj` calls the `OnNext` callback on the subscription on `o`. – Enigmativity Aug 20 '12 at 08:40
  • I guess something is wrong with my understanding of Reactive.... I thought that Reactive is about 'future collections': some logic/commands to run when you don't have the whole collection available. `IObservable` is the _source_ of the collection and you could `Subscribe` to the `OnNext` event (?) in the collection source when something changed and `OnNext` was raised. Am I right, or am I describing EAP (Event-based Asynchronous Pattern)? – Alex F Aug 20 '12 at 09:09
  • I'm not exactly sure what you're saying here. The way to think about reactive collections is that they are collections that span a period of time rather than an instantaneous collection (IEnumerable) at a single point in time. An observable can be "cold" (the values are generated when a subscriber attaches) or "hot" (values are produced independently of subscribers). If you have an observable that produces all of its values before the subscriber subscribes then they just "miss" each other. Think of a button being clicked **before** an event handler is attached - you would not get those previous clicks. – Enigmativity Aug 20 '12 at 09:17
  • It's not so much about "future" collections as "co-incidental" collections - when the observable produces values co-incidentally to when an observer is subscribed. – Enigmativity Aug 20 '12 at 09:19
  • Would you say "if you're using a subject then you're doing something wrong" still applies when you're using it as a backing-field for an IObservable property? I often do this instead of using .NET events if I'm going to consume the 'event' via Rx. – Wilka Aug 20 '12 at 19:13
  • @Wilka - I would suggest that observables should generally not be backed by subjects. Errors created by subscribers to your observable property can cause an `OnError` completion of the observable which will then kill the observable for all subscribers. If you use an observable that wires up to the event directly then subscribers become independent of each other. – Enigmativity Aug 21 '12 at 00:16
  • @Enigmativity thanks for that. I've added a question http://stackoverflow.com/q/12053709/1367 to hopefully get a bit more detail on this (comments are a poor place for the Q/A). I'd appreciate it if you could expand on your answer there. – Wilka Aug 21 '12 at 11:21
  • @Wilka - I'd be glad to. It'll be in about 10 or so hours from now though. – Enigmativity Aug 21 '12 at 13:46