3

I have the following code:

var things = await GetDataFromApi(cancellationToken);

var builder = new StringBuilder(JsonSerializer.Serialize(things));

await things
    .GroupBy(x => x.Category)
    .ToAsyncEnumerable()
    .SelectManyAwaitWithCancellation(async (category, ct) =>
    {
        var thingsWithColors = await _colorsApiClient.GetColorsFor(category.Select(thing => thing.Name).ToList(), ct);
        
        return category
            .Select(thing => ChooseBestColor(thingsWithColors))
            .ToAsyncEnumerable();
    })
    .ForEachAsync(thingAndColor =>
    {
        Console.WriteLine(Thread.CurrentThread.ManagedThreadId); // prints different IDs
        builder.Replace(thingAndColor.Thing, $"{thingAndColor.Color} {thingAndColor.Thing}");
    }, cancellationToken);

It uses System.Linq.Async and I find it difficult to understand. In "classic"/synchronous LINQ, the whole thing would get executed only when I call ToList() or ToArray() on it. In the example above, there is no such call, but the lambdas get executed anyway. How does it work?

The other concern I have is about multi-threading. I have heard many times that async != multithreading. How, then, is it possible that Console.WriteLine(Thread.CurrentThread.ManagedThreadId); prints various IDs? Some of the IDs get printed multiple times, but overall there are about 5 thread IDs in the output. None of my code creates any threads explicitly. It's all async-await. StringBuilder is not thread-safe, and I'd like to understand whether the implementation above is valid.

Please ignore the algorithm of my code; it does not really matter, it's just an example. What matters is the usage of System.Linq.Async.

mnj
  • In an async method, code after the first await *may* run on a different thread. – Klaus Gütter Aug 26 '22 at 06:42
  • Therefore my code is not safe, because `StringBuilder` is not multi-threaded? – mnj Aug 26 '22 at 06:45
  • Or maybe, even though there are different threads, they are never parallel, and in such case `StringBuilder` will be OK? – mnj Aug 26 '22 at 06:46
  • @mnj what are you really trying to do? This code is a "bit" convoluted and certainly unsafe - not only isn't `StringBuilder` thread-safe, the final step executes string replacement in essentially random order. If you want to create a processing pipeline the output should come from the last block/operation in the pipeline, not by modifying global state – Panagiotis Kanavos Aug 26 '22 at 07:09
  • In any case, Async LINQ is only a set of operations over async streams, *not* a library for constructing processing pipelines. There's no way to specify concurrency or buffering just with async streams. Only `Parallel.ForEachAsync` allows you to specify concurrency and that doesn't produce output. Creating pipelines is the job of the `System.Threading.Tasks.Dataflow` namespace or lower-level code combined with Channels for buffering. That lower-level code can include async streams and Async LINQ in several cases – Panagiotis Kanavos Aug 26 '22 at 07:22
  • @PanagiotisKanavos Yes, I know that the code is weird, I'm really just exploring System.Linq.Async. Thanks for your input. – mnj Aug 26 '22 at 07:32
  • @mnj you still didn't explain what you want to do though. It's easy to create a pipeline of Dataflow blocks that get a list of items from one API call, execute one API call per item *concurrently*, combine the results and eg write everything to a file. You can do the same using functions that receive/return Channels or IAsyncEnumerable but internally use `Parallel.ForEachAsync` to make the calls in parallel. LINQ Async can be used to filter or transform but can't be used to actually do stuff concurrently – Panagiotis Kanavos Aug 26 '22 at 07:39
  • @mnj in fact, LINQ Async can cause problems because some operations consume the entire stream before producing output, eg `GroupBy`. That's probably *not* what you want to do if you want to process a large file, calling different APIs per category – Panagiotis Kanavos Aug 26 '22 at 07:40
  • Related: [async/await different thread ID](https://stackoverflow.com/questions/33821679/async-await-different-thread-id) – Theodor Zoulias Aug 26 '22 at 08:57
  • Also related: [Are you there, asynchronously written value?](https://stackoverflow.com/questions/28871878/are-you-there-asynchronously-written-value) Quoting from the accepted answer: *"Yes, TPL includes the appropriate barriers when tasks are queued and at the beginning/end of task execution so that values are appropriately made visible."* – Theodor Zoulias Aug 26 '22 at 09:00

2 Answers

5

ForEachAsync has a similar effect to ToList/ToArray: it enumerates the entire sequence, and that enumeration is what runs the lambdas.
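A minimal sketch of that deferred behaviour, assuming the System.Linq.Async NuGet package (6.x) is referenced. The class name `DeferredDemo` and the log strings are hypothetical and only serve the illustration. Building the query records nothing; the selector only runs once ForEachAsync starts pulling items.

```csharp
// Assumes the System.Linq.Async NuGet package (6.x) is referenced.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DeferredDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // Building the query does not invoke the selector.
        IAsyncEnumerable<int> query = new[] { 1, 2, 3 }
            .ToAsyncEnumerable()
            .SelectAwait(async x =>
            {
                await Task.Yield();          // stands in for real asynchronous work
                log.Add($"selector {x}");
                return x * 10;
            });

        log.Add("query built");

        // ForEachAsync enumerates the sequence; this is what runs the selector.
        await query.ForEachAsync(x => log.Add($"consumed {x}"));

        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync())
            Console.WriteLine(line);
    }
}
```

The "query built" entry is logged before any "selector" entry, which is the same deferred-execution rule as in synchronous LINQ, with ForEachAsync playing the role of the terminal operator.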

By default, code after an await resumes on the captured context (the current SynchronizationContext, if there is one). If the code was running on the UI thread, it continues on the UI thread. If it was running on a background (thread-pool) thread, it continues on a background thread, but not necessarily the same one.

However, none of your code should run in parallel. That does not by itself make it thread-safe: some memory barriers are probably needed to ensure that writes made on one thread are visible to the next, but I would assume the framework code issues these barriers itself.
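As a rough illustration of "different threads, but never in parallel", here is a sketch that assumes a console application (no SynchronizationContext). The class name `ThreadHopDemo` is hypothetical. Each step may resume on a different thread-pool thread, yet the steps are always recorded in order because each one is awaited before the next begins.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThreadHopDemo
{
    public static async Task<List<(int Step, int ThreadId)>> RunAsync()
    {
        var trace = new List<(int Step, int ThreadId)>();

        for (int step = 0; step < 5; step++)
        {
            // With no SynchronizationContext, the continuation after this await
            // is scheduled on the thread pool, so the thread may change.
            await Task.Delay(10);
            trace.Add((step, Environment.CurrentManagedThreadId));
        }

        return trace;
    }

    public static async Task Main()
    {
        foreach (var (step, threadId) in await RunAsync())
            Console.WriteLine($"step {step} ran on thread {threadId}");
    }
}
```

The thread IDs printed can vary from run to run, which is why no sample output is shown; only the step order is fixed.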

JonasH
3

The System.Linq.Async library, like the whole dotnet/reactive repository, is currently a semi-abandoned project. The issues on GitHub are piling up, and nobody has answered them officially for almost a year. There is no published documentation apart from the XML documentation in the source code on top of each method. You can't really use this library without studying the source code, which is generally easy to do because the code is short, readable, and honestly doesn't do too much. The functionality offered by this library is similar to the functionality found in System.Linq, with the main differences being that the input is IAsyncEnumerable<T> instead of IEnumerable<T>, and that the delegates can return values wrapped in ValueTask<T>s.

With the exception of a few operators like Merge (and only one of its overloads), System.Linq.Async doesn't introduce concurrency. The asynchronous operations are invoked one at a time, and each is awaited before the next one is invoked. The SelectManyAwaitWithCancellation operator is not one of the exceptions. The selector is invoked sequentially for each element, and the resulting IAsyncEnumerable<TResult> is enumerated sequentially, with its values yielded one after the other. So it's unlikely to create thread-safety issues.
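One way to check the sequential behaviour yourself is to count how many selector invocations are in flight at once. The sketch below assumes the System.Linq.Async NuGet package (6.x); the class `SequentialDemo`, its counters, and the `Task.Delay` standing in for an API call are all hypothetical.

```csharp
// Assumes the System.Linq.Async NuGet package (6.x) is referenced.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SequentialDemo
{
    private static readonly object Gate = new object();
    private static int _active;      // selectors currently between Enter and Exit
    private static int _maxActive;   // highest value _active has reached

    private static void Enter()
    {
        lock (Gate)
        {
            _active++;
            if (_active > _maxActive) _maxActive = _active;
        }
    }

    private static void Exit()
    {
        lock (Gate) { _active--; }
    }

    public static async Task<int> RunAsync(CancellationToken ct = default)
    {
        lock (Gate) { _active = 0; _maxActive = 0; }

        await new[] { "a", "b", "c", "d" }
            .ToAsyncEnumerable()
            .SelectManyAwaitWithCancellation(async (item, token) =>
            {
                Enter();
                try
                {
                    await Task.Delay(50, token);   // stands in for an API call
                    return new[] { item + "1", item + "2" }.ToAsyncEnumerable();
                }
                finally
                {
                    Exit();
                }
            })
            .ToListAsync(ct);

        return _maxActive;
    }

    public static async Task Main()
    {
        Console.WriteLine($"max selectors in flight: {await RunAsync()}");
    }
}
```

If the operator started the selectors concurrently, the reported maximum would exceed 1; with sequential invocation it stays at 1.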

The ForEachAsync operator is just a substitute for a standard await foreach loop, and was included in the library at a time when C# language support for await foreach was nonexistent (before C# 8). I would recommend against using this operator, because its resemblance to the new Parallel.ForEachAsync API could create confusion. Here is what is written inside the source code of the ForEachAsync operator:

// REVIEW: Once we have C# 8.0 language support, we may want to do away with these
//         methods. An open question is how to provide support for cancellation,
//         which could be offered through WithCancellation on the source. If we still
//         want to keep these methods, they may be a candidate for
//         System.Interactive.Async if we consider them to be non-standard
//         (i.e. IEnumerable<T> doesn't have a ForEach extension method either).
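
For comparison, this is roughly what the await foreach form looks like, with cancellation supplied through WithCancellation as the comment above suggests. It is a sketch only: `GetItemsAsync` is a hypothetical stand-in for the query in the question, and the item values are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class AwaitForeachDemo
{
    // Hypothetical async stream standing in for the query in the question.
    public static async IAsyncEnumerable<string> GetItemsAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        foreach (var name in new[] { "red apple", "green pear" })
        {
            await Task.Delay(10, ct);
            yield return name;
        }
    }

    public static async Task<string> RunAsync(CancellationToken ct = default)
    {
        var builder = new StringBuilder();

        // Same effect as ForEachAsync(item => ..., ct): one item at a time, in order.
        await foreach (var item in GetItemsAsync().WithCancellation(ct))
        {
            builder.AppendLine(item);
        }

        return builder.ToString();
    }

    public static async Task Main()
    {
        Console.Write(await RunAsync());
    }
}
```

The loop body runs once per item and finishes before the next item is requested, so the StringBuilder is only ever touched by one piece of code at a time.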
Theodor Zoulias