I have a gRPC client and I want a method that simplifies its use. The method should return an IAsyncEnumerable of the items being streamed from the gRPC server. The streaming must not exceed a configured timeout. If the timeout elapses, I just want to walk away with all the items I managed to fetch so far.

Here's what I tried to do:

    public async IAsyncEnumerable<Item> Search(
        SearchParameters parameters, 
        CancellationToken cancellationToken, 
        IDictionary<string, string> headers = null)
    {
        try
        {
            await _client.Search(
                    MapInput(parameters),
                    cancellationToken: cancellationToken,
                    deadline: DateTime.UtcNow.Add(_configuration.Timeout),
                    headers: MapHeaders(headers))
                .ResponseStream.ForEachAsync(item =>
                {
                    yield return MapSingleItem(item); // compilation error
                });
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.DeadlineExceeded)
        {
            _logger.LogWarning("Stream finished due to timeout, a limited number of items has been returned");
        }
    }

Logically, that should work. However, the yield keyword is not supported within lambdas, so it does not compile. Is there any other way I could write it?

mnj
  • Are you using the [grpc](https://github.com/grpc/grpc/tree/master/src/csharp) library? – Theodor Zoulias Nov 28 '22 at 15:10
  • Yeah, I'm using Grpc.Tools to generate the client. The `_client` variable in my code is the result of Grpc.Tools automatic client generation – mnj Nov 28 '22 at 15:49

2 Answers

You need an intermediate buffer to hold the items, because the consumer of the IAsyncEnumerable<Item> can enumerate it at its own pace. An excellent asynchronous buffer for this purpose is the Channel<T> class.

Another thing that you might want to consider is what happens if the consumer abandons the enumeration of the IAsyncEnumerable<Item> prematurely, either deliberately by breaking or returning, or unwillingly because it suffered an exception. You need to watch for this occurrence, and the best way to do it is to cancel a linked CancellationTokenSource in the finally block of your iterator.

Putting everything together:

    // Needs: using System.Runtime.CompilerServices; (for [EnumeratorCancellation])
    //        using System.Threading.Channels;       (for Channel<T>)
    public async IAsyncEnumerable<Item> Search(
        SearchParameters parameters,
        [EnumeratorCancellation] CancellationToken cancellationToken = default,
        IDictionary<string, string> headers = null)
    {
        Channel<Item> channel = Channel.CreateUnbounded<Item>();
        using CancellationTokenSource linkedCTS = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);

        Task producer = Task.Run(async () =>
        {
            try
            {
                await _client.Search(
                        MapInput(parameters),
                        cancellationToken: linkedCTS.Token,
                        deadline: DateTime.UtcNow.Add(_configuration.Timeout),
                        headers: MapHeaders(headers))
                    .ResponseStream.ForEachAsync(item =>
                    {
                        // Map each streamed message to Item, as in the question
                        channel.Writer.TryWrite(MapSingleItem(item));
                    }).ConfigureAwait(false);
            }
            finally { channel.Writer.Complete(); }
        });

        try
        {
            await foreach (Item item in channel.Reader.ReadAllAsync()
                .ConfigureAwait(false))
            {
                yield return item;
            }
            await producer.ConfigureAwait(false); // Propagate possible error
        }
        finally
        {
            // Prevent fire-and-forget in case the enumeration is abandoned
            if (!producer.IsCompleted)
            {
                linkedCTS.Cancel();
                await Task.WhenAny(producer).ConfigureAwait(false);
            }
        }
    }

Most likely the resulting IAsyncEnumerable<Item> will complete with an OperationCanceledException when the token is canceled. If you prefer your token to have stopping semantics, you should first rename it to stoppingToken, and then handle the OperationCanceledException accordingly inside the producer task.
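
To illustrate what stopping semantics could look like, here is a minimal self-contained sketch. It is not the code from the question: `SlowSource` is a hypothetical stand-in for the gRPC response stream, and `ReadUntilStopped` and `CollectAsync` are names made up for this example. The producer swallows the OperationCanceledException, so the consumer sees a normal end of the sequence instead of an exception.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class StoppingTokenDemo
{
    // Hypothetical source: emits three items, then stays silent until canceled.
    private static async IAsyncEnumerable<int> SlowSource(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        yield return 1;
        yield return 2;
        yield return 3;
        await Task.Delay(Timeout.Infinite, token); // throws when token is canceled
    }

    public static async IAsyncEnumerable<int> ReadUntilStopped(
        CancellationToken stoppingToken)
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();
        using var linkedCts = CancellationTokenSource
            .CreateLinkedTokenSource(stoppingToken);

        Task producer = Task.Run(async () =>
        {
            try
            {
                await foreach (int item in SlowSource(linkedCts.Token))
                    channel.Writer.TryWrite(item);
            }
            catch (OperationCanceledException)
                when (linkedCts.IsCancellationRequested)
            {
                // Stopping semantics: cancellation ends the stream quietly.
            }
            finally { channel.Writer.Complete(); }
        });

        try
        {
            await foreach (int item in channel.Reader.ReadAllAsync())
                yield return item;
            await producer; // propagate any error other than cancellation
        }
        finally
        {
            // Stop the producer if the consumer abandoned the enumeration.
            if (!producer.IsCompleted)
            {
                linkedCts.Cancel();
                await Task.WhenAny(producer);
            }
        }
    }

    // Collects everything that arrives before the stopping token fires.
    public static async Task<List<int>> CollectAsync(TimeSpan stopAfter)
    {
        using var cts = new CancellationTokenSource(stopAfter);
        var items = new List<int>();
        await foreach (int item in ReadUntilStopped(cts.Token))
            items.Add(item);
        return items;
    }
}
```

Calling `StoppingTokenDemo.CollectAsync(TimeSpan.FromMilliseconds(200))` should return the three items without throwing. A deadline could presumably be handled the same way, by catching the deadline exception in the producer instead of (or in addition to) the cancellation exception.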

Theodor Zoulias
  • Seems overly complicated. Rx.net could be done much easier... – Aron Nov 28 '22 at 16:53
  • Thanks @Theodor. One thing that I don’t understand is the last finally block. You said that we should watch out for the case where the consumer prematurely finishes enumeration and that’s why the token needs to be cancelled. However, I don’t understand how it works in this code. How do we discover that the enumeration was finished? I mean, if someone takes just 3 elements from the IAsyncEnumerable, and doesn’t take more, how does the finally block fire? Also, I wonder why you call GetAwaiter().GetResult() on the producer? – mnj Nov 28 '22 at 17:05
  • @mnj at the consuming site, when a consumer does an `await foreach` on our sequence, the C# compiler generates an invisible `try`/`finally` block that disposes our enumerator in the `finally`. At the producing site, the C# compiler converts our iterator into an invisible class that is both an `IAsyncEnumerable` and an `IAsyncEnumerator`, and puts the code in our `finally` block inside the `DisposeAsync` method of that class. The `.GetAwaiter().GetResult()` was wrong, I changed it to `await`. The intention is to force the consumer to wait until our `producer` task is completed. – Theodor Zoulias Nov 28 '22 at 17:17
  • @mnj you can see what the compiler generates in [this](https://sharplab.io/#v2:EYLgxg9gTgpgtADwGwBYA+ABATARgLABQGADAAQY4oDchJ5OArDQbQMzlakCyAngMIAbAIYBnEaQDehUjNLTZGdhgAcpAJIBBETwB2YAKI6ArgFsYUIcAEwAPAEsdAFwB8pAMowhUMAAsAFACU8jJSBLLhpI5QPMERoREJ5ACc5EgAdHwQJgAO1o4wACYYSMyJERQ45ADspDiliQC+seEAZg5CAgIxYYnxZQo4SX4ARBgYbTodXcMB9QlNPTILsYrkqsXcPFwwjj4QBYGxfQkYKS3Qnr6kfgBuXqR2+SYPOu6e3v4BQYtxzYkUQ0eMBMsz+S1iCwaQA=) SharpLab demo. The code is quite intimidating because the compiler is allowed to use illegal characters in the identifiers, but it should give you an idea. – Theodor Zoulias Nov 28 '22 at 17:44
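
The disposal behavior described in these comments can be observed with a small self-contained sketch. The names `DisposalDemo`, `Numbers`, `TakeThreeAsync` and `Log` are made up for illustration; nothing here touches gRPC.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DisposalDemo
{
    // Records events so that their order can be inspected afterwards.
    public static readonly List<string> Log = new();

    public static async IAsyncEnumerable<int> Numbers()
    {
        try
        {
            for (int i = 1; i <= 10; i++)
            {
                await Task.Yield();
                yield return i;
            }
        }
        finally
        {
            // Ends up in the compiler-generated DisposeAsync, so it also
            // runs when the consumer stops enumerating early.
            Log.Add("finally");
        }
    }

    public static async Task<int> TakeThreeAsync()
    {
        int taken = 0;
        await foreach (int n in Numbers())
        {
            Log.Add($"item {n}");
            if (++taken == 3) break; // leaving the loop disposes the enumerator
        }
        Log.Add("after loop");
        return taken;
    }
}
```

After `TakeThreeAsync()` completes, `Log` holds the three items, then `finally`, then `after loop`: the iterator's `finally` block has already run by the time the code after the loop executes. Note that this relies on `await foreach` (or an explicit `DisposeAsync` call); an enumerator that is obtained manually and never disposed will not run the `finally` block.
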

With Rx.NET you can do this with the .Throttle operator (Rx.NET's name for debounce) and the .TakeUntil operator.

    var input = Observable.Create<Item>((observer, cancellationToken) =>
        Task.Factory.StartNew(() =>
        {
            try
            {
                var items = _client.Search(
                    MapInput(parameters),
                    cancellationToken: cancellationToken,
                    deadline: DateTime.UtcNow.Add(_configuration.Timeout),
                    headers: MapHeaders(headers));
                foreach (var item in items)
                    observer.OnNext(item);
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
            }
        }, TaskCreationOptions.LongRunning)
    );

    // Share a single subscription between the two pipelines below
    var inputObservable = input
        .Publish()
        .RefCount();

    // Fires once the source has been silent for 10 seconds
    var timeout = inputObservable
        .Throttle(TimeSpan.FromSeconds(10));
    var outputObs = inputObservable
        .TakeUntil(timeout);

    return outputObs
        .ToAsyncEnumerable()
        .ToListAsync();

Edited: assuming _client.Search returns an IEnumerable.

Aron
  • @mnj sorry, I'll post another answer later because the current one got derailed. The important GH link [is the issue about yielding in async iterators](https://github.com/dotnet/roslyn/issues/39583). You can fix individual problems (exceptions from async streams, exceptions from the call) without a lot of code using helper methods – Panagiotis Kanavos Nov 29 '22 at 09:43
  • Now ([revision 3](https://stackoverflow.com/revisions/74604016/3)) your answer assumes that the `_client.Search` method returns an `IEnumerable`. How does this follow from the code posted in the question? Beyond that, the use of `Task.Factory.StartNew` violates the [CA2008](https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca2008) guideline. – Theodor Zoulias Nov 29 '22 at 13:41
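
One possible shape for such a helper method, as a rough sketch only: C# does not allow `yield return` inside a `try` block that has a `catch` clause, but the enumerator can be advanced inside the `try`/`catch` and the item yielded outside of it. `UntilException`, `FakeStream` and `DemoAsync` are hypothetical names, and `TimeoutException` stands in for the gRPC deadline error so that the example runs without any gRPC packages.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncEnumerableHelpers
{
    // Yields items from `source` until it ends or throws an exception that
    // matches `isExpected`; any other exception still propagates.
    public static async IAsyncEnumerable<T> UntilException<T>(
        IAsyncEnumerable<T> source, Func<Exception, bool> isExpected)
    {
        await using IAsyncEnumerator<T> enumerator = source.GetAsyncEnumerator();
        while (true)
        {
            T item;
            try
            {
                if (!await enumerator.MoveNextAsync()) break;
                item = enumerator.Current;
            }
            catch (Exception ex) when (isExpected(ex))
            {
                break; // treat the expected failure as a normal end of stream
            }
            yield return item; // legal here: outside the try/catch
        }
    }

    // Hypothetical stand-in for a response stream that hits its deadline.
    public static async IAsyncEnumerable<int> FakeStream()
    {
        yield return 1;
        await Task.Yield();
        yield return 2;
        throw new TimeoutException("deadline exceeded");
    }

    // Collects what UntilException lets through from FakeStream.
    public static async Task<List<int>> DemoAsync()
    {
        var items = new List<int>();
        await foreach (int item in
            UntilException(FakeStream(), ex => ex is TimeoutException))
        {
            items.Add(item);
        }
        return items;
    }
}
```

`DemoAsync()` returns the two items produced before the exception. For the code in the question, the predicate would presumably test for an `RpcException` whose `StatusCode` is `StatusCode.DeadlineExceeded`, and the source would be the call's response stream exposed as an `IAsyncEnumerable`; both are assumptions about the setup, not something shown in this thread.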