I would like to merge two IProgress<T> objects into one IProgress<T>, where T is a numeric type. The merged object should always report the average of all merged progresses. For example, progress1 = 50% and progress2 = 70% should give mergedProgress = 60%.

This could possibly look like this, but so far I have not found an approach to merge two or more IProgress objects:

IProgress<float> p1 = new Progress<float>(value => Console.WriteLine(value));

IProgress<float> p2 = new Progress<float>(value => Console.WriteLine(value));

IProgress<float> pM = Progress.Merge(p1,p2,value=> Console.WriteLine("Full progress: " + value));

So far I have only found a workaround: when creating the Progress objects, I make each callback forward to another Progress. I store the reported values in an array and calculate the average manually, because the ".Average()" method does not work when several threads access the array:

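float[] progress = new float[2];

// Recomputes the average from the stored values; the reported value itself is ignored.
IProgress<float> pM = new Progress<float>(value =>
{
    double p = 0;
    for (int i = 0; i < progress.Length; i++)
    {
        p += progress[i];
    }
    Console.WriteLine("Full progress: " + p / progress.Length);
});

// Each source stores its latest value, then triggers a recalculation.
IProgress<float> p1 = new Progress<float>(value => { progress[0] = value; pM.Report(0); });

IProgress<float> p2 = new Progress<float>(value => { progress[1] = value; pM.Report(0); });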

Is there a way to merge the Progress<float> objects?

formeo
  • `IProgress` is a callback interface, not a stream or container of events. If you want to throttle or average notifications you can use Reactive Extensions or publish them to a Channel, and use either a subscriber or System.Linq.Async over IAsyncEnumerable to consume the averages – Panagiotis Kanavos Mar 30 '23 at 15:15
  • As for `several threads would access the array:` that's not true. The callback is executed in the thread that created the `Progress` object, typically the UI thread. That's the whole point of `IProgress`: background threads report events to it and the callback runs on the original thread – Panagiotis Kanavos Mar 30 '23 at 15:16

1 Answer

You could write your own amalgamator:

public sealed class ProgressAmalgamator : Progress<double>
{
    public void Attach(Progress<double> progress)
    {
        lock (_lock)
        {
            _progressors.Add(progress);
            _progress.Add(0);

            progress.ProgressChanged += progress_ProgressChanged;
        }
    }

    void progress_ProgressChanged(object? sender, double e)
    {
        double average = 0;

        lock (_lock)
        {
            for (int i = 0; i < _progressors.Count; i++)
            {
                if (ReferenceEquals(_progressors[i], sender))
                {
                    _progress[i] = e;
                    break;
                }
            }

            average = _progress.Average();
        }

        OnReport(average);
    }

    readonly List<IProgress<double>> _progressors = new();
    readonly List<double> _progress = new();
    readonly object _lock = new object();
}

This assumes that all the progress sources report on the same scale (for example, 0 to 100). If they don't, the calculated average will of course be incorrect.
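If one source reports on a different scale (say, a count of processed items rather than a percentage), one option is to rescale it before it reaches the amalgamator. The following is only a sketch of that idea: NormalizingProgress and ToPercent are hypothetical names that are not part of the code above, and clamping to 0..100 is an assumption about the behaviour you want.

```csharp
using System;

// Hypothetical adapter (not part of the answer's code): rescales a source that
// reports values in 0..maximum so that it forwards values in 0..100.
public sealed class NormalizingProgress : IProgress<double>
{
    private readonly IProgress<double> _inner;
    private readonly double _maximum;

    public NormalizingProgress(IProgress<double> inner, double maximum)
    {
        if (maximum <= 0) throw new ArgumentOutOfRangeException(nameof(maximum));
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
        _maximum = maximum;
    }

    // Converts a raw value to a percentage, clamped to the range 0..100.
    public static double ToPercent(double value, double maximum)
    {
        return Math.Clamp(value / maximum * 100.0, 0.0, 100.0);
    }

    // Forwards the rescaled value to the wrapped progress object.
    public void Report(double value)
    {
        _inner.Report(ToPercent(value, _maximum));
    }
}
```

A worker that counts up to 250 items would then be given new NormalizingProgress(progress1, 250) instead of progress1 itself, so that reporting 125 arrives at the amalgamator as 50.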

Sample console app using it (note the addition of a Console.WriteLine() to progress_ProgressChanged() that you don't want in production code):

namespace Console1;

public static class Program
{
    public static async Task Main()
    {
        var progress1 = new Progress<double>();
        var progress2 = new Progress<double>();
        var progress3 = new Progress<double>();

        var amalgamator = new ProgressAmalgamator();

        amalgamator.Attach(progress1);
        amalgamator.Attach(progress2);
        amalgamator.Attach(progress3);

        amalgamator.ProgressChanged += (_, progress) => Console.WriteLine($"Amalgamated progress = {progress}");

        Task[] tasks =
        {
            Task.Run(() => simulateProgress("A", progress1, 10, 1000)),
            Task.Run(() => simulateProgress("B", progress2, 20, 2000)),
            Task.Run(() => simulateProgress("C", progress3, 50, 5000))
        };

        await Task.WhenAll(tasks);

        Console.WriteLine("All tasks completed.");
        Console.ReadLine();
    }

    static void simulateProgress(string name, IProgress<double> progress, double step, int delay)
    {
        double current = 0;

        while (current < 100)
        {
            Thread.Sleep(delay);
            current += step;
            Console.WriteLine($"Thread {name} is reporting progress = {current}");
            progress.Report(Math.Min(current, 100));
        }
    }
}

public sealed class ProgressAmalgamator : Progress<double>
{
    public void Attach(Progress<double> progress)
    {
        lock (_lock)
        {
            _progressors.Add(progress);
            _progress.Add(0);

            progress.ProgressChanged += progress_ProgressChanged;
        }
    }

    void progress_ProgressChanged(object? sender, double e)
    {
        double average = 0;

        lock (_lock)
        {
            for (int i = 0; i < _progressors.Count; i++)
            {
                if (ReferenceEquals(_progressors[i], sender))
                {
                    Console.WriteLine($"Setting progress for progressor {i} to {e}");
                    _progress[i] = e;
                    break;
                }
            }

            average = _progress.Average();
        }

        OnReport(average);
    }

    readonly List<IProgress<double>> _progressors = new();
    readonly List<double> _progress = new();
    readonly object _lock = new object();
}

Sample output:

Thread A is reporting progress = 10
Setting progress for progressor 0 to 10
Amalgamated progress = 3.3333333333333335
Thread A is reporting progress = 20
Thread B is reporting progress = 20
Setting progress for progressor 0 to 20
Setting progress for progressor 1 to 20
Amalgamated progress = 6.666666666666667
Amalgamated progress = 13.333333333333334
Thread A is reporting progress = 30
Setting progress for progressor 0 to 30
Amalgamated progress = 16.666666666666668
Thread A is reporting progress = 40
Thread B is reporting progress = 40
Setting progress for progressor 0 to 40
Amalgamated progress = 20
Setting progress for progressor 1 to 40
Amalgamated progress = 26.666666666666668
Thread C is reporting progress = 50
Setting progress for progressor 2 to 50
Amalgamated progress = 43.333333333333336
Thread A is reporting progress = 50
Setting progress for progressor 0 to 50
Amalgamated progress = 46.666666666666664
Thread B is reporting progress = 60
Setting progress for progressor 1 to 60
Amalgamated progress = 53.333333333333336
Thread A is reporting progress = 60
Setting progress for progressor 0 to 60
Amalgamated progress = 56.666666666666664
Thread A is reporting progress = 70
Setting progress for progressor 0 to 70
Amalgamated progress = 60
Thread B is reporting progress = 80
Setting progress for progressor 1 to 80
Amalgamated progress = 66.66666666666667
Thread A is reporting progress = 80
Setting progress for progressor 0 to 80
Amalgamated progress = 70
Thread A is reporting progress = 90
Setting progress for progressor 0 to 90
Amalgamated progress = 73.33333333333333
Thread C is reporting progress = 100
Setting progress for progressor 2 to 100
Amalgamated progress = 90
Thread B is reporting progress = 100
Setting progress for progressor 1 to 100
Amalgamated progress = 96.66666666666667
Thread A is reporting progress = 100
Setting progress for progressor 0 to 100
Amalgamated progress = 100
All tasks completed.
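
The approach above relies on the ProgressChanged event, so it needs concrete Progress<double> instances. If the workers only ever see IProgress<double>, a variation is to let the aggregator hand out the child reporters itself and keep their latest values. This is a rough sketch under that assumption rather than a drop-in replacement; AveragingProgress, Child and CurrentAverage are hypothetical names.

```csharp
using System;
using System.Linq;

// Hypothetical alternative: creates the child reporters itself and reports the
// average of their latest values to a single target.
public sealed class AveragingProgress
{
    private readonly IProgress<double> _target;
    private readonly double[] _latest;
    private readonly object _lock = new object();

    public AveragingProgress(int childCount, IProgress<double> target)
    {
        if (childCount <= 0) throw new ArgumentOutOfRangeException(nameof(childCount));
        _target = target ?? throw new ArgumentNullException(nameof(target));
        _latest = new double[childCount]; // every child starts at 0
    }

    // Returns the reporter to pass to the worker with the given index.
    public IProgress<double> Child(int index)
    {
        if (index < 0 || index >= _latest.Length)
            throw new ArgumentOutOfRangeException(nameof(index));
        return new ChildProgress(this, index);
    }

    // Average of the latest values reported by all children.
    public double CurrentAverage
    {
        get { lock (_lock) { return _latest.Average(); } }
    }

    private void Update(int index, double value)
    {
        double average;
        lock (_lock)
        {
            _latest[index] = value;
            average = _latest.Average();
        }
        _target.Report(average); // report outside the lock
    }

    private sealed class ChildProgress : IProgress<double>
    {
        private readonly AveragingProgress _owner;
        private readonly int _index;

        public ChildProgress(AveragingProgress owner, int index)
        {
            _owner = owner;
            _index = index;
        }

        public void Report(double value) => _owner.Update(_index, value);
    }
}
```

With two children, reporting 50 on Child(0) and 70 on Child(1) leaves CurrentAverage at 60. Note that Update runs on whichever thread calls Report; passing a Progress<double> as the target keeps the final callback on the context captured when that Progress<double> was created.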
Matthew Watson
  • Thank you this works perfectly and is really simple. I had overlooked that `Progress` has an event `ProgressChanged` and was very unsure how to implement this cleanly. Thanks for the help – formeo Mar 30 '23 at 17:07