When composing hot observables in a class using Observable.Create, I follow this pattern: a lazily evaluated field calls a private method that returns an IObservable, and a public property exposes the value of that field. The aim is to give the class a single hot observable property without worrying about the side effects of the method running on each invocation.

Here's an example:

public class ExampleLazy {

  readonly ICurrencyDependency currencyDependency;
  readonly IPricingDependency pricingDependency;  

  readonly Lazy<IObservable<decimal>> hotCombinedFeed;

  public ExampleLazy(
    ICurrencyDependency currencyDependency,
    IPricingDependency pricingDependency) {

    this.currencyDependency = currencyDependency ?? throw new ArgumentNullException(nameof(currencyDependency));
    this.pricingDependency = pricingDependency ?? throw new ArgumentNullException(nameof(pricingDependency));

    hotCombinedFeed = new Lazy<IObservable<decimal>>(() => CombinedHotFeedGenerator());   
  }

  public IObservable<decimal> HotCombinedFeed => hotCombinedFeed.Value;

  IObservable<decimal> CombinedHotFeedGenerator() {
    return Observable.Create<decimal>(observer => {
      var subscription = currencyDependency.HotCurrencyFeed
        .CombineLatest(
          pricingDependency.HotPricingFeed)
        .SelectMany(async x => {
          return await DoSomethingAsync(x);
        })
        .Subscribe(observer);

      return subscription;
    })
    .Publish()
    .RefCount();
  }
}

After reading this post and following some links, it sounds like the more conventional approach is to expose the observable publicly using a Subject. Here's what the pattern could look like with this approach:

public class ExampleSubject {

  readonly ICurrencyDependency currencyDependency;
  readonly IPricingDependency pricingDependency;

  readonly ISubject<decimal> hotCombinedFeed = new Subject<decimal>();

  public ExampleSubject(
    ICurrencyDependency currencyDependency,
    IPricingDependency pricingDependency) {

    this.currencyDependency = currencyDependency ?? throw new ArgumentNullException(nameof(currencyDependency));
    this.pricingDependency = pricingDependency ?? throw new ArgumentNullException(nameof(pricingDependency));

    CombinedHotFeedGenerator().Subscribe(hotCombinedFeed);   
  }

  public IObservable<decimal> HotCombinedFeed => hotCombinedFeed.AsObservable();

  IObservable<decimal> CombinedHotFeedGenerator() {
    return Observable.Create<decimal>(observer => {
      var subscription = currencyDependency.HotCurrencyFeed
        .CombineLatest(
          pricingDependency.HotPricingFeed)
        .SelectMany(async x => {
          return await DoSomethingAsync(x);
        })
        .Subscribe(observer);

      return subscription;
    })
    .Publish()
    .RefCount();
  }
}

Are the two approaches equivalent? Are there any circumstances where I should favour one approach over the other?

Martin
  • An observable sequence is called "hot" if it starts without any subscribed observers, and continues after the last observer unsubscribes. That's not what the `Observable.Create` does. You can see it [here](http://introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#ColdObservables) used as an example of creating **cold** sequences. It's debatable whether chaining `.Publish().RefCount()` at the end makes it hot either. So my suggestion is to describe your problem in more specific terms than the confusing hot/cold. What's the desirable behavior of the `HotCombinedFeed` property? – Theodor Zoulias May 13 '22 at 19:38
  • Fair point, the mention of hot/cold may be a red herring. The desired behaviour is exactly what I have with `ExampleLazy`, every subscriber to the `HotCombinedFeed` property will get the same observable sequence. If I expose the `CombinedHotFeedGenerator` method publicly, each subscriber will trigger their own `DoSomethingAsync`. What's the best practice to ensure a property subscribed to returns the same observable sequence when the sequence is generated using `Observable.Create`? – Martin May 13 '22 at 20:49
  • *"every subscriber to the `HotCombinedFeed` property will get the same observable sequence."* -- The `HotCombinedFeed` *is* an observable sequence. Do you mean that every subscriber should get the same notifications? What if a notification X occurred after the subscription A but before the subscription B? Would you like the B subscriber to receive the X notification as well upon subscription? – Theodor Zoulias May 13 '22 at 21:06
  • Also *when* do you want to create the initial underlying subscription to the `Observable.Create`, the one that all future subscribers will share? At the start of the program? When the `HotCombinedFeed` property is first requested? When the `HotCombinedFeed` sequence is first subscribed? – Theodor Zoulias May 13 '22 at 21:06
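
The behaviour discussed in the comments above can be checked with a small sketch. It counts how often the `Observable.Create` subscribe delegate runs when two observers subscribe, first without and then with `.Publish().RefCount()`. The `SharedSubscriptionDemo` class, the `CountSubscribeCalls` method and the `source` subject are hypothetical names for illustration only; `source` stands in for a dependency feed such as `HotCurrencyFeed`, and the sketch assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedSubscriptionDemo
{
    // Returns how many times the Observable.Create delegate ran for two
    // concurrent subscribers, without and with Publish().RefCount().
    public static (int Unshared, int Shared) CountSubscribeCalls()
    {
        // Hypothetical stand-in for a dependency feed (not from the question).
        var source = new Subject<decimal>();

        int unsharedCalls = 0;
        IObservable<decimal> unshared = Observable.Create<decimal>(observer =>
        {
            unsharedCalls++; // runs once per subscriber: the sequence is cold
            return source.Subscribe(observer);
        });

        int sharedCalls = 0;
        IObservable<decimal> shared = Observable.Create<decimal>(observer =>
        {
            sharedCalls++; // runs once while at least one subscriber is attached
            return source.Subscribe(observer);
        })
        .Publish()
        .RefCount();

        // Two overlapping subscriptions to each sequence.
        using (unshared.Subscribe(_ => { }))
        using (unshared.Subscribe(_ => { }))
        using (shared.Subscribe(_ => { }))
        using (shared.Subscribe(_ => { }))
        {
            source.OnNext(1m);
        }

        return (unsharedCalls, sharedCalls);
    }
}
```

Calling `SharedSubscriptionDemo.CountSubscribeCalls()` should return `(2, 1)`: each subscriber to the bare `Observable.Create` sequence triggers its own run of the delegate, while the two overlapping subscribers to the `.Publish().RefCount()` sequence share one underlying subscription. The sketch does not cover what happens after every subscriber has unsubscribed, which is the part of the hot/cold question the comments leave open.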

0 Answers