I'm working on parallel workloads where each object or Task reports its own individual progress, and I want to report the collective progress of the task as a whole.

For example, imagine I have 10 Work objects which all report individual progress. They contain 0-100 "tasks" that must be completed.

If we were to iterate linearly over each of the Work objects, we could easily report our progress and see output something like this:

Work item #1 of 10 is currently 1 of 100 tasks completed.
Work item #1 of 10 is currently 2 of 100 tasks completed.
...
Work item #10 of 10 is currently 100 of 100 tasks completed.

However, when running in parallel, the output would look something like this:

Work item #1 of 10 is currently 1 of 100 tasks completed.
Work item #4 of 10 is currently 16 of 100 tasks completed.
Work item #7 of 10 is currently 4 of 100 tasks completed.
...
Work item #10 of 10 is currently 100 of 100 tasks completed.

The problem I'm trying to solve is combining all progress in parallel loops so that the output to the user is more akin to "1/1000" or "10/1000", representing the total amount of work accomplished, and updating the numerator as work continues.

I would expect there's a solution or pattern that fits regardless of whether I'm using async/await or the Task-based Asynchronous Pattern (I'm using both), and I'm hoping there are already ways to handle this in the .NET Framework that I haven't discovered.

Using this simple (pseudocode) example from TAP:

Parallel.ForEach(WorkObject, wo =>
{
    // Perhaps each WorkObject has a "ProgressChanged" delegate that fires progress notifications.
    wo.ProgressChanged += delegate (int currentProgress, int totalProgress)
    {
        ReportProgress($"Work item #{wo.ID} of {WorkObject.Count} is currently {currentProgress} of {totalProgress} tasks completed.");
    };

    // Or perhaps using IProgress<T> or Progress?
    // wo.PerformWork(/*IProgress<T> or Progress<T>, etc.*/);
});

We can iterate in parallel, and progress updates/notifications will come in as each thread completes a unit of work.

How can we effectively merge the progress of all of the WorkObjects so that we can report a more uniform "1/1000" completed?

The problem is that each WorkObject could have a varying number of "jobs" to complete, and we could have a varying number of WorkObjects that need to work. If one simply concatenates the numerator and denominator from all WorkObjects as each progress notification comes in (assuming they update after each unit of work is completed), by the end of the parallel workload, the progress notification would reflect something like "1000/100,000" instead of "1000/1000".
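To make the arithmetic concrete, here is a minimal single-threaded sketch of the two bookkeeping strategies for the 10 x 100 example. The class and method names (NaiveVersusLatest, Simulate) are hypothetical, and the loop only simulates the notifications rather than running real work. The naive counters add to both sides on every notification, while the per-item slots keep only the latest report from each work item and are summed on demand.

```csharp
using System;

public static class NaiveVersusLatest
{
    // Simulates itemCount work items that each report progress 1..tasksPerItem.
    // Returns the naive running totals and the per-item "latest value" totals.
    public static (long NaiveDone, long NaiveTotal, long Done, long Total) Simulate(
        int itemCount = 10, int tasksPerItem = 100)
    {
        long naiveDone = 0, naiveTotal = 0;

        // Latest reported values, one slot per work item.
        var current = new long[itemCount];
        var total = new long[itemCount];

        for (int item = 0; item < itemCount; item++)
        {
            for (int step = 1; step <= tasksPerItem; step++)
            {
                // Naive: add to both sides on every notification.
                naiveDone += 1;
                naiveTotal += tasksPerItem;

                // Per item: overwrite the previous report instead of adding to it.
                current[item] = step;
                total[item] = tasksPerItem;
            }
        }

        // Sum the latest report of every item to get the overall X/Y.
        long done = 0, sum = 0;
        for (int item = 0; item < itemCount; item++)
        {
            done += current[item];
            sum += total[item];
        }

        return (naiveDone, naiveTotal, done, sum);
    }
}
```

With the default arguments, the naive counters end at 1000/100,000 while the per-item sums end at 1000/1000, because each item's denominator is counted once rather than once per notification.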

It seems that we need a way to keep track of current progress, X, as well as total progress, Y, to form a coherent message for the user about total progress state (X of Y completed.)

Is there an existing model (in the Framework or otherwise) to do this?

My current thought is to create a data structure recording the Thread ID of each thread executing in parallel, and then tracking each thread's progress in that data structure (as an X/Y value), and finally, as each thread posts a progress update, iterating over the data structure to sum X/Y from each thread to generate a total "X/Y" to display to the user.

But surely this problem is being faced by developers every day—so there must be another way?

Justin Shidell
    Can you separate the ReportProgress responsibility from your Parallel.ForEach loop? Instead, have each work item call Interlocked.Increment on a total progress counter. Have a task that checks these values every second (or at whatever interval suits) and emits the ReportProgress call. Use a cancellation token on that task to stop it, which you can set after the Parallel.ForEach completes. – Adam G Oct 02 '18 at 23:37

2 Answers

I ended up creating a class to manage threaded progress; here's what I came up with:

// A Parallel Progress Manager is designed to collect progress information from multiple sources and provide a total sum of progress.
// For example, if 3 objects are going to perform some work in parallel, and the first object has 10 tasks, the second has 100, and the last has 1000,
// when executing in parallel, it isn't useful to have each task fire a ProgressChanged() event (or something similar), as it would result in the progress
// being returned something like 0/10, 1/10, 2/10, 0/100, 3/10, 1/100, 0/1000, etc. (As each thread executes independently.)
//
// Instead, this class aggregates progress and provides a total sum of progress: 0/1110, 1/1110, etc.
//
// NOTE: The intention of this class is to manage parallelized workloads across numerous jobs. For example, operating in parallel against 3 different objects
// that all report progress independently, such as Parallel.ForEach(IEnumerable<T>). This is not suggested for parallelized workloads of a single job, such as
// Parallel.For(i, 100); in this case, it is recommended to update progress using Interlocked.Increment() or a lock() on a synchronization object as one would normally.

// Example:
//
// ParallelProgressManager ppm = new ParallelProgressManager();
//
// Parallel.ForEach(IEnumerable<T>, t =>
// {
//      t.ProgressChanged += delegate (long current, long total, bool indeterminate, string message)
//      {
//          lock(ppm)
//          {
//              var x = ppm.SetGetProgress(t.GetHashCode(), current, total);
//
//              ReportProgress(x.Item1, x.Item2, false, $"Working... {x.Item1} / {x.Item2}");
//          }
//      };
// });

using System;
using System.Collections.Generic;

namespace Threading
{
    /// <summary>
    /// A Parallel Progress Manager used to aggregate and sum progress across multiple objects working in parallel.
    /// </summary>
    public class ParallelProgressManager
    {
        /// <summary>
        /// The Progress class contains the current and total progress of a single object.
        /// </summary>
        protected class Progress
        {
            public long Current { get; set; } = 0;
            public long Total { get; set; } = 0;
        }

        /// <summary>
        /// The ProgressDictionary associates each working object's Hash Code with its current progress (via a Progress object).
        /// This way an object can operate in parallel and as progress updates come in, the last update is replaced by the new one.
        /// We can then sum the "current" and "total" to produce an overall progress value.
        /// </summary>
        private Dictionary<int, Progress> ProgressDictionary { get; set; } = new Dictionary<int, Progress>();

        /// <summary>
        /// Sets an object's progress via its Hash Code. If the object isn't recognized, a new entry will be made for it. If it is recognized,
        /// its progress will be updated accordingly.
        /// </summary>
        /// <param name="hashCode">
        /// The Hash Code of the object (.GetHashCode()) that's reporting progress. The Hash Code is used to distinguish the objects to manage progress of.
        /// </param>
        /// <param name="current">
        /// The current progress.
        /// </param>
        /// <param name="total">
        /// The total progress.
        /// </param>
        public void SetProgress(int hashCode, long current, long total)
        {
            if (!ProgressDictionary.ContainsKey(hashCode))
                ProgressDictionary.Add(hashCode, new Progress() { Current = current, Total = total });
            else
            {
                ProgressDictionary[hashCode].Current = current;
                ProgressDictionary[hashCode].Total = total;
            }
        }

        /// <summary>
        /// Retrieves the total progress of all objects currently being managed.
        /// </summary>
        /// <returns>
        /// A Tuple where the first value represents the summed current progress, and the second value represents the summed total progress.
        /// </returns>
        public Tuple<long, long> GetProgress()
        {
            long c = 0;
            long t = 0;

            foreach (var p in ProgressDictionary)
            {
                c += p.Value.Current;
                t += p.Value.Total;
            }

            return Tuple.Create(c, t);
        }

        /// <summary>
        /// Sets progress for the provided object and retrieves an updated total progress. This is equivalent to calling SetProgress() and then calling
        /// GetProgress() immediately after.
        /// </summary>
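        /// <param name="hashCode">
        /// The Hash Code of the object (.GetHashCode()) that's reporting progress.
        /// </param>
        /// <param name="currentStep">
        /// The current progress of the reporting object.
        /// </param>
        /// <param name="totalSteps">
        /// The total progress of the reporting object.
        /// </param>
        /// <returns>
        /// A Tuple where the first value represents the summed current progress, and the second value represents the summed total progress.
        /// </returns>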
        public Tuple<long, long> SetGetProgress(int hashCode, long currentStep, long totalSteps)
        {
            SetProgress(hashCode, currentStep, totalSteps);
            return GetProgress();
        }
    }
}
Justin Shidell
  • You need to consider thread-safe collections instead of the Dictionary you have here (see https://learn.microsoft.com/en-us/dotnet/standard/collections/thread-safe/ ). For example, BlockingCollection<T>, which provides bounding and blocking functionality for any type that implements IProducerConsumerCollection<T>, or ConcurrentDictionary<TKey, TValue>. – granadaCoder Jul 01 '19 at 19:45
  • @granadaCoder Thanks for the suggestion; I didn't implement a thread-safe collection because my intention was to use a lock instead (and used the ParallelProgressManager as the locking object itself), as shown in the example in the comments of the code I posted. I do think a ConcurrentDictionary would be an excellent improvement for others who are looking for something similar, but are not looking to use a lock. – Justin Shidell Jul 03 '19 at 18:52
  • https://stackoverflow.com/questions/1949131/net-dictionary-locking-vs-concurrentdictionary/4533469#4533469 My advice is that Microsoft has probably written a better implementation than what I (or you) could write using the "old way". It's a debatable point; just sharing my thoughts. Future developers will "see" a ConcurrentDictionary sooner than an embedded "lock" statement. – granadaCoder Jul 05 '19 at 12:12
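For readers who want to try the ConcurrentDictionary suggestion from the comments, here is a minimal sketch of what a lock-free variant of the manager might look like. The class name ConcurrentProgressAggregator and its Set, Get, and Demo members are hypothetical and not part of the answer above. It also assumes each source is identified by a caller-supplied id rather than GetHashCode(), since hash codes are not guaranteed to be unique across objects.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class ConcurrentProgressAggregator
{
    // Latest (current, total) pair per source, keyed by a caller-supplied id.
    private readonly ConcurrentDictionary<int, (long Current, long Total)> latest =
        new ConcurrentDictionary<int, (long Current, long Total)>();

    // Records the most recent progress for one source, replacing its previous report.
    public void Set(int sourceId, long current, long total)
    {
        latest[sourceId] = (current, total);
    }

    // Sums the latest report of every source. Enumeration is safe while other
    // threads write, but the result is not a point-in-time snapshot.
    public (long Current, long Total) Get()
    {
        long c = 0, t = 0;
        foreach (var pair in latest)
        {
            c += pair.Value.Current;
            t += pair.Value.Total;
        }
        return (c, t);
    }

    // Hypothetical demo: three sources with 10, 100 and 1000 tasks, as in the
    // example in the answer's comments, reporting in parallel.
    public static (long Current, long Total) Demo()
    {
        var aggregator = new ConcurrentProgressAggregator();
        int[] taskCounts = { 10, 100, 1000 };

        Parallel.For(0, taskCounts.Length, id =>
        {
            for (int step = 0; step <= taskCounts[id]; step++)
            {
                aggregator.Set(id, step, taskCounts[id]);
            }
        });

        return aggregator.Get();
    }
}
```

Once the loop in Demo() has finished, Get() returns 1110 / 1110. While work is still running, the summed total only includes sources that have reported at least once, so seeding every source with a 0 / total report up front keeps the denominator stable.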

The following is one possible approach. Similar to what I described above, except I outsourced the "work" to a Task and pumped ReportProgress from the initial thread context.

Firstly, a couple of classes. I am using a Random to decide how long each Job will take and how many Jobs are in each WorkObject. The Job emulates high CPU load with a tight loop. You would use your own objects (and actual useful work).

using System;
using System.Collections.Generic;

public class Job
{
    private readonly TimeSpan timeForJobToTake;

    public Job(TimeSpan timeForJobToTake)
    {
        this.timeForJobToTake = timeForJobToTake;
    }

    public void DoJob()
    {
        DateTime endTime = DateTime.UtcNow.Add(this.timeForJobToTake);
        while (DateTime.UtcNow < endTime)
        {
            // emulate high CPU load during job
        }
    }
}

public class WorkObject
{
    private readonly List<Job> jobs = new List<Job>();

    public WorkObject(Random random)
    {
        int jobsToCreate = random.Next(1, 10);
        for (int i = 0; i < jobsToCreate; i++)
        {
            Job job = new Job(TimeSpan.FromMilliseconds(random.Next(100, 200)));
            this.jobs.Add(job);
        }
    }

    public int JobCount => this.jobs.Count;

    public void PerformWork()
    {
        foreach (Job job in this.jobs)
        {
            job.DoJob();
        }
    }
}

Then you can do something like the following (console application, but the code can work in other contexts):

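using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

internal class Program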
{
    private static readonly object syncObj = new object();

    private static int lastNumerator;

    private static int numerator;

    private static int denominator;

    private static void ReportProgress()
    {
        int currentNumerator = numerator;
        // Don't emit progress if nothing changed
        if (currentNumerator == lastNumerator) return;
        Console.WriteLine($"{currentNumerator} of {denominator}");
        lastNumerator = currentNumerator;
    }

    private static void Main(string[] args)
    {
        MainAsync().Wait();
        Console.ReadLine();
    }

    private static async Task MainAsync()
    {
        // Setup example objects
        Random random = new Random();
        List<WorkObject> workObjects = new List<WorkObject>();

        int numberOfWorkObjects = random.Next(50, 100);
        for (int i = 0; i < numberOfWorkObjects; i++)
        {
            WorkObject workObject = new WorkObject(random);
            denominator += workObject.JobCount;
            workObjects.Add(workObject);
        }

        // The CancellationTokenSource is used to immediately abort the progress reporting once the work is complete
        CancellationTokenSource progressReportCancellationTokenSource = new CancellationTokenSource();

        Task workTask = Task.Run(() =>
                                 {
                                     Parallel.ForEach(workObjects,
                                                      wo =>
                                                      {
                                                          wo.PerformWork();
                                                          lock (syncObj)
                                                          {
                                                              numerator += wo.JobCount;
                                                          }
                                                      });
                                     progressReportCancellationTokenSource.Cancel();
                                 });

        while (!workTask.IsCompleted)
        {
            try
            {
                ReportProgress();
                await Task.Delay(250, progressReportCancellationTokenSource.Token);
            }
            catch (TaskCanceledException)
            {
                break;
            }
        }

        await workTask;
        ReportProgress();
    }
}
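The loop above adds to the numerator only when a whole WorkObject finishes. A variation that counts each job as it completes, using Interlocked.Increment instead of a lock, could be sketched as follows. ProgressCounter and PerJobProgress are hypothetical names, the doJob delegate stands in for Job.DoJob(), and the report callback stands in for ReportProgress; treat this as one possible shape under those assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Thread-safe completed/total pair shared by all workers.
public sealed class ProgressCounter
{
    private int completed;

    public ProgressCounter(int total) { Total = total; }

    public int Total { get; }
    public int Completed => Volatile.Read(ref completed);
    public void Increment() => Interlocked.Increment(ref completed);
}

public static class PerJobProgress
{
    // jobCounts[i] is the number of jobs in work object i.
    // Returns the number of jobs completed once all work has finished.
    public static async Task<int> RunAsync(
        IReadOnlyList<int> jobCounts, Action doJob, Action<int, int> report)
    {
        // The denominator is fixed before any work starts.
        var counter = new ProgressCounter(jobCounts.Sum());

        using (var cts = new CancellationTokenSource())
        {
            Task workTask = Task.Run(() =>
            {
                try
                {
                    Parallel.ForEach(jobCounts, jobCount =>
                    {
                        for (int i = 0; i < jobCount; i++)
                        {
                            doJob();
                            counter.Increment(); // one unit per finished job
                        }
                    });
                }
                finally
                {
                    cts.Cancel(); // wake the reporting loop even if the work throws
                }
            });

            // Poll the counter from the calling context until the work is done.
            while (!workTask.IsCompleted)
            {
                report(counter.Completed, counter.Total);
                try { await Task.Delay(250, cts.Token); }
                catch (OperationCanceledException) { break; }
            }

            await workTask; // surfaces any exception from the workers
            report(counter.Completed, counter.Total);
            return counter.Completed;
        }
    }
}
```

Because the workers only touch the counter, the reporting interval can be tuned independently of how often jobs finish, and the final report always reflects the completed total.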
Adam G