I am attempting to create a concurrent version of SelectAwait
(and others) present as part of System.Linq.Async
which provides extension methods to IAsyncEnumerable
. This is the code that I am using:
private async IAsyncEnumerable<TOut> SelectParallelAsync<T, TOut>(
this IAsyncEnumerable<T> enumerable, Func<T, Task<TOut>> predicate)
{
var sem = new SemaphoreSlim(1, 10);
var retVal = enumerable.Select(item => {
var task = Task.Run(async () => {
await sem.WaitAsync();
var retVal = await predicate(item);
sem.Release();
return retVal;
});
return task;
});
await foreach (var item in retVal)
yield return await item;
}
Enumerable is a simple enumerable from 0-1000. The code is being called as
.SelectParallelAsync(async i =>
{
Console.WriteLine($"In Select : {i}");
await Task.Delay(1000);
return i + 5;
});
I was expecting all the tasks to get started immediately and being run 10 at a time. However, they get triggered one after another. Is there any way I can achieve something like this? Much appreciated.
EDIT: I am using semaphore instead of Parallel.ForEach
or .AsParallel().WithMaxDegreeOfParallelism
because I want to share this semaphore between multiple methods. Furthermore, PLINQ is not exactly very extendable and I can't add my own extension methods to it.
EDIT 2 : Added my own solution for sake of completion.