0

I came up with the following code which calls a database paging function repeatedly with a page size of 5 and for each item in a page executes a function in parallel with a max concurrency of 4. It looks like its working so far but I'm unsure if I need to use locking to enclose the parallelInvocationTasks.Remove(completedTask); line and Task.WhenAll(parallelInvocationTasks.ToArray()); So do I need to use locking here and do you see any other improvements?

Here's the code

Program.cs

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        private static async Task Main(string[] args)
        {
            Console.WriteLine("Starting");
            Func<int, int, CancellationToken, Task<IList<string>>> getNextPageFunction = GetNextPageFromDatabase;
            await getNextPageFunction.ForEachParallel(4, 5, new CancellationToken(), async (item) =>
            {
                Console.WriteLine($"{item} started");
                //simulate processing
                await Task.Delay(1000);
                Console.WriteLine($"{item} ended");
            });
            
            Console.WriteLine("Done");
        }

        private static async Task<IList<string>> GetNextPageFromDatabase(
            int offset,
            int pageSize,
            CancellationToken cancellationToken)
        {
            //simulate i/o and database paging
            await Task.Delay(2000, cancellationToken);
            var pageData = new List<string>();
            
            //simulate just 4 pages
            if (offset >= pageSize * 3)
            {
                return pageData;
            }

            for (var i = 1; i <= pageSize; i++)
            {
                string nextItem = $"Item {i + offset}";
                pageData.Add(nextItem);
            }

            return pageData;
        } 
    }
}

PagingExtensions.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public static class PagingExtensions
    {
        public static async Task<int> ForEachParallel<TItem>(
            this Func<int, int, CancellationToken, Task<IList<TItem>>> getNextPageFunction,
            int concurrency,
            int pageSize,
            CancellationToken cancellationToken,
            Func<TItem, Task> forEachFunction)
        {
            var enumeratedCount = 0;
            if (getNextPageFunction == null || forEachFunction == null)
            {
                return enumeratedCount;
            }

            var offset = 0;
            using (var semaphore = new SemaphoreSlim(concurrency))
            {
                IList<Task> parallelInvocationTasks = new List<Task>();
                IList<TItem> items;
                
                do
                {
                    items = await getNextPageFunction(offset, pageSize, cancellationToken) ?? new List<TItem>();
                    foreach (TItem item in items)
                    {
                        await semaphore.WaitAsync(cancellationToken);
                        Task forEachFunctionTask = Task.Factory.StartNew(async () =>
                            {
                                try
                                {
                                    await forEachFunction(item);
                                }
                                finally
                                {
                                    // ReSharper disable once AccessToDisposedClosure
                                    // This is safe as long as Task.WhenAll is called before the using semaphore
                                    // enclosure ends
                                    semaphore.Release();
                                }
                            }, cancellationToken)
                            .Unwrap();

                        parallelInvocationTasks.Add(forEachFunctionTask);
                        
#pragma warning disable 4014
                        forEachFunctionTask.ContinueWith((completedTask) =>
#pragma warning restore 4014
                        {
                            if (completedTask.Exception == null)
                            {
                                //Intention is to release completed tasks during enumeration as they complete
                                //so they can be GCed. This is to ensure the 'parallelInvocationTasks' list does not
                                //grow in an unmanaged manner resulting in a list holding multiple completed tasks
                                //unnecessarily consuming more memory with each added invocation task
                                //Thus the final Task.WhenAll call below will only need to await only faulted tasks
                                //causing it to throw an exception and/or a minimal list of incomplete tasks only
                                parallelInvocationTasks.Remove(completedTask);
                            }
                        }, cancellationToken);

                        enumeratedCount += 1;
                    }
                    
                    offset += pageSize;
                }
                while (items.Count >= pageSize);
                
                await Task.WhenAll(parallelInvocationTasks.ToArray());
            }

            return enumeratedCount;
        }
    }
}
Harindaka
  • 4,658
  • 8
  • 43
  • 62
  • Did you intentionally put `await semaphore.WaitAsync(cancellationToken);` outside of the task and `semaphore.Release();` inside the task created in `ForEachParallel()`? Normally you would place these so they execute on the same thread(task) – DekuDesu May 21 '21 at 08:32
  • Why do you not simply use Parallel.Foreach ? As far as I can see it has all the features you require, and should be guaranteed to be safe. – JonasH May 21 '21 at 08:33
  • 1
    `parallelInvocationTasks.Remove(completedTask);` should absolutely have a `lock`, any time you **might** modify a collection, even one captured in a method like this, you should lock it while modifying it. – DekuDesu May 21 '21 at 08:35
  • `List.ToArray` is an [O(n) operation per MSDN](https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.list-1.toarray?view=net-5.0#remarks), it's safe to say using the extension would iterate and read the values sequentially, therefor you use lock the `await Task.WhenAll(parallelInvocationTasks.ToArray());` since there is a remote chance the collection may change during `ToArray`'s execution. – DekuDesu May 21 '21 at 08:38
  • Yes I was betting on the ToArray to work as @DekuDesu explained. I see now that locking that makes sense. Thanks – Harindaka May 21 '21 at 08:59
  • 1
    @JonasH Parallel.ForEach is made for CPU-intensive computations and doesn't recognize async methods. It doesn't await them AFAIK – Harindaka May 21 '21 at 09:25
  • @DekuDesu awaiting the semaphore before creating the task is a valid technique. It is more complex than awaiting it inside the task, but also more efficient. You can take a look at [this](https://stackoverflow.com/questions/66391377/is-there-a-way-to-limit-the-number-of-parallel-tasks-globally-in-an-asp-net-web/66443140#66443140) answer for more details. – Theodor Zoulias May 21 '21 at 09:42
  • Having googled this for a bit I'm leaning more towards TPL DataFlow and ActionBlocks. Something like this: https://stackoverflow.com/a/11565317/454378 – Harindaka May 21 '21 at 09:52
  • 1
    It absolutely is! I was just clarifying. I was there when you wrote it the first time :P. I just find it important with semaphores, if that technique is used intentionally - as it is one of the few ways you can lead mis-matched releases if poorly implemented. – DekuDesu May 21 '21 at 09:52
  • 1
    Harindaka yes, with the TPL Dataflow you'll get better exception handling behavior. Basically a failed task will cause the timely failure of the whole parallel loop, instead of waiting for all other tasks to complete just to receive this error at the end. [I have posted myself](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach/65251949#65251949) some simple implementations of a `ForEachAsync` method that is based on TPL Dataflow, in the linked question. – Theodor Zoulias May 21 '21 at 10:02
  • @TheodorZoulias I formulated the [answer below](https://stackoverflow.com/a/67635290/454378) which I believe gets the job done. Thanks – Harindaka May 21 '21 at 10:41

1 Answers1

1

Ok based on the comments above and a little bit more research I arrived at this answer which gets the work done without all the complexity of having to write custom code to manage concurrency. It uses ActionBlock from TPL DataFlow

PagingExtensions.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp1
{
    public static class PagingExtensions
    {
        public delegate Task<IList<TItem>> GetNextPageDelegate<TItem>(
            int offset,
            int pageSize,
            CancellationToken cancellationToken);

        public static async Task<int> EnumerateParallel<TItem>(
            this GetNextPageDelegate<TItem> getNextPageFunction,
            int maxDegreeOfParallelism,
            int pageSize,
            CancellationToken cancellationToken,
            Func<TItem, Task> forEachFunction)
        {
            var enumeratedCount = 0;
            if (getNextPageFunction == null || forEachFunction == null)
            {
                return enumeratedCount;
            }

            var offset = 0;
            var forEachFunctionBlock = new ActionBlock<TItem>(forEachFunction, new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = pageSize > maxDegreeOfParallelism ? pageSize : maxDegreeOfParallelism,
                EnsureOrdered = false,
                MaxDegreeOfParallelism = maxDegreeOfParallelism,
                CancellationToken = cancellationToken
            });

            IList<TItem> items;

            do
            {
                items = await getNextPageFunction(offset, pageSize, cancellationToken) ?? new List<TItem>();
                foreach (TItem item in items)
                {
                    await forEachFunctionBlock.SendAsync(item, cancellationToken);
                    enumeratedCount += 1;
                }

                offset += pageSize;
            }
            while (items.Count >= pageSize);

            forEachFunctionBlock.Complete();
            await forEachFunctionBlock.Completion;

            return enumeratedCount;
        }
    }
}
Harindaka
  • 4,658
  • 8
  • 43
  • 62
  • 1
    Nice. Just be aware that the `BoundedCapacity` option can also limit to the `MaxDegreeOfParallelism`. In case the `BoundedCapacity` is smaller than the configured `MaxDegreeOfParallelism`, the effective `MaxDegreeOfParallelism` will be reduced to the `BoundedCapacity` value. – Theodor Zoulias May 21 '21 at 10:51
  • 1
    @TheodorZoulias Thanks. Edited the answer to set the larger value – Harindaka May 21 '21 at 13:02