0

I am attempting to create a concurrent version of SelectAwait (and others) present as part of System.Linq.Async which provides extension methods to IAsyncEnumerable. This is the code that I am using:

private async IAsyncEnumerable<TOut> SelectParallelAsync<T, TOut>(
    this IAsyncEnumerable<T> enumerable, Func<T, Task<TOut>> predicate)
{
    var sem = new SemaphoreSlim(1, 10);
    
    var retVal = enumerable.Select(item => {
        var task = Task.Run(async () => {
            await sem.WaitAsync();
            var retVal = await predicate(item);
            sem.Release();

            return retVal;
        });

        return task;
    });

    await foreach (var item in retVal)
        yield return await item;
}

Enumerable is a simple enumerable from 0-1000. The code is being called as

.SelectParallelAsync(async i =>
{
    Console.WriteLine($"In Select : {i}");
    await Task.Delay(1000);
    return i + 5;
});

I was expecting all the tasks to get started immediately and being run 10 at a time. However, they get triggered one after another. Is there any way I can achieve something like this? Much appreciated.

EDIT: I am using semaphore instead of Parallel.ForEach or .AsParallel().WithMaxDegreeOfParallelism because I want to share this semaphore between multiple methods. Furthermore, PLINQ is not exactly very extendable and I can't add my own extension methods to it.

EDIT 2 : Added my own solution for sake of completion.

Grimson
  • 540
  • 1
  • 5
  • 21
  • Your `await foreach` is asynchronously enumerating each item one at a time. You need to throttle the enumeration as well as the `predicate`. A bounded Channel should work. – Stephen Cleary Jan 23 '23 at 17:48
  • @StephenCleary I also tried to force enumeration by calling `ToListAsync` and that sets the return type to be `List>`. I was assuming doing this would start the enumeration and since I am not awaiting tasks in this new step, tasks (throttled by `SemaphoreSlim`) will be launched concurrently. But that still forces enumeration to be one at a time. Can you explain why as well? – Grimson Jan 23 '23 at 17:57
  • @StephenCleary I am also trying to avoid Channels since I am not very familiar with them. Can you provide a code sample without Channels (if possible). I am also not sure how I can share maximum tasks/semaphore between them so that I can share this throttling behavior among all the related methods. – Grimson Jan 23 '23 at 18:00

2 Answers2

1

The enumeration of the source IAsyncEnumerable<T> enumerable is driven by the enumeration of the resulting AsyncEnumerable<TOut>. When the consumer of the resulting sequence requests the first TOut element of the sequence, at that point a T value will be requested from the source IAsyncEnumerable<T> enumerable. Then the value will be projected to a Task<TOut>, then this task will be awaited, and finally the result of the task will returned to the consumer. Everything happens sequentially. There is no concurrency. There is no internal activity before the consumer asks for an element, and after the element has been delivered to the consumer.

Adding concurrency to a LINQ operator is much more involved than it might appear from the first glance. It means that when the consumer asks for the first element, 10 tasks must start at once. And when any of these tasks completes, another task must start automatically in its place, without the consumer asking for it. And there must be a limit to how many tasks can be stored internally, that have not yet been requested by the consumer. And no more tasks should be started when this limit is reached, until the consumer takes one and creates an empty slot. And you must think what to do with the internal mechanism that actively starts the tasks and watches for their completion, in case the consumer decides that it had enough, and won't request any more elements (by exiting the consuming loop). And you must also think what to do with the stored tasks, in case the one that is about to be delivered to the consumer has failed. And what if more than one tasks have failed? And what to do in case the enumeration is canceled with a CancellationToken?

Doing all these correctly using only primitive tools like TaskCompletionSources and SemaphoreSlims, without using higher-level tools like the Channel<T>, is extremely difficult. If you are not familiar with the Channel<T>, my advice is to spend some time and familiarize yourself with it. It's a quite simple mechanism. If you know anything about the BlockingCollection<T> class, the Channel<T> is an asynchronous version of it.

In another question I have posted an AwaitResults method, that could be used to implement the SelectParallelAsync operator quite easily:

private IAsyncEnumerable<TOut> SelectParallelAsync<T, TOut>(
    this IAsyncEnumerable<T> enumerable, Func<T, Task<TOut>> predicate)
{
    return enumerable
        .Select(item => predicate(item))
        .AwaitResults(maxConcurrency: 10);
}

You could study that implementation, and change it to fit your needs.

Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
0

For the sake of completion, I'd also like to add why the code in the question wasn't working and what I did to fix it.

The original created differed execution and when we did an await foreach, it would wait for each of the item in the enumerable to be completed before it can operate upon them (in this case, just return). As a result, the tasks were just getting fired one by one instead of being fired simultaneously. Another issue was my lack of understanding of how the semaphore works. I should have been using SemaphoreSlim(10) or something like that for multiple "threads" to access that code block at once.

To fix this, we need to start all the tasks at once and replace the semaphore piece. Adding a ToListAsync pretty much solves the first issue. It simply starts all the tasks at once. The final code which I went with looks something like this

public static async IAsyncEnumerable<TOut> SelectParallelAsync<TIn, TOut>(
this IAsyncEnumerable<TIn> enumerable, ParallelAsyncOptions options, Func<TIn, ValueTask<TOut>> predicate)
{
    var sem = new SemaphoreSlim(options.MaxDegreeOfParallelism, options.MaxDegreeOfParallelism);
    var retVal = await enumerable.Select(item => {
        return Task.Run(async () => {
            await sem.WaitAsync();
            var retVal = await predicate(item);
            sem.Release();

            return retVal;
        });
    }).ToListAsync();

    foreach (var item in retVal)
        yield return await item;    
}

I'd also like to add that the the other solution using channel works pretty well as well. A similar solution can however be achieved using a ConcurrentQueue using a "Complete" variable to correctly handle when the loop exits. The solution I proposed doesn't require the completion flag because that's inherently a part of IEnumerable or IAsyncEnumerable. Simply exiting the enumeration block completes the enumeration as well.

EDIT : Had a conversation (in comments) with the author of the other answer and he pointed out faults in this implementation. His implementation supports true asynchronous enumerable way better than mine does.

Grimson
  • 540
  • 1
  • 5
  • 21
  • There are lots of rough corners in your implementation, with most prominent being that no `TOut` value is going to be emitted before completing the enumeration of the asynchronous `enumerable` sequence. Asynchronous sequences typically don't emit their elements instantaneously. The [`MoveNextAsync`](https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerator-1.movenextasync) might take some time to complete, and this latency will add up because all the `TIn` elements must be emitted before the `ToListAsync` completes. And this is just the tip of the iceberg. – Theodor Zoulias May 21 '23 at 22:05
  • I am actually using this setup (with CancellationToken support and delayed task creation) right now with its WhereParallelAsync cousin and the output in the UI has healthy distribution of Selects, Wheres and the final enumeration. The enumeration also starts as soon as I start debugging and I can see the output on console pretty much straight away. Final output takes some time but its way before the enumeration completes. So I'm not sure if I understood you correctly. This is the repo for reference https://github.com/AnmolSinha1201/ParallelAsyncEnumerable – Grimson May 22 '23 at 06:49
  • From [what I see](https://github.com/AnmolSinha1201/ParallelAsyncEnumerable/blob/main/Source/Program.cs) you are testing solely with an `IAsyncEnumerable` that is actually synchronous: `Enumerable.Range(0, 100).ToAsyncEnumerable()`, which hides the shortcomings of your implementation. I could post demos that show each and every rough point, including a race condition that could cause a deadlock (because of lack of `try`-`finally` around the `await predicate`), but I don't think that it's needed. The lack of testing should make it obvious to anyone that this is not a mature implementation. – Theodor Zoulias May 22 '23 at 09:58
  • Hey, if you can post some examples, I'd really appreciate it. So far I just tested it with `IAsyncEnumerable` because it was convenient. I also tested it with another `IAsyncEnumerable` which is not-so-synchronous or bound but returns some number every 200ms but I'd say it behaved as expected. But I think you're right, I need to add some `try`-`finally` blocks around to make it more robust. Can you provide more examples please? I am actually serious about making it a robust thing. – Grimson May 22 '23 at 21:15
  • It's far from trivial to make it robust and well behaved, especially if you don't want to use a `Channel`. When you test your implementation with a truly asynchronous `IAsyncEnumerable` source sequence, didn't you notice that the resulting `IAsyncEnumerable` does not emit the first `TOut` before the source has completed emitting all of its elements? For example if the source `IAsyncEnumerable` contains 100 elements, and emits them with a 200 msec interval, then for the first 20,000 msec (100x200) the resulting `IAsyncEnumerable` will emit nothing. – Theodor Zoulias May 23 '23 at 01:13
  • I use `SelectAwait` to create this method which basically returns a few integers at a time. Something like `if (i % 20 == 0) await Task.Delay(1000)` works well. Since I don't really have a truly asynchronous source, this is probably an ideal way to emulate it. Anyway, I can see the numbers being emitted before the enumeration is complete. On console, I can see about 16 numbers emitted, then a few in select block, then a few getting emitted again, then some in where block then some in select block and eventually the actual enumeration outside API start appearing while API produces output – Grimson May 23 '23 at 04:33
  • I can be wrong here, but from my understanding, the whole point of using `Channel` in your implementation is to start the tasks using semaphore and then put them into `Channel` so you can consume them later in a controlled way. I achieve the same thing by starting the tasks with `ToListAsync` and then relying on `MoveNextAsync` (implicit) to exit the loop. To me, both seem equivalent and I can't see what extra scenario your implementation is covering and mine isn't. Yours is of course, more robust due to `try`-`catch`-`finally` blocks.. – Grimson May 23 '23 at 04:42
  • OK, [here](https://dotnetfiddle.net/vTbz25) is an online demo. You can see that the first `Result: #X` log entry comes after the log entry `Produce finished`. This is unlikely to be the behavior that you want. The desirable behavior is to start emitting the results as soon as they are available, not when the `source` sequence completes. – Theodor Zoulias May 23 '23 at 18:58
  • Okay, I see your point. Your implementation works as expected in such scenario. However, can you explain me this - when I chain multiple of these enumerators together like `SelectParallelAsync` and `WhereParallelAsync`, I see an ouput like this https://pastebin.com/qXXJyYRu . Can you help me understand why the final enumerator has started producing numbers before the `where`s have completed their enumeration? Can you explain the behavior to me? – Grimson May 24 '23 at 03:18
  • I am not really sure because you have linked to the output of your code, but not to the code itself, so I can only imagine what your code looks like. I suggest to include time information in the logging, so that you can see at what time each thing happened. It is possible that too many things are happening practically at the same time, and the order of the logs is dominated by non-deterministic factors like the switching of the threads by the OS. – Theodor Zoulias May 24 '23 at 03:53
  • The output belongs to this code https://pastebin.com/KcKAcCFQ . I am not sure why I am seeing the generation and first select together and then later where and final enumeration together. I also tried adding other blocks in between and apparently the only thing that doesn't go together is the initial generation and the first enumeration block. I'm just extra confused at this point. – Grimson May 24 '23 at 04:58
  • Could you replace the `Console.WriteLine` with the `Print` method from [my demo](https://dotnetfiddle.net/vTbz25), and then update the [pastebin](https://pastebin.com/qXXJyYRu) with the new output? Without timing information it's hard to guess what's going on. – Theodor Zoulias May 24 '23 at 06:31
  • This is the output with `Print` https://pastebin.com/7ZkKNFiE . The code is still the same. – Grimson May 24 '23 at 20:49
  • I am seeing that the interleaving of the logs from the `Where` and the final enumeration happens inside a minuscule timespan of 2 milliseconds (23:47:45.508 - 23:47:45.510). This can be explained by thread-switching by the OS. The OS can suspend any thread at any time for a few milliseconds. This phenomenon would worth further investigation if the interleaving was spread in a wider timespan. – Theodor Zoulias May 24 '23 at 21:47