When composing hot observables in a class using Observable.Create, I follow a pattern: a lazily evaluated field calls a private method that returns an IObservable, and a public property exposes the field's value. The aim is to give the class a single hot observable property without worrying about the side effects of the generator method running again on every property access.
Here's an example:
    using System;
    using System.Reactive.Linq;

    public class ExampleLazy {
        readonly ICurrencyDependency currencyDependency;
        readonly IPricingDependency pricingDependency;
        readonly Lazy<IObservable<decimal>> hotCombinedFeed;

        public ExampleLazy(
            ICurrencyDependency currencyDependency,
            IPricingDependency pricingDependency) {
            this.currencyDependency = currencyDependency ?? throw new ArgumentNullException(nameof(currencyDependency));
            this.pricingDependency = pricingDependency ?? throw new ArgumentNullException(nameof(pricingDependency));
            // Defer building the pipeline until the property is first read.
            hotCombinedFeed = new Lazy<IObservable<decimal>>(() => CombinedHotFeedGenerator());
        }

        // Every access returns the same shared observable instance.
        public IObservable<decimal> HotCombinedFeed => hotCombinedFeed.Value;

        IObservable<decimal> CombinedHotFeedGenerator() {
            return Observable.Create<decimal>(observer => {
                    var subscription = currencyDependency.HotCurrencyFeed
                        .CombineLatest(pricingDependency.HotPricingFeed)
                        .SelectMany(async x => {
                            return await DoSomethingAsync(x);
                        })
                        .Subscribe(observer);
                    return subscription;
                })
                // Share one upstream subscription among all subscribers.
                .Publish()
                .RefCount();
        }
    }
After reading this post and following some links, it sounds like the more conventional approach is to expose the observable publicly using a Subject. Here's what the pattern could look like with this approach:
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public class ExampleSubject {
        readonly ICurrencyDependency currencyDependency;
        readonly IPricingDependency pricingDependency;
        readonly ISubject<decimal> hotCombinedFeed = new Subject<decimal>();

        public ExampleSubject(
            ICurrencyDependency currencyDependency,
            IPricingDependency pricingDependency) {
            this.currencyDependency = currencyDependency ?? throw new ArgumentNullException(nameof(currencyDependency));
            this.pricingDependency = pricingDependency ?? throw new ArgumentNullException(nameof(pricingDependency));
            // Subscribe immediately; the returned IDisposable is discarded here.
            CombinedHotFeedGenerator().Subscribe(hotCombinedFeed);
        }

        // Hide the subject so callers cannot push values into it.
        public IObservable<decimal> HotCombinedFeed => hotCombinedFeed.AsObservable();

        IObservable<decimal> CombinedHotFeedGenerator() {
            return Observable.Create<decimal>(observer => {
                    var subscription = currencyDependency.HotCurrencyFeed
                        .CombineLatest(pricingDependency.HotPricingFeed)
                        .SelectMany(async x => {
                            return await DoSomethingAsync(x);
                        })
                        .Subscribe(observer);
                    return subscription;
                })
                // Share one upstream subscription among all subscribers.
                .Publish()
                .RefCount();
        }
    }
Are the two approaches equivalent? Are there any circumstances where I should favour one approach over the other?