
I have a method that accepts an IAsyncEnumerable as an argument and also returns an IAsyncEnumerable. It calls a web method for each item in the input stream and propagates the result to the output stream. My question is: how can I be notified when the caller of my method has stopped enumerating the output stream, so that I can stop enumerating the input stream inside my method? It seems that this should be possible, because the caller disposes by default the IAsyncEnumerator that it gets from my method. Is there any built-in mechanism that generates such a notification for compiler-generated async methods? If not, what is the easiest alternative to implement?

Example. The web method checks whether a url is valid. There is a never-ending stream of urls, but the caller stops enumerating the results when more than 2 invalid urls are found:

var invalidCount = 0;
await foreach (var result in ValidateUrls(GetMockUrls()))
{
    Console.WriteLine($"Url {result.Url} is "
        + (result.IsValid ? "OK" : "Invalid!"));
    if (!result.IsValid) invalidCount++;
    if (invalidCount > 2) break;
}
Console.WriteLine($"--Async enumeration finished--");
await Task.Delay(2000);

The generator of the urls. One url is generated every 300 msec.

private static async IAsyncEnumerable<string> GetMockUrls()
{
    int index = 0;
    while (true)
    {
        await Task.Delay(300);
        yield return $"https://mock.com/{++index:0000}";
    }
}

The validator of the urls. There is a requirement that the input stream is enumerated eagerly, so two asynchronous workflows are running in parallel. The first workflow inserts the urls in a queue, and the second workflow picks the urls one by one and validates them. A BufferBlock is used as async queue.

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
    _ = Task.Run(async () =>
    {
        await foreach (var url in urls)
        {
            Console.WriteLine($"Url {url} received");
            await buffer.SendAsync(url);
        }
        buffer.Complete();
    });

    while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
    {
        yield return (url, await MockValidateUrl(url));
    }
}

Clarification: the queue is mandatory, and removing it is not an option. It is an essential component of this problem.

The validator of a single url. The validation process lasts 300 msec on average.

private static Random _random = new Random();
private static async Task<bool> MockValidateUrl(string url)
{
    await Task.Delay(_random.Next(100, 600));
    return _random.Next(0, 2) != 0;
}

Output:

Url https://mock.com/0001 received
Url https://mock.com/0001 is Invalid!
Url https://mock.com/0002 received
Url https://mock.com/0003 received
Url https://mock.com/0002 is OK
Url https://mock.com/0004 received
Url https://mock.com/0003 is Invalid!
Url https://mock.com/0005 received
Url https://mock.com/0004 is OK
Url https://mock.com/0005 is OK
Url https://mock.com/0006 received
Url https://mock.com/0006 is Invalid!
--Async enumeration finished--
Url https://mock.com/0007 received
Url https://mock.com/0008 received
Url https://mock.com/0009 received
Url https://mock.com/0010 received
Url https://mock.com/0011 received
Url https://mock.com/0012 received
...

The problem is that urls are still generated and received after the caller/client has finished the asynchronous enumeration. I would like to fix this, so that no more messages appear in the console after --Async enumeration finished--.

Theodor Zoulias
  • Could you use a `CancellationToken` shared between both enumerables to exit, instead of using a manual `break` statement? – Bradley Uffner Oct 03 '19 at 11:17
  • @BradleyUffner Yeap, this is definitely doable. But I don't really want to transfer this responsibility to the caller. The only responsibility the caller should have is to dispose the `IAsyncEnumerator`. – Theodor Zoulias Oct 03 '19 at 11:29
  • It seems that the caller *already* has responsibility by having to manually `break` out of the loop. – Bradley Uffner Oct 03 '19 at 11:52
  • @BradleyUffner breaking the loop is business logic. Cancelling a cancellation token on top of that is a burden, that leaks the abstraction of the `await foreach`. It's like driving a car with two brake pedals, and having the responsibility to press the one after the other (if you forget to press the second pedal the brake fluid will boil). – Theodor Zoulias Oct 03 '19 at 12:28
  • Looks more like you have a truck following another truck. The truck in the front is throwing stuff to the truck following it and you want to stop the front truck by applying the brakes on the back truck. – Paulo Morgado Oct 03 '19 at 12:49
  • Your generator *won't* produce anything unless the client asks for it. This is still an iterator and each `yield` proceeds only if the output is actually consumed. – Panagiotis Kanavos Oct 03 '19 at 14:03
  • As for the `BufferBlock`, that's trying to emulate a Channel. The `Task.Run` isn't really needed as the only *non* async operation is `Console.WriteLine($"Url {url} received");`. First, do you *need* a buffer there? Without it, a simple `await foreach(var url in urls){ yield return MockValidateUrl(url);}` would work just fine. With *any* kind of buffer you end up having to wait for the buffer to fill – Panagiotis Kanavos Oct 03 '19 at 14:06
  • @PauloMorgado I wish it was as simple as joining the trucks together. I am fighting all day with this problem, and still can't find a solution. – Theodor Zoulias Oct 03 '19 at 14:29
  • @TheodorZoulias the problem is *caused* by the buffer. If it had eg a boundary, the worker task would stop once it was full. Is there a reason for the buffer block ? Without it you could use a simple iterator method. A cancellation token is required in any case - even if the worker stops, it won't complete. – Panagiotis Kanavos Oct 03 '19 at 14:41
  • @PanagiotisKanavos you are right that the `Task.Run` is kind of redundant here. Maybe I over-simplified the example. My actual problem is more complex though, and I can't get away without the buffer. I am still searching for a way to pass a notification from [DisposeAsync](https://learn.microsoft.com/en-us/dotnet/api/system.iasyncdisposable.disposeasync), so that I can cancel the buffer-feeding task without resorting to adding burden to the caller. – Theodor Zoulias Oct 03 '19 at 14:44
  • @TheodorZoulias *why* do you need the buffer, especially unbounded? The size of the buffer determines how long the worker will keep pumping items. There's no DisposeAsync in your code. In any case what is the *actual* problem? I suspect you could pass ChannelReaders around instead of IAsyncEnumerables. Or you could signal the "global" CancellationToken for the entire pipeline as a continuation of `DisposeAsync` or any task that completes when the work completes – Panagiotis Kanavos Oct 03 '19 at 14:47
  • @TheodorZoulias in any case, it's the *caller* that decided to end the iteration prematurely. It's the caller's job to tell the upstream tasks they need to end and release their resources. Another option would be to add a timeout that cancels the worker even if the caller "forgets" to cancel – Panagiotis Kanavos Oct 03 '19 at 14:50
  • @PanagiotisKanavos There is always an implicit `IAsyncEnumerator.DisposeAsync` after each `await foreach`. The enumerator is disposed automatically by the infrastructure, and I am trying to take advantage of that. The reason I need the buffer is because instead of the urls I actually buffer the tasks that validate the urls, and propagate the results as they become available. The urls are not propagated in the same order they came in. Surely there are other ways to do this, but I am trying to familiarize myself with the new asynchronous enumeration capabilities of C# 8. – Theodor Zoulias Oct 03 '19 at 15:11
  • @TheodorZoulias that changes the problem a *lot*. You should explain all this in the question – Panagiotis Kanavos Oct 03 '19 at 15:17
  • @PanagiotisKanavos I tried to keep it simple. If there is a solution for the simplified example, the same solution should work for the complex problem too. My objective is to pass the notification from the async disposal into the body of the asynchronous iterator. Maybe the solution involves the [`WithCancellation`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskasyncenumerableextensions.withcancellation) method and the [`EnumeratorCancellation`](https://docs.microsoft.com/en-us/dotnet/api/system.runtime.compilerservices.enumeratorcancellationattribute) attribute. – Theodor Zoulias Oct 03 '19 at 15:34
  • @TheodorZoulias they are *different* problems. In any case, to make multiple concurrent calls you'd need to start multiple concurrent tasks. I'm not sure you can consume IAsyncEnumerable from multiple tasks. That's a fan-out/fan-in problem (I'm stealing the term from Go). Cancellation is a *different* problem. There are ways to combine those but no magic. – Panagiotis Kanavos Oct 03 '19 at 15:41
  • @PanagiotisKanavos we have concluded from a [previous question](https://stackoverflow.com/questions/58194212/how-to-implement-an-efficient-wheneach-that-streams-an-iasyncenumerable-of-task) that it is doable to produce a single `IAsyncEnumerable` that is driven by multiple concurrent tasks. This is a solved problem now. – Theodor Zoulias Oct 03 '19 at 15:52
  • You want to *consume* it from multiple tasks. That's a different problem. In any case, you *can't* signal anything to previous IAsyncEnumerable instances, much less the worker task behind the buffer one or two calls away. You have to use a CancellationToken. If you want to ensure people use it correctly you can create an extension method that takes the IAsyncEnumerable and the CTS, applies an Action to each item and signals the CTS at the end. The `LastStep` method can be converted to a generic method easily – Panagiotis Kanavos Oct 03 '19 at 16:02
  • @PanagiotisKanavos I have already managed to produce a working solution, that indeed makes use of a `CancellationToken`. It is a very ugly solution though. It involves duplicating the `ValidateUrls` iterator method, one private with token and one public without token, then enumerating the first from inside the second, after wrapping it into an `AsyncEnumerableWrapper` that adds a hook at `DisposeAsync` that cancels the token. If this is the only solution then there is no solution, because I can't live with that. – Theodor Zoulias Oct 03 '19 at 16:11

2 Answers

Edit

The discussion will be easier with an appropriate example. Validating URLs isn't so expensive. What if you need to hit eg 100 URLs and pick the first 3 responses?

In that case both the worker and the buffer make sense.

Edit 2

One of the comments adds extra complexity - the tasks are executed concurrently and the results need to be emitted as they arrive.


For starters, ValidateUrls could be rewritten as a plain iterator method, without the buffer and the worker task:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    await foreach (var url in urls)
    {
        Console.WriteLine($"Url {url} received");
        var isValid=await MockValidateUrl(url);
        yield return (url, isValid);
    }
}

There's no need for a worker Task as all methods are asynchronous. The iterator method won't proceed unless a consumer asks for a result. Even if MockValidateUrl does something expensive, it could use a Task.Run itself or get wrapped in a Task.Run. That would generate quite a few tasks though.

For completeness' sake you can add a CancellationToken and ConfigureAwait(false) :

public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
       IAsyncEnumerable<string> urls, 
       [EnumeratorCancellation]CancellationToken token=default)
{
    await foreach(var url in urls.WithCancellation(token).ConfigureAwait(false))
    {
        var isValid=await MockValidateUrl(url).ConfigureAwait(false);
        yield return (url,isValid);
    }
}

In any case, as soon as the caller stops iterating, ValidateUrls will stop.

Buffering

Buffering is a problem - no matter how it's programmed, the worker won't stop until the buffer fills. The buffer's size is how many iterations the worker will go on before it realizes it needs to stop. This is a great case for a Channel (yes, again!) :

public static IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
        IAsyncEnumerable<string> urls,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<(string Url, bool IsValid)>(2);
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
                await foreach(var url in urls.WithCancellation(token))
                {
                    var isValid=await MockValidateUrl(url);
                    //Pass the token so a blocked write on a full channel can be cancelled
                    await writer.WriteAsync((url,isValid),token);
                }
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        
    return channel.Reader.ReadAllAsync(token);
}

It's better to pass around ChannelReaders instead of IAsyncEnumerables though. At the very least, no async enumerator is constructed until someone tries to read from the ChannelReader. It's also easier to construct pipelines as extension methods :

public static ChannelReader<(string Url, bool IsValid)> ValidateUrls(
        this ChannelReader<string> urls,int capacity,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<(string Url, bool IsValid)>(capacity);
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
                await foreach(var url in urls.ReadAllAsync(token))
                {
                    var isValid=await MockValidateUrl(url);
                    //Pass the token so a blocked write on a full channel can be cancelled
                    await writer.WriteAsync((url,isValid),token);
                }
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        
    return channel.Reader;
}

This syntax allows constructing pipelines in a fluent manner. Let's say we have this helper method to convert IEnumerables to channels (or IAsyncEnumerables) :

public static ChannelReader<T> AsChannel<T>(
         this IEnumerable<T> items)
{
    var channel=Channel.CreateUnbounded<T>();        
    var writer=channel.Writer;
    foreach(var item in items)
    {
        writer.TryWrite(item);
    }
    //Mark the channel as complete so readers stop once all items are consumed
    writer.Complete();
    return channel.Reader;
}

We can write :

var pipeline=urlList.AsChannel()     //takes a list and writes it to a channel
                    .ValidateUrls(capacity: 2);

await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
   //Use the items here
}

Concurrent calls with immediate propagation

That's easy with channels, although this time the worker needs to fire all of the tasks at once. Essentially, we need multiple workers. That's not something that can be done with just IAsyncEnumerable.

First of all, if we wanted to use eg 5 concurrent tasks to process the inputs we could write

    var tasks = Enumerable.Range(0,5)
                  .Select(_ => Task.Run(async ()=>{
                                 /// 
                             },token));
    _ = Task.WhenAll(tasks).ContinueWith(t=>writer.Complete(t.Exception));        

instead of :

    _ = Task.Run(async ()=>{
        /// 
        },token)
        .ContinueWith(t=>writer.Complete(t.Exception));        

Using a large number of workers could be enough. I'm not sure if IAsyncEnumerable can be consumed by multiple workers, and I don't really want to find out.

Premature Cancellation

All of the above works if the client consumes all results. To stop processing after eg the first 3 results though, we need the CancellationToken :

var cts=new CancellationTokenSource();

var pipeline=urlList.AsChannel()     //takes a list and writes it to a channel
                    .ValidateUrls(capacity: 2, token: cts.Token);

int i=0;
await foreach(var (url,isValid) in pipeline.ReadAllAsync())
{
    //Break after 3 iterations
    if(i++>2)
    {
        break;
    }
    ....
}

cts.Cancel();

This code itself could be extracted in a method that receives a ChannelReader and, in this case, the CancellationTokenSource :

static async Task LastStep(this ChannelReader<(string Url, bool IsValid)> input,CancellationTokenSource cts)
{
    int i=0;
    await foreach(var (url,isValid) in input.ReadAllAsync())
    {
        //Break after 3 iterations
        if(i++>2)
        {
            break;
        }
        ....
    }

    cts.Cancel();        
}

And the pipeline becomes :

var cts=new CancellationTokenSource();

//LastStep returns a Task, so the whole pipeline is awaited
await urlList.AsChannel()     
             .ValidateUrls(capacity: 2, token: cts.Token)
             .LastStep(cts);
Panagiotis Kanavos
  • Thanks Panagiotis for the answer. It is very interesting as always. All this stuff about chaining channels is fascinating! My question though has to do specifically with `await foreach` and the `IAsyncEnumerable`. It's not about a problem searching for any solution. It is about learning the capabilities (and the limits) of the new language feature of asynchronous enumeration in C# 8. – Theodor Zoulias Oct 03 '19 at 16:33
  • About the line `await foreach(var url in urls.WithCancellation(token))`, it's not doing what I initially thought it would do. As is now, cancelling the token will have no effect. For the enumeration to be cancelled it's required that the method `GetMockUrls` that produced the `urls` accepts a `CancellationToken` argument decorated with the `EnumeratorCancellation`, **and** uses internally this token to break the loop. In other words the existing cancellation pattern is cooperative. You can't cancel the enumeration of an async enumerable that was not designed to be cancelled at the first place! – Theodor Zoulias Oct 03 '19 at 16:43
  • I took the time to read carefully this answer, and played a bit with the code examples. I do agree that the `ChannelReader` is superior to the `IAsyncEnumerable` at its ability to be enumerated concurrently by multiple consumers. The `IAsyncEnumerator` is not thread-safe. On all other aspects it seems to me that the `IAsyncEnumerable` is superior. 1) It's much more fluent and elegant to produce and consume because of the language support. 2) Ensures the disposal of resources by invoking automatically the finally blocks. No cooperative cancellation patterns like `LastStep` are required. – Theodor Zoulias Oct 12 '19 at 22:59

I suppose I should answer my own question, since I now have a simple enough generic solution.

Update: I am scrapping my previous answer because I discovered a much easier solution. It is embarrassingly simple actually. All I have to do is to enclose the yielding part of the ValidateUrls iterator in a try-finally block. The finally block will be executed in every case, whether the caller completes the enumeration normally, or abnormally by a break or an exception. So this is how I can get the notification I am looking for, by cancelling a CancellationTokenSource in the finally block:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
    var completionCTS = new CancellationTokenSource();
    _ = Task.Run(async () =>
    {
        await foreach (var url in urls)
        {
            if (completionCTS.IsCancellationRequested) break;
            Console.WriteLine($"Url {url} received");
            await buffer.SendAsync(url);
        }
        buffer.Complete();
    });

    try
    {
        while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
        {
            yield return (url, await MockValidateUrl(url));
        }
    }
    finally // This runs when the caller completes the enumeration
    {
        completionCTS.Cancel();
    }
}
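
The mechanism can be seen in isolation with a minimal sketch. The `Numbers` iterator and the `FinallyOnBreakDemo` class below are hypothetical names made up for the illustration; they are not part of the code above. The sketch only shows that breaking out of an `await foreach` disposes the enumerator, and that disposing a compiler-generated async iterator runs its pending finally blocks:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class FinallyOnBreakDemo
{
    // Hypothetical never-ending iterator, used only to illustrate disposal.
    private static async IAsyncEnumerable<int> Numbers()
    {
        try
        {
            for (int i = 1; ; i++)
            {
                await Task.Delay(10);
                yield return i;
            }
        }
        finally
        {
            // Runs when the enumerator is disposed, including after a break.
            Console.WriteLine("finally executed");
        }
    }

    public static async Task Main()
    {
        await foreach (var n in Numbers())
        {
            Console.WriteLine(n);
            if (n == 3) break; // await foreach calls DisposeAsync on exit
        }
        Console.WriteLine("after loop");
    }
}
```

This prints 1, 2, 3, then "finally executed", then "after loop". The finally block runs before the statement that follows the loop, because `await foreach` awaits `DisposeAsync` before continuing.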

I should probably note that an async iterator that doesn't support cancellation is not good practice. Without it the caller has no easy way to interrupt the waiting between the consumption of one value and the next. So a better signature for my method would be:

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls,
    [EnumeratorCancellation]CancellationToken cancellationToken = default)
{

The token could then be passed to the awaited methods of the yielding loop, the OutputAvailableAsync and the MockValidateUrl.
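
Putting the two ideas together could look roughly like the sketch below. It is only a sketch under some assumptions: the `UrlValidation` class name and the token-accepting overload of `MockValidateUrl` are hypothetical (the original `MockValidateUrl` takes no token), the Dataflow types come from the System.Threading.Tasks.Dataflow package, and exceptions thrown by the feeding task other than cancellation are not observed here. A linked CancellationTokenSource lets either the caller's token or the finally block stop the feeding task:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // System.Threading.Tasks.Dataflow package

public static class UrlValidation
{
    public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
        this IAsyncEnumerable<string> urls,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var buffer = new BufferBlock<string>();
        // Cancelled either by the caller's token or by the finally block below.
        using var linkedCts =
            CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var token = linkedCts.Token;

        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var url in urls.WithCancellation(token))
                {
                    // SendAsync throws once the token is cancelled.
                    if (!await buffer.SendAsync(url, token)) break;
                }
            }
            catch (OperationCanceledException) { } // expected when stopping
            finally { buffer.Complete(); }
        });

        try
        {
            while (await buffer.OutputAvailableAsync(token)
                && buffer.TryReceive(out var url))
            {
                yield return (url, await MockValidateUrl(url, token));
            }
        }
        finally // Runs when the caller completes or abandons the enumeration
        {
            linkedCts.Cancel();
        }
    }

    // Hypothetical token-aware variant of the question's MockValidateUrl.
    private static readonly Random _random = new Random();
    private static async Task<bool> MockValidateUrl(
        string url, CancellationToken token)
    {
        await Task.Delay(_random.Next(100, 600), token);
        return _random.Next(0, 2) != 0;
    }
}
```

Note that a source like GetMockUrls, which does not accept a token itself, is still only interrupted between items: the feeding task notices the cancellation when the next url arrives and SendAsync throws.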

From the caller's perspective, the token can be passed either directly, or by chaining the extension method WithCancellation.

await foreach (var result in ValidateUrls(GetMockUrls()).WithCancellation(token))
Theodor Zoulias