10

How can you run multiple asynchronous I/O tasks in parallel while making sure that no more than X I/O operations run at the same time, with no such limit on the pre- and post-I/O processing tasks?

Here is a scenario: say there are 1000 tasks. Each of them accepts a text string as an input parameter, transforms that text (pre-I/O processing), then writes the transformed text to a file. The goal is for the pre-processing logic to use 100% of the CPU cores while the I/O portion of the tasks runs with a maximum degree of parallelism of 10 (at most 10 files open for writing at a time).

Can you provide sample code showing how to do this with C# / .NET 4.5?

http://blogs.msdn.com/b/csharpfaq/archive/2012/01/23/using-async-for-file-access-alan-berman.aspx

Grief Coder
  • Rx 2.0 might be a good fit for this (throttling the second stage to 10 at a time) but I'm not familiar enough with it to say for sure. :-/ – James Manning May 29 '12 at 15:59
  • Does this answer your question? [Nesting await in Parallel.ForEach](https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach) – Michael Freidgeim Nov 05 '21 at 12:30

3 Answers

9

I think using TPL Dataflow for this would be a good idea: you create pre- and post-process blocks with unbounded parallelism, a file-writing block with limited parallelism and link them together. Something like:

var unboundedParallelismOptions =
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    };

var preProcessBlock = new TransformBlock<string, string>(
    s => PreProcess(s), unboundedParallelismOptions);

var writeToFileBlock = new TransformBlock<string, string>(
    async s =>
    {
        await WriteToFile(s);
        return s;
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

var postProcessBlock = new ActionBlock<string>(
    s => PostProcess(s), unboundedParallelismOptions);

var propagateCompletionOptions =
    new DataflowLinkOptions { PropagateCompletion = true };

preProcessBlock.LinkTo(writeToFileBlock, propagateCompletionOptions);
writeToFileBlock.LinkTo(postProcessBlock, propagateCompletionOptions);

// use something like await preProcessBlock.SendAsync("text") here

preProcessBlock.Complete();
await postProcessBlock.Completion;

Where WriteToFile() could look like this:

private static async Task WriteToFile(string s)
{
    using (var writer = new StreamWriter(GetFileName()))
        await writer.WriteAsync(s);
}
svick
  • What are the `PreProcess` and `PostProcess` methods here? – shashwat Sep 23 '17 at 13:33
  • @shashwat They do whatever is needed. The original question talks about "pre and post I/O processing tasks", so I represented that using methods. – svick Sep 23 '17 at 15:12
1

It sounds like you'd want to consider a Dijkstra semaphore to control how many tasks are allowed to start at once.

However, this sounds like a typical queue/fixed number of consumers kind of problem, which may be a more appropriate way to structure it.
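
A minimal sketch of the queue with a fixed number of consumers, assuming the work items are known up front. The names `FixedConsumers`, `RunAsync` and `handler` are hypothetical and not from the original post; each consumer is an async loop that keeps dequeuing until the queue is empty, so at most `consumerCount` handlers are in flight at any time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FixedConsumers
{
    // Hypothetical helper: drains 'items' using exactly 'consumerCount' async consumers.
    public static Task RunAsync<T>(
        IEnumerable<T> items, int consumerCount, Func<T, Task> handler)
    {
        // Thread-safe queue shared by all consumers.
        var queue = new ConcurrentQueue<T>(items);

        // Each consumer processes one item at a time until the queue is empty.
        var consumers = Enumerable.Range(0, consumerCount).Select(async _ =>
        {
            T item;
            while (queue.TryDequeue(out item))
            {
                await handler(item);
            }
        });

        // Completes when every consumer has finished; faults if any handler threw.
        return Task.WhenAll(consumers);
    }
}
```

For the scenario in the question this might be called as `await FixedConsumers.RunAsync(transformedTexts, 10, WriteToFile);`, where `transformedTexts` is an assumed collection holding the already pre-processed strings.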

Jeff Watkins
0

I would create an extension method that lets you set the maximum degree of parallelism. SemaphoreSlim will be the savior here.

    /// <summary>
    /// Concurrently executes an async action for each item of an <see cref="IEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the items in the sequence</typeparam>
    /// <param name="enumerable">Instance of <see cref="IEnumerable{T}"/></param>
    /// <param name="action">An async delegate to execute for each item</param>
    /// <param name="maxDegreeOfParallelism">Optional. An integer that represents the maximum degree of parallelism.
    /// Must be greater than 0.</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If maxDegreeOfParallelism is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Wait for a free slot; this blocks the loop once the limit is reached.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        try
                        {
                            await action(item);
                        }
                        finally
                        {
                            // Free the slot even if the action throws, so the exception
                            // still reaches Task.WhenAll instead of being swallowed.
                            semaphoreSlim.Release();
                        }
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
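
The method above takes no cancellation token. A minimal sketch of a cancellable variant follows, assuming the caller owns a `CancellationTokenSource`; the class name, the `RunOneAsync` helper and the `Func<T, CancellationToken, Task>` signature are hypothetical choices for illustration, not part of the original method. It stops starting new items once cancellation is requested, waits for the items already started, then reports the cancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableForEachExtensions
{
    // Hypothetical cancellable variant of ForEachAsyncConcurrent.
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, CancellationToken, Task> action,
        int maxDegreeOfParallelism,
        CancellationToken cancellationToken)
    {
        using (var semaphore = new SemaphoreSlim(
            maxDegreeOfParallelism, maxDegreeOfParallelism))
        {
            var tasks = new List<Task>();

            foreach (var item in enumerable)
            {
                try
                {
                    // Wait for a free slot, giving up as soon as cancellation is requested.
                    await semaphore.WaitAsync(cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    break; // stop scheduling new items
                }

                var current = item;
                tasks.Add(Task.Run(
                    () => RunOneAsync(current, action, semaphore, cancellationToken)));
            }

            // Let already started items finish before the semaphore is disposed.
            await Task.WhenAll(tasks);

            // Surface the cancellation if the loop was cut short.
            cancellationToken.ThrowIfCancellationRequested();
        }
    }

    private static async Task RunOneAsync<T>(
        T item,
        Func<T, CancellationToken, Task> action,
        SemaphoreSlim semaphore,
        CancellationToken cancellationToken)
    {
        try
        {
            await action(item, cancellationToken);
        }
        finally
        {
            // Always free the slot, whether the action succeeded, failed or was cancelled.
            semaphore.Release();
        }
    }
}
```

Calling `Cancel()` on the `CancellationTokenSource` then stops new items from starting. Items that are already running stop early only if `action` itself observes the token.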
Jay Shah
  • Hi, I used it in a thread. I tried to stop the thread with the Abort function, but the ForEachAsyncConcurrent task is still running. Do you have a solution for this issue? – Tien Nguyen Aug 03 '21 at 06:59
  • @TienNguyen I would say add a CancellationToken as a parameter of the ForEachAsyncConcurrent method and cancel it when you stop the thread. – Jay Shah Aug 09 '21 at 10:44
  • Can you update your sample code with a CancellationToken? Thank you so much! – Tien Nguyen Aug 11 '21 at 21:42