I'm trying to use observables for the first time, with System.Reactive.

I want to push the results of three features, as they are produced, to a single ReportBuilder, which would then notify the caller when it is done so that the caller can pass the resulting Report to another service.

I am facing a few issues:

  1. If I have an observer implementing IObserver for multiple types, OnCompleted will be called as soon as the first observed sequence finishes, won't it? How can I observe multiple sequences of different types until all of them have finished?

  2. After registering my subscriptions, does execution return to the calling object right away, or only after OnCompleted is called? This is not clear to me.

  3. Since the builder is both an observable and an observer, my implementation seems awkward to me. What is the proper way to implement an object that is both an observable and an observer?

  4. Should I merge my 3 observables and make the features async so that I consume the results as they are produced? If so, how, since my collections are of different types? Besides, I would be consuming a sequence of three task results, so I would still consume one feature after another, wouldn't I?

Here is an attempt to put my ideas into code. Besides the issues mentioned above, the implementation doesn't seem to offer much benefit over straightforward calls and deferred execution.

public interface IFeatureC
{
    IEnumerable<ModelC> ExecuteFeature();
}

public interface IFeatureB
{
    IEnumerable<ModelB> ExecuteFeature();
}

public interface IFeatureA
{
    IEnumerable<ModelA> ExecuteFeature();
}


public class ModelA { } 
public class ModelB { } 
public class ModelC { }


public class Report
{
    public Report()
    {
        ReportElements = new List<ReportElement>();
    }
    public List<ReportElement> ReportElements { get; }
}



public class ReportElement
{
    public ReportElement(ModelA item)
    {
        // DoSomething
    }

    public ReportElement(ModelB item)
    {
        // DoSomething
    }

    public ReportElement(ModelC item)
    {
        // DoSomething
    }
}

public interface IOtherService
{
    void DoSomething(Report report);
}



public class Orchestrator : IObserver<Report>
{
    private readonly ReportBuilder _builder;
    private readonly IOtherService _otherService;
    private readonly IFeatureA _featureA;
    private readonly IFeatureB _featureB;
    private readonly IFeatureC _featureC;
    private readonly ILogger _logger;

    public Orchestrator(IFeatureA featureA, IFeatureB featureB, IFeatureC featureC, ReportBuilder builder, IOtherService otherService, ILogger logger)
    {
        _featureA = featureA;
        _featureB = featureB;
        _featureC = featureC;
        _builder = builder;
        _otherService = otherService;
        _logger = logger;
    }

    public void Run()
    {
        IObservable<ModelA> resultAObservable = _featureA.ExecuteFeature().ToObservable();  
        IObservable<ModelB> resultBObservable = _featureB.ExecuteFeature().ToObservable();
        IObservable<ModelC> resultCObservable = _featureC.ExecuteFeature().ToObservable();

        resultAObservable.Subscribe(_builder);
        resultBObservable.Subscribe(_builder);
        resultCObservable.Subscribe(_builder);

        // Do I need to wait until OnCompleted is called ?
    }

    public void OnCompleted()
    {
        _logger.Information("Report pushed");
    }

    public void OnError(Exception error)
    {
        _logger.Error(error);
        throw error;
    }

    public void OnNext(Report report)
    {
        _otherService.DoSomething(report);
    }
}

public class ReportBuilder : IObserver<ModelA>, IObserver<ModelB>, IObserver<ModelC>
{
    private readonly IObserver<Report> _reportObserver;
    private Report Report { get; } = new Report();
    private readonly IObservable<Report> _observableReport;

    public ReportBuilder(IObserver<Report> reportObserver)   // Is there a better way than passing the caller?
    {
        _reportObserver = reportObserver;
        _observableReport = Observable.Return(Report);  
    }
    public void OnCompleted()  // Will this get called by the first sequence to finish, so that I miss some observed elements?
    {
        _observableReport.Subscribe(_reportObserver);
    }

    public void OnError(Exception error)
    {
        throw error;
    }

    public void OnNext(ModelA value)
    {
        Report.ReportElements.Add(new ReportElement(value));
    }

    public void OnNext(ModelB value)
    {
        Report.ReportElements.Add(new ReportElement(value));
    }

    public void OnNext(ModelC value)
    {
        Report.ReportElements.Add(new ReportElement(value));
    }
}
Calimero
I suggest you go through introtorx.com. Stack Overflow is meant for specific questions with at least a minimal knowledge of the technology. – Shlomo May 05 '21 at 23:29

1 Answer

If I have an observer implementing IObserver for multiple types, OnCompleted will be called as soon as the first observed sequence finishes, won't it?

It is very rare to implement IObservable<T> or IObserver<T> - it is usually best to just use the existing operators and types. It's very hard to get this stuff right if you roll your own - as you've discovered.

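How can I observe multiple sequences of different types until all of them have finished?

Don't implement a class like this. Project each sequence to a common type and merge them; the merged sequence completes only when all of its sources have completed.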

After registering my subscriptions, does execution return to the calling object right away, or only after OnCompleted is called? This is not clear to me.

Subscribe generally returns to the caller straight away, but that depends on the observable. Observable.Return(42).Subscribe(x => Console.WriteLine(x)), for example, executes on the current thread, so the observable runs to completion before control returns to the caller. This question will help: What are the default Schedulers for each observable operator?
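
To make the synchronous case concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The SubscribeTiming class and its Trace helper are made up for illustration. It records the order in which the callbacks and the code after Subscribe run:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SubscribeTiming
{
    // Hypothetical helper: returns the order in which events were logged.
    public static List<string> Trace()
    {
        var log = new List<string>();

        // Observable.Return pushes its value synchronously by default,
        // so OnNext and OnCompleted both run before Subscribe returns.
        Observable.Return(42).Subscribe(
            x => log.Add($"next {x}"),
            () => log.Add("completed"));

        log.Add("after Subscribe");
        return log;
    }

    public static void Main()
    {
        // Prints: next 42, completed, after Subscribe (one per line).
        foreach (var line in Trace())
            Console.WriteLine(line);
    }
}
```

Time-based sources such as Observable.Timer or Observable.Interval behave differently: by default they run on a background scheduler, so Subscribe returns before any value arrives.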

Since the builder is both an observable and an observer, my implementation seems awkward to me. What is the proper way to implement an object that is both an observable and an observer?

You don't implement either.

Should I merge my 3 observables and make the features async so that I consume the results as they are produced?

Yes and no. Merge, but don't worry about async. Observables are fine.

If so, how, since my collections are of different types?

Project them to a single type with Select.

Besides, I would be consuming a sequence of three task results, so I would still consume one feature after another, wouldn't I?

No. You can consume all of them at once.

Here's how you should implement all of your code:

public IObservable<Report> Run(IFeatureA featureA, IFeatureB featureB, IFeatureC featureC) =>
    Observable
        .Merge(
            featureA.ExecuteFeature().ToObservable().Select(x => new ReportElement(x)),
            featureB.ExecuteFeature().ToObservable().Select(x => new ReportElement(x)),
            featureC.ExecuteFeature().ToObservable().Select(x => new ReportElement(x)))
        .ToArray()
        .Select(xs =>
        {
            var report = new Report();
            report.ReportElements.AddRange(xs);
            return report;
        });

That's it.


Alternatively you could make it synchronous or async without losing the benefit of Rx:

public Report Run(IFeatureA featureA, IFeatureB featureB, IFeatureC featureC) =>
    Observable
        .Merge(
            featureA.ExecuteFeature().ToObservable().Select(x => new ReportElement(x)),
            featureB.ExecuteFeature().ToObservable().Select(x => new ReportElement(x)),
            featureC.ExecuteFeature().ToObservable().Select(x => new ReportElement(x)))
        .ToArray()
        .Select(xs =>
        {
            var report = new Report();
            report.ReportElements.AddRange(xs);
            return report;
        })
        .Wait();

public async Task<Report> RunAsync(IFeatureA featureA, IFeatureB featureB, IFeatureC featureC) =>
    await Observable
        .Merge(
            featureA.ExecuteFeature().ToObservable().Select(x => new ReportElement(x)),
            featureB.ExecuteFeature().ToObservable().Select(x => new ReportElement(x)),
            featureC.ExecuteFeature().ToObservable().Select(x => new ReportElement(x)))
        .ToArray()
        .Select(xs =>
        {
            var report = new Report();
            report.ReportElements.AddRange(xs);
            return report;
        });
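
On the calling side, the Orchestrator from the question does not need to implement IObserver<Report> either. A rough sketch of how a caller might consume the result, assuming System.Reactive is referenced: the Caller, RecordingService and Program names are hypothetical, the model classes are reduced to stubs, and Observable.Return stands in for the Run(...) pipeline above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public class ReportElement { }

public class Report
{
    public List<ReportElement> ReportElements { get; } = new List<ReportElement>();
}

public interface IOtherService
{
    void DoSomething(Report report);
}

public static class Caller
{
    // The three lambdas play the roles of OnNext, OnError and OnCompleted,
    // so no class has to implement IObserver<Report>.
    public static IDisposable Forward(
        IObservable<Report> reports, IOtherService service, Action<string> log) =>
        reports.Subscribe(
            report => service.DoSomething(report),
            error => log($"Report failed: {error.Message}"),
            () => log("Report pushed"));
}

// Stand-in service for the demo; a real IOtherService would be injected.
public class RecordingService : IOtherService
{
    public List<Report> Received { get; } = new List<Report>();
    public void DoSomething(Report report) => Received.Add(report);
}

public static class Program
{
    public static void Main()
    {
        var service = new RecordingService();
        // Observable.Return is a placeholder for the merged feature pipeline.
        Caller.Forward(Observable.Return(new Report()), service, Console.WriteLine);
        Console.WriteLine(service.Received.Count);
    }
}
```

Subscribe returns an IDisposable, which the caller can dispose to stop listening before the report is complete.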
Enigmativity