2

I've watched the chat on LINQ with IAsyncEnumerable, which gave me some insight into writing extension methods for IAsyncEnumerable, but frankly it wasn't detailed enough for a real-world application, especially at my experience level. I also understand that samples/documentation don't really exist yet for IAsyncEnumerable.

I'm trying to read from a file, do some transformation on the stream that returns an IAsyncEnumerable, and then send those objects downstream once an arbitrary number of them has been collected, like:

await foreach (var data in ProcessBlob(downloadedFile))
{
    //todo add data to List<T> called listWithPreConfiguredNumberOfElements
    if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
        await _messageHandler.Handle(listWithPreConfiguredNumberOfElements);
        
    //repeat the behaviour till all the elements in the IAsyncEnumerable returned by ProcessBlob are sent downstream to the _messageHandler.
}

My understanding from reading on the matter so far is that the await foreach line works on data that employs Tasks (or ValueTasks), so we don't have a count up front. I'm also hesitant to use a List variable and just do a length check on it, since sharing that data across threads doesn't seem very thread-safe.

I'm using the System.Linq.Async package in the hope that I can use a relevant extension method. TakeWhile shows some promise, but I'm not confident about how thread-safe the operation I intend to perform would be.

Any help or push in the right direction would be massively appreciated, thank you.

razlani
  • `System.Linq.Async` is part of Reactive extensions – Pavel Anikhouski Jul 29 '20 at 09:21
  • 1
    My first Idea would be TPL DataFlow with a [BatchBlock](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-using-batchblock-and-batchedjoinblock-to-improve-efficiency)... – Fildor Jul 29 '20 at 09:23
  • 3
    The samples and docs are there. People think that IAsyncEnumerable is more than it really is though. It's 'just' a way to enumerate asynchronously, not a new way to construct pipelines, or a new way of multi-threading. It's neither a Dataflow block nor a Channel. It can be the glue between steps in a pipeline – Panagiotis Kanavos Jul 29 '20 at 09:24
  • 1
    Whether you can use a List or need a ConcurrentQueue depends on how your processing code works, not the source (IAsyncEnumerable), just as it doesn't depend on `IEnumerable`. If you have multiple tasks consuming from the source, you need `ConcurrentQueue`. If you only have one tasks, you can use a `List`, although that will prevent you from ever using multiple tasks. A batch operation doesn't need multiple tasks though – Panagiotis Kanavos Jul 29 '20 at 09:28
  • 1
  • 1
    I suspect you should clean up your code first, convert it to a form that makes creating pipelines easy. Field-level handlers make things a lot harder. It's far easier to work with LINQ-style methods - methods that accept an `IAsyncEnumerable` as parameter and *return* another one. You could chain multiple methods one after the other to create a pipeline, always knowing what each method does, how it handles concurrency etc. A method `IAsyncEnumerable<List<T>> Batch<T>(this IAsyncEnumerable<T> source, int batchSize)` allows `ProcessBlob(downloadedFile).Batch(100)....` – Panagiotis Kanavos Jul 29 '20 at 09:31
  • Just out of experience: Also mind the situation, where your batch isn't full for an extended amount of time ... – Fildor Jul 29 '20 at 10:53
  • Related: [How to batch an IAsyncEnumerable, enforcing a maximum interval policy between consecutive batches?](https://stackoverflow.com/questions/67661709/how-to-batch-an-iasyncenumerablet-enforcing-a-maximum-interval-policy-between) – Theodor Zoulias May 28 '21 at 09:14
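The `Batch` method suggested in the comments could be sketched like this. The name and signature come from the comment above; the body is an assumption about how such an operator would typically be written as an async iterator:

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;

public static class AsyncEnumerableBatchExtensions
{
    // Groups the source sequence into lists of at most batchSize elements.
    public static async IAsyncEnumerable<List<T>> Batch<T>(
        this IAsyncEnumerable<T> source,
        int batchSize,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var batch = new List<T>(batchSize);
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                // Start a fresh list so the consumer can safely keep the old one.
                batch = new List<T>(batchSize);
            }
        }
        if (batch.Count > 0)
            yield return batch; // emit any trailing partial batch
    }
}
```

With this in place, the pipeline from the comment reads as `ProcessBlob(downloadedFile).Batch(100)` followed by whatever downstream steps you need.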

2 Answers

4

There is an operator Buffer that does what you want, in the package System.Interactive.Async.

// Projects each element of an async-enumerable sequence into consecutive
// non-overlapping buffers which are produced based on element count information.
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);

This package contains operators like Amb, Throw, Catch, Defer, Finally etc. that have no direct equivalent in LINQ, but do have one in System.Reactive. This is because IAsyncEnumerables are conceptually closer to IObservables than to IEnumerables: both have a time dimension, while IEnumerables are timeless.
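A minimal, self-contained sketch of Buffer in action (assumes the System.Interactive.Async package is referenced; AsyncEnumerable.Range comes from System.Linq.Async):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

class BufferDemo
{
    static async Task Main()
    {
        // Buffer(3) over 0..9 should produce consecutive non-overlapping
        // batches: [0,1,2], [3,4,5], [6,7,8], and a final partial [9].
        await foreach (var batch in AsyncEnumerable.Range(0, 10).Buffer(3))
        {
            Console.WriteLine(string.Join(",", batch)); // batch is an IList<int>
        }
    }
}
```

In the question's terms, that would look like `ProcessBlob(downloadedFile).Buffer(preConfiguredNumber)`, with each emitted list handed to `_messageHandler.Handle`.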

Theodor Zoulias
2

I'm also hesitant to use a List variable and just do a length-check on that as sharing that data across threads doesn't seem very thread-safe.

You need to think in terms of execution flows, not threads, when dealing with async; since you are await-ing the processing step, there isn't actually a concurrency problem accessing the list, because regardless of which threads are used, the list is only accessed by one execution flow at a time.

If you are still concerned, you could new a list per batch, but that is probably overkill. What you do need, however, is two additions - a reset between batches, and a final processing step:

var listWithPreConfiguredNumberOfElements = new List<YourType>(preConfiguredNumber);
await foreach (var data in ProcessBlob(downloadedFile)) // CAF?
{
    listWithPreConfiguredNumberOfElements.Add(data);
    if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
    {
        await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
        listWithPreConfiguredNumberOfElements.Clear(); // reset for a new batch
        // (replace this with a "new" if you're still concerned about concurrency)
    }
}
if (listWithPreConfiguredNumberOfElements.Any())
{   // process any stragglers
    await _messageHandler.Handle(listWithPreConfiguredNumberOfElements); // CAF?
}

You might also choose to use ConfigureAwait(false) in the three spots marked // CAF?

Ackdari
Marc Gravell
  • It was the shift in thinking that settled this for me - somewhat of a paradigm shift and was causing confusion. I'm upvoting this as the solution was obvious but the explanation behind it really took it over the line for me, thank you – razlani Jul 31 '20 at 00:50