This is what I am trying to achieve. Say I have a process that runs every minute and performs some I/O operations. I want up to 5 threads to execute simultaneously and do the operations. If 2 threads take longer than a minute, then when the process runs again a minute later it should start only 3 threads, because 2 are still busy.

So I used a combination of `SemaphoreSlim` and `Parallel.ForEach`. Please let me know whether this is the correct way to achieve this, or whether there is a better way.

private static SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(5);

private async Task ExecuteAsync()
{
    try
    {
        var availableThreads = _semaphoreSlim.CurrentCount;

        if (availableThreads > 0)
        {
            var lists = await _feedSourceService.GetListAsync(availableThreads); // select @top(availableThreads) * from table

            Parallel.ForEach(
                lists,
                new ParallelOptions
                {
                    MaxDegreeOfParallelism = availableThreads
                },
                async item =>
                {
                    await _semaphoreSlim.WaitAsync();

                    try
                    {
                        // I/O operations
                    }
                    finally
                    {
                        _semaphoreSlim.Release();
                    }
                });
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex.Message, ex);
    }
}
Reyan Chougle
  • All code using `async` with `Parallel` is incorrect. – Stephen Cleary Nov 30 '20 at 15:53
  • @StephenCleary would you rather use `await Task.WhenAll(tasks);`? – OlegI Nov 30 '20 at 15:54
  • @StephenCleary Could you please explain why it is wrong? This would be helpful for understanding. – Reyan Chougle Nov 30 '20 at 16:18
  • @ReyanChougle I would look [here](https://stackoverflow.com/a/15136833/7085070) – OlegI Nov 30 '20 at 16:19
  • @ReyanChougle you can still use a semaphore for throttling – OlegI Nov 30 '20 at 16:20
  • @OlegI But this will wait for all the tasks to finish, right? I want to implement it in such a way that each thread can exit as soon as its execution is completed. Could you please show an example if possible? – Reyan Chougle Nov 30 '20 at 16:37
  • @ReyanChougle: `Parallel` just doesn't work with `async` methods. Any `async` lambda passed to any `Parallel` method is converted to an `async void`, which [causes many other problems](https://learn.microsoft.com/en-us/archive/msdn-magazine/2013/march/async-await-best-practices-in-asynchronous-programming). – Stephen Cleary Nov 30 '20 at 16:41

1 Answer

> Let's say I have a process which runs every minute and performs some I/O operations... Suppose if 2 threads took longer than a minute and when the process runs again after a minute, it should execute 3 threads simultaneously as 2 threads are already doing some operations.

This kind of problem description is fairly common, but it is surprisingly difficult to code correctly, because a time-based polling timer is trying to periodically adjust a throttling mechanism.

So the first thing I'd recommend is changing the problem description. Consider having the polling mechanism read all outstanding work, and then use normal throttling from there (e.g., adding them to an execution-constrained `ActionBlock`).
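
As a rough sketch of that first suggestion, assuming the `System.Threading.Tasks.Dataflow` library is available and that work items can be represented as plain `int` IDs. The `ThrottledProcessor` class, its members, and the `Task.Delay` placeholder are all hypothetical and not part of the original code:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper: one long-lived block that never runs more than
// maxParallel handlers at once, no matter how many items are queued.
public sealed class ThrottledProcessor
{
    private readonly ActionBlock<int> _block;
    private readonly object _gate = new object();
    private int _running;

    public int PeakConcurrency { get; private set; }
    public int Processed { get; private set; }

    public ThrottledProcessor(int maxParallel)
    {
        _block = new ActionBlock<int>(
            async item => await ProcessAsync(item),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallel });
    }

    // Called by the poller with everything it found; extra items simply wait in the queue.
    public void Enqueue(IEnumerable<int> items)
    {
        foreach (var item in items)
        {
            _block.Post(item);
        }
    }

    // Stop accepting work and wait for queued items to drain.
    public Task ShutdownAsync()
    {
        _block.Complete();
        return _block.Completion;
    }

    private async Task ProcessAsync(int item)
    {
        lock (_gate)
        {
            _running++;
            if (_running > PeakConcurrency) PeakConcurrency = _running;
        }
        try
        {
            await Task.Delay(50); // placeholder for the real I/O operations
        }
        finally
        {
            lock (_gate)
            {
                _running--;
                Processed++;
            }
        }
    }
}

public static class ThrottledProcessorDemo
{
    public static async Task Main()
    {
        var processor = new ThrottledProcessor(5);
        processor.Enqueue(Enumerable.Range(0, 20));  // first polling pass
        processor.Enqueue(Enumerable.Range(20, 10)); // a later polling pass
        await processor.ShutdownAsync();
        Console.WriteLine($"processed={processor.Processed}, peak={processor.PeakConcurrency}");
    }
}
```

With this shape the timer no longer needs to know how many slots are free. It only hands work to the block, and the block enforces the limit. The poller would still need some way to avoid re-reading items that are already queued or in progress, which this sketch does not show.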

That said, if you'd prefer to continue down the more complex path, code like this would avoid the `Parallel`-with-`async` problem:

private static SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(5);

private async Task ExecuteAsync()
{
    try
    {
        var availableThreads = _semaphoreSlim.CurrentCount;

        if (availableThreads > 0)
        {
            var lists = await _feedSourceService.GetListAsync(availableThreads); // select @top(availableThreads) * from table

            var tasks = lists.Select(
                async item =>
                {
                    await _semaphoreSlim.WaitAsync();

                    try
                    {
                        // I/O operations
                    }
                    finally
                    {
                        _semaphoreSlim.Release();
                    }
                }).ToList();
            await Task.WhenAll(tasks);
        }
    }
    catch (Exception ex)
    {
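        // Exception goes first so it is logged with its stack trace
        // (assuming _logger is a Microsoft.Extensions.Logging ILogger).
        _logger.LogError(ex, ex.Message);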
    }
}
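
To make the difference concrete, here is a small console sketch contrasting the two approaches. The `ParallelAsyncDemo` class, the `Counter` helper, and `FakeIoAsync` (a `Task.Delay` standing in for the real I/O) are hypothetical names used only for illustration:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelAsyncDemo
{
    // Hypothetical thread-safe counter used only to observe completions.
    private sealed class Counter
    {
        private int _value;
        public int Value => Volatile.Read(ref _value);
        public void Increment() => Interlocked.Increment(ref _value);
    }

    // Placeholder for the real I/O operation (assumption: about 200 ms of async work).
    private static async Task FakeIoAsync(Counter counter)
    {
        await Task.Delay(200);
        counter.Increment();
    }

    public static int CountAfterParallelForEach()
    {
        var counter = new Counter();
        Parallel.ForEach(Enumerable.Range(0, 5), async item =>
        {
            // This lambda is compiled as async void, so it returns to
            // Parallel.ForEach at the first await, before the work is done.
            await FakeIoAsync(counter);
        });
        return counter.Value; // read right after the loop "finishes"
    }

    public static async Task<int> CountAfterWhenAllAsync()
    {
        var counter = new Counter();
        var tasks = Enumerable.Range(0, 5)
            .Select(item => FakeIoAsync(counter))
            .ToList();
        await Task.WhenAll(tasks); // completes only when every task has completed
        return counter.Value;
    }

    public static async Task Main()
    {
        Console.WriteLine($"After Parallel.ForEach: {CountAfterParallelForEach()} of 5 finished");
        Console.WriteLine($"After Task.WhenAll: {await CountAfterWhenAllAsync()} of 5 finished");
    }
}
```

The `Task.WhenAll` path reports all 5 operations as finished. The `Parallel.ForEach` path will normally report fewer, typically none, because the loop returns while the operations are still in flight. In the original code that also means `MaxDegreeOfParallelism` is not limiting the I/O, and any exception thrown inside the lambda never reaches the surrounding `catch`.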
Reyan Chougle
Stephen Cleary