2

This is a follow-up to this question: How do you run a variable number of concurrent parametrizable infinite loop type of threads in C#?

Say I have a value taskLimit (assume 20) for the number of simultaneous "MyTask" Tasks, which are created in the RunAsync method below:

protected override async Task RunAsync(CancellationToken cancellationToken)
{
    var tasks = new List<Task>();
    try
    {
        for (int i = 0; i < taskLimit; i++)
        {
            tasks.Add(MyTask(cancellationToken, i));
        }
        
        await Task.WhenAll(tasks);
    }
    catch (Exception e)
    {
        //Exception Handling
    }
}
public async Task MyTask(CancellationToken cancellationToken, int a)
{
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        try
        {
            //long running code, if possible check for cancellation using the token
            //Do something useful here. Very Processor and IO heavy. Takes 5-10 minutes to complete.
            //SomeHeavyTask can only run concurrently up to a limit of, say, 5. Implement a token system of sorts
            while (freeTokens < 1)
            {
                await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
            }
            freeTokens = freeTokens-1;
            SomeHeavyTask(cancellationToken);
            freeTokens = freeTokens+1;
            
            //sleep for an independently parameterizable period, then wake up and repeat
            await Task.Delay(TimeSpan.FromHours(parametrizableTaskDelay[a]), cancellationToken);
        }
        catch (Exception e)
        {
            //Exception Handling
        }
    }
}

Is it possible to do such a thing? Is there a better, more formal approach supported natively in C# to achieve the same thing? Please note that the essence of this question is that there are substantially fewer freeTokens than taskLimit, and that each MyTask spends only about 10% of its time in SomeHeavyTask(); most of the time is spent in await Task.Delay().

  • @PeterBons Please let me know if you know an answer to this. – desiengineer Dec 11 '20 at 01:37
  • Is https://learn.microsoft.com/en-us/dotnet/api/system.threading.semaphore?view=net-5.0 what you want? – mjwills Dec 11 '20 at 01:37
  • Related: [How to limit the amount of concurrent async I/O operations?](https://stackoverflow.com/questions/10806951/how-to-limit-the-amount-of-concurrent-async-i-o-operations/) – Theodor Zoulias Dec 11 '20 at 01:41
  • You can't do `freeTokens = freeTokens-1;` in multithreaded code. It's unsafe. – Enigmativity Dec 11 '20 at 06:10
  • A lot of answers can also be found here: https://stackoverflow.com/questions/20355931/limiting-the-amount-of-concurrent-tasks-in-net-4-5/42158377#42158377 – Peter Bons Dec 12 '20 at 19:20
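
To illustrate the comment above about `freeTokens = freeTokens-1;`: the check (`freeTokens<1`) and the decrement are separate steps, so two tasks can both see one free token and both take it. The following is only a minimal sketch of an atomic alternative. `TokenPool`, `TryAcquire`, `Release` and `Free` are hypothetical names that do not appear in the question; it uses `Interlocked.CompareExchange` so that the check and the decrement happen as one step. A built-in `SemaphoreSlim`, as suggested in the answers, is probably the simpler choice in practice.

```csharp
using System.Threading;

// Hypothetical helper (not part of the question): a lock-free pool of tokens.
public sealed class TokenPool
{
    private int _free;

    public TokenPool(int initialTokens) => _free = initialTokens;

    // Current number of free tokens (a snapshot; it may change right after reading).
    public int Free => Volatile.Read(ref _free);

    // Atomically takes one token if any is available; returns false otherwise.
    public bool TryAcquire()
    {
        while (true)
        {
            int current = Volatile.Read(ref _free);
            if (current < 1)
            {
                return false;
            }

            // Succeeds only if no other thread changed _free since it was read;
            // otherwise loop and try again with the fresh value.
            if (Interlocked.CompareExchange(ref _free, current - 1, current) == current)
            {
                return true;
            }
        }
    }

    // Returns one token to the pool.
    public void Release() => Interlocked.Increment(ref _free);
}
```

With this sketch the polling loop in MyTask would call `TryAcquire()` instead of reading and writing `freeTokens` directly, and call `Release()` in a `finally` block so a token is not lost if SomeHeavyTask throws.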

4 Answers

3

You should use Microsoft's Reactive Framework (aka Rx): NuGet System.Reactive and add using System.Reactive.Linq; (plus using System.Reactive; for Unit). Assuming SomeHeavyTask returns a Task, you can then do this:

int taskLimit = 500;
int maxConcurrent = 5;

IObservable<Unit> query =
    Observable
        .Range(0, taskLimit)
        .Select(x => Observable.FromAsync(ct => SomeHeavyTask(ct)))
        .Merge(maxConcurrent);
        
await query;

That's a lot easier to work with in my book.

Enigmativity
3

Another option:

var block = new ActionBlock<int>(x => SomeHeavyTask(cancellationToken, x), 
    new ExecutionDataflowBlockOptions() 
    { 
        MaxDegreeOfParallelism = 20,
        CancellationToken = cancellationToken
    });

for (int i = 0; i < 100; i++)
    await block.SendAsync(i, cancellationToken);
    
block.Complete();
await block.Completion;
Magnus
  • You should probably configure the `ActionBlock` with the same `cancellationToken`, to make the whole process more responsive in case of cancellation. – Theodor Zoulias Dec 11 '20 at 12:01
1

Like @mjwills said, you can use a C# semaphore to manage concurrent access to resources. (random example)

I do recommend looking at existing solutions first though. For example, Hangfire. You can store its state inside SF if needed.

LoekD
1

You could use a SemaphoreSlim to limit the number of tasks that are working concurrently (you will still have taskLimit Tasks active, but only a limited number of those will be doing the heavy work simultaneously; I assume this is what you want).

This is best demonstrated with a sample console app. If you run this you'll see from the output that a maximum of 5 "heavy tasks" are active simultaneously.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static async Task Main()
        {
            Console.WriteLine("Starting");

            // Cancel after 30 seconds for demo purposes.
            using var source = new CancellationTokenSource(TimeSpan.FromSeconds(30));
            await RunAsync(source.Token);

            Console.WriteLine("Stopped.");
            Console.ReadLine();
        }

        public static async Task RunAsync(CancellationToken cancellationToken)
        {
            int taskLimit = 20;
            int concurrencyLimit = 5;

            var sem   = new SemaphoreSlim(concurrencyLimit);
            var tasks = new List<Task>();

            try
            {
                for (int i = 0; i < taskLimit; i++)
                {
                    int p = i; // Prevent modified closure.
                    tasks.Add(Task.Run(() => MyTask(cancellationToken, p, sem)));
                }

                await Task.WhenAll(tasks);
            }

            catch (OperationCanceledException)
            {
                Console.WriteLine("Task(s) were cancelled.");
            }

            catch (Exception e)
            {
                // Exception Handling
            }
        }

        public static async Task MyTask(CancellationToken cancellationToken, int a, SemaphoreSlim sem)
        {
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();

                try
                {
                    await sem.WaitAsync(cancellationToken);

                    try
                    {
                        someHeavyTask(cancellationToken, a);
                    }
                    
                    finally
                    {
                        sem.Release();
                    }
                }

                catch (OperationCanceledException)
                {
                    Console.WriteLine("Task was cancelled.");
                    return;
                }

                catch (Exception e)
                {
                    //Exception Handling
                }
            }
        }

        static int heavyTaskCount;

        static void someHeavyTask(CancellationToken cancel, int a)
        {
            int n = Interlocked.Increment(ref heavyTaskCount);
            Console.WriteLine("Starting heavy task. Number of simultaneous heavy tasks = " + n);

            // Simulate work. Make the work for each task take varying time by using 'a' for the sleep.

            for (int i = 0; i < 20 && !cancel.IsCancellationRequested; ++i)
            {
                Thread.Sleep(100 + a*10);
            }

            n = Interlocked.Decrement(ref heavyTaskCount);
            Console.WriteLine("Finishing heavy task. Number of simultaneous heavy tasks = " + n);
        }
    }
}

The core of this is controlled by the semaphore in the code here:

await sem.WaitAsync(cancellationToken);

try
{
    someHeavyTask(cancellationToken, a);
}

finally
{
    sem.Release();
}
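
The demo above leaves out the per-task sleep from the question. As a rough sketch of how the two could fit together (not a definitive implementation), the loop below releases the semaphore before sleeping, so a task holds one of the limited slots only while it is doing heavy work. `ThrottledLoop`, `RunLoopAsync`, `heavyWork` and `delay` are hypothetical names used for illustration; `heavyWork` stands in for SomeHeavyTask and `delay` for the per-task value from parametrizableTaskDelay.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledLoop
{
    // Hypothetical worker loop: runs heavyWork under the semaphore, then sleeps
    // for 'delay' without holding a slot. It ends only through cancellation
    // (OperationCanceledException) or an exception thrown by heavyWork.
    public static async Task RunLoopAsync(
        SemaphoreSlim sem,
        Func<CancellationToken, Task> heavyWork,
        TimeSpan delay,
        CancellationToken cancellationToken)
    {
        while (true)
        {
            // Wait for a free slot without blocking a thread.
            await sem.WaitAsync(cancellationToken);
            try
            {
                await heavyWork(cancellationToken);
            }
            finally
            {
                // Give the slot back before sleeping so other loops can use it.
                sem.Release();
            }

            // Sleep outside the semaphore; this throws if cancellation is requested.
            await Task.Delay(delay, cancellationToken);
        }
    }
}
```

In RunAsync, each of the taskLimit loops would be started with its own delay and a shared `SemaphoreSlim(concurrencyLimit)`. If the heavy work is synchronous and CPU-bound, as in the demo, it could be wrapped as `ct => Task.Run(() => someHeavyTask(ct, a), ct)` so it does not block the calling thread.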
Matthew Watson