-1

I have a question about parallel execution of the async methods.

I want to limit number of async methods that will be executed simultaneously (to limit number of web requests to another system, which are actually sent in the async method).

So what is the best way to do that ? I found a solution by using Parallel, and setting DegreeOfParallelism, but I don't really like this solution, because it will block number of threads equal to DOP (degree of parallelism).

So, here is the code with Parallel:

var results = dataProvider.GetResults(someIdParam);
            var average = results.AsParallel().WithDegreeOfParallelism(5).Average(x =>
            {
                var t = GetSSIM(x);
                t.Wait();
                return t.Result;
            });

So, this will work and limit number of simultaneous requests, but will block 5 threads.

I ended up writing my own method:

    public static async Task<IEnumerable<T2>> ProcessEachAsync<T1, T2>(IEnumerable<T1> src, Func<T1, Task<T2>> func, int dop)
    {
        var workers = new Task<T2>[Math.Min(dop, src.Count())]; //to make sure that we will not have nulls in workers collection
        var result = new List<T2>();

        int counter = 0;
        int index = 0;
        foreach(var element in src)
        {
            if (counter < dop)
                index = counter++;
            else
            {
                var ended = await Task.WhenAny(workers);
                index = Array.FindIndex(workers, x => x == ended);
                result.Add(ended.Result);
            }

            var t = func(element);
            t.Start();
            workers[index] = t;
        }

        Task.WaitAll(workers);
        result.AddRange(workers.Select(x => x.Result));
        return result;
    }

CAUTION!!!!! This code is not yet tested and have bugs!!!! But it explains main idea

So, this solution will block only 1 thread. Maybe there is simpler way to achieve what I want?

Eramir
  • 482
  • 1
  • 5
  • 18
  • I think the code review version of stack overflow would this question more, just fyi and I think you'll get better and more feedback https://codereview.stackexchange.com – Dave Feb 15 '18 at 13:26
  • 2
    You can use `SemaphoreSlim` to limit that number, as described for example here: https://stackoverflow.com/a/23316722/5311735 (and in many other places). – Evk Feb 15 '18 at 13:32
  • check rate limiting here: http://www.jackleitch.net/2010/10/better-rate-limiting-with-dot-net/ – dereli Feb 15 '18 at 13:33
  • @Evk - Yes, SemaphoreSlim fits perfectly – Eramir Feb 15 '18 at 14:17

1 Answers1

1

Thanks to @evk and my colleague who helped me with this question. So I implemented a solution with SemaphoreSlim. It's disadvantage is that it converts all data into Tasks, but the code is so beautiful that I will leave it :)

    public static async Task<IEnumerable<T2>> ProcessEachAsync<T1, T2>(IEnumerable<T1> src, Func<T1, Task<T2>> func, int dop)
    {
        using (var semSlim = new SemaphoreSlim(dop))
        {
            var result = new ConcurrentBag<T2>();
            Func<T1, Task> getTask = async (x) =>
            {
                try
                {
                    await semSlim.WaitAsync();
                    var res = await func(x);
                    result.Add(res);
                }
                finally
                {
                    semSlim.Release();
                }
            };

            await Task.WhenAll(src.Select(x => getTask(x)));
            return result;
        }
    }

CAUTION!!! NOT TESTED!!!!

Thanks everybody!

Eramir
  • 482
  • 1
  • 5
  • 18
  • You forgot to dispose semaphore. Also 'new Task' is not needed - lambdas can be async too. Finally, concurrent bag is not needed too - you can return result from lambda ('return await func(x)') – Evk Feb 15 '18 at 15:36
  • @Evk I made changes according to your comment. Now this answer represent the solution I ended up with. Thanks! – Eramir Feb 16 '18 at 13:48