I'm trying to use observables for the first time, with System.Reactive
.
I wish to push the results of 3 features as they are produced to a same ReportBuilder
that would then notify the caller when it is done so the caller can pass the resulting Report
to another service.
I am facing a few issues :
If I have an observer implementing IObserver of multiple types, OnCompleted method would be called for the first observed sequence to finish, isnt it ? How can I observe multiple sequence of different types until all are finished ?
After registering my subscriptions, does the execution flow returns to caller object or is it only after OnCompleted is called ? This is not too clear to me.
Since the builder is both an observable and an observer, my implementation seems awkward to me, what is the proper way to implement an object that is both an observable and an observer ?
Should I merge my 3 observables and make the features async to consume the result as they are produced ? If so, how, since my collections are of different types ? Besides I would consume a sequence of three task results so I would still consume one feature after another, isnt it ?
Here is an attempt to put my ideas to code, but besides the mentioned issues the implementation doesn't seem to have too much benefits compared to straightforward calls and deferred execution.
public interface IFeatureC
{
IEnumerable<ModelC> ExecuteFeature();
}
public interface IFeatureB
{
IEnumerable<ModelB> ExecuteFeature();
}
public interface IFeatureA
{
IEnumerable<ModelA> ExecuteFeature();
}
public class ModelA { }
public class ModelB { }
public class ModelC { }
public class Report
{
public Report()
{
ReportElements = new List<ReportElement>();
}
public List<ReportElement> ReportElements { get; }
}
public class ReportElement
{
public ReportElement(ModelA item)
{
// DoSomething
}
public ReportElement(ModelB item)
{
// DoSomething
}
public ReportElement(ModelC item)
{
// DoSomething
}
}
public interface IOtherService
{
void DoSomething(Report report);
}
public class Orchestrator : IObserver<Report>
{
private readonly ReportBuilder _builder;
private readonly IOtherService _otherService;
private readonly IFeatureA _featureA;
private readonly IFeatureB _featureB;
private readonly IFeatureC _featureC;
private readonly ILogger _logger
public Orchestrator(IFeatureA featureA, IFeatureB featureB, IFeatureC featureC, ReportBuilder builder, IOtherService otherService, ILogger logger)
{
_featureA = featureA;
_featureB = featureB;
_featureC = featureC;
_builder = builder;
_otherService = otherService;
_logger = logger;
}
public void Run()
{
IObservable<ModelA> resultAObservable = _featureA.ExecuteFeature().ToObservable();
IObservable<ModelB> resultBObservable = _featureB.ExecuteFeature().ToObservable();
IObservable<ModelC> resultCObservable = _featureC.ExecuteFeature().ToObservable();
resultAObservable.Subscribe(_builder);
resultBObservable.Subscribe(_builder);
resultCObservable.Subscribe(_builder);
// Do I need to wait until OnCompleted is called ?
}
public void OnCompleted()
{
_logger.Information("Report pushed");
}
public void OnError(Exception error)
{
logger.Error(error);
throw error;
}
public void OnNext(Report report)
{
_otherService.DoSomething(report);
}
}
public class ReportBuilder : IObserver<ModelA>, IObserver<ModelB>, IObserver<ModelC>
{
private readonly IObserver<Report> _reportObserver;
private Report Report { get; } = new Report();
private readonly IObservable<Report> _observableReport;
public ReportBuilder(IObserver<Report> reportObserver) // Is there a better than passing the caller ?
{
_reportObserver = reportObserver;
_observableReport = Observable.Return(Report);
}
public void OnCompleted() // This will get called by the first finished sequence and I will miss some observed elements ?
{
_observableReport.Subscribe(_reportObserver);
}
public void OnError(Exception error)
{
throw error;
}
public void OnNext(ModelA value)
{
Report.ReportElements.Add(new ReportElement(value));
}
public void OnNext(ModelB value)
{
Report.ReportElements.Add(new ReportElement(value));
}
public void OnNext(ModelC value)
{
Report.ReportElements.Add(new ReportElement(value));
}
}