I have the following WebApi method that returns an unbounded result stream from RavenDB:

public IEnumerable<Foo> Get()
{
    var query = DocumentSession.Query<Foo, FooIndex>();
    using (var enumerator = DocumentSession.Advanced.Stream(query))
        while (enumerator.MoveNext())
            yield return enumerator.Current.Document;
}

Now I'd like to make that async. The naive approach of course doesn't work:

public async Task<IEnumerable<Foo>> Get()
{
    var query = AsyncDocumentSession.Query<Foo, FooIndex>();
    using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
        while (await enumerator.MoveNextAsync())
            yield return enumerator.Current.Document;
}

...because the method can't be both async and an iterator.

Diego Mijelshon
  • You could implement your own iterator. *But*, calling `MoveNext` on the iterator would have to be async as well - meaning you can't implement `IEnumerable`, you'd have to define your own interface. And you wouldn't be able to use that iterator in a `foreach` loop either. – dcastro May 30 '14 at 14:24
  • Yes, all those restrictions are correct. Since I'm just returning this (which will be serialized by WebApi), I don't need a lot of flexibility. Maybe I could implement a MediaTypeFormatter that understands `Task<IAsyncEnumerator<StreamResult<T>>>`. – Diego Mijelshon May 30 '14 at 14:30
  • possible duplicate of [Using async / await with DataReader ? ( without middle buffers!)](http://stackoverflow.com/questions/23854102/using-async-await-with-datareader-without-middle-buffers) – noseratio May 31 '14 at 11:25
  • @noseratio, the problem is similar, but not a duplicate. – Diego Mijelshon May 31 '14 at 12:04
  • @DiegoMijelshon, IMO it's the same problem. You're trying to combine `await` and `yield` within the same method, which is a legitimate intent, but the C# language doesn't support it yet. I proposed a "kind of" solution [there](http://stackoverflow.com/a/23869010/1768303). – noseratio May 31 '14 at 12:18
  • @noseratio the proposed *solution* would apply. But since I'm using Web Api, I have the opportunity to support the IAsyncEnumerator directly using a formatter, so I can avoid the helper. Using yield was a solution, not a requirement. – Diego Mijelshon May 31 '14 at 12:26

5 Answers

Since this is a WebAPI action method, HTTP restricts you to a single response. If you just return an IEnumerable<T>, then ASP.NET will enumerate it in-memory and then send the response.

If you're fine with this in-memory process, then you can just do the same thing yourself:

public async Task<List<Foo>> Get()
{
  var result = new List<Foo>();
  var query = AsyncDocumentSession.Query<Foo, FooIndex>();
  using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
    while (await enumerator.MoveNextAsync())
      result.Add(enumerator.Current.Document);
  return result;
}

However, I believe it would be better to use a streamed response, which you can get via PushStreamContent; something like this:

public HttpResponseMessage Get()
{
  var query = AsyncDocumentSession.Query<Foo, FooIndex>();
  HttpResponseMessage response = Request.CreateResponse();
  response.Content = new PushStreamContent(
      async (stream, content, context) =>
      {
        using (stream)
        using (var enumerator = await AsyncDocumentSession.Advanced.StreamAsync(query))
        {
          while (await enumerator.MoveNextAsync())
          {
            // TODO: adjust encoding as necessary.
            var serialized = JsonConvert.SerializeObject(enumerator.Current.Document);
            var data = Encoding.UTF8.GetBytes(serialized);
            var countPrefix = BitConverter.GetBytes(data.Length);
            await stream.WriteAsync(countPrefix, 0, countPrefix.Length);
            await stream.WriteAsync(data, 0, data.Length);
          }
        }
      });
  return response;
}

The streamed response doesn't require your server to hold the entire response in memory; however, you'll have to decide on the proper way to write documents to the response stream. The example code above just converts them to JSON, encodes in UTF8, and (binary) length-prefixes those strings.
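
Whoever consumes that response has to undo the same framing. Here is a minimal sketch of a round trip, assuming the 4-byte `BitConverter` length prefix described above; `LengthPrefixedFraming`, `WriteFrameAsync`, `ReadFramesAsync` and `FillAsync` are hypothetical names used only for illustration, and plain strings stand in for the serialized documents:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper, not part of Web API or RavenDB.
public static class LengthPrefixedFraming
{
    // Writes one frame: a 4-byte length (in the byte order BitConverter uses
    // on this machine) followed by the UTF-8 payload.
    public static async Task WriteFrameAsync(Stream stream, string payload)
    {
        var data = Encoding.UTF8.GetBytes(payload);
        var countPrefix = BitConverter.GetBytes(data.Length);
        await stream.WriteAsync(countPrefix, 0, countPrefix.Length);
        await stream.WriteAsync(data, 0, data.Length);
    }

    // Reads frames until the stream ends cleanly on a frame boundary.
    public static async Task<List<string>> ReadFramesAsync(Stream stream)
    {
        var frames = new List<string>();
        var prefix = new byte[sizeof(int)];
        while (await FillAsync(stream, prefix))
        {
            var data = new byte[BitConverter.ToInt32(prefix, 0)];
            if (!await FillAsync(stream, data))
                throw new EndOfStreamException("Stream ended in the middle of a frame.");
            frames.Add(Encoding.UTF8.GetString(data));
        }
        return frames;
    }

    // Fills the buffer completely. Returns false if the stream had already
    // ended before any byte was read; throws if it ends part-way through.
    private static async Task<bool> FillAsync(Stream stream, byte[] buffer)
    {
        int offset = 0;
        while (offset < buffer.Length)
        {
            int read = await stream.ReadAsync(buffer, offset, buffer.Length - offset);
            if (read == 0)
            {
                if (offset == 0) return false;
                throw new EndOfStreamException("Stream ended in the middle of a frame.");
            }
            offset += read;
        }
        return true;
    }
}
```

Note that the prefix is written in the server's native byte order, so a client on a different architecture (or in a different language) would need to agree on endianness explicitly.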

Stephen Cleary
  • This is actually not a bad idea (the PushStream one). It might be even a bit better than mine, which reads async but writes sync. Maybe a mix of both would be cool. – Diego Mijelshon May 31 '14 at 14:46

You could implement your own iterator, instead of letting the compiler generate one for you.

But, calling MoveNext on that iterator would have to be async as well - meaning you can't implement IEnumerable<T>/IEnumerator<T>; you'd have to define your own interface, e.g., IAsyncEnumerator<T>. And you wouldn't be able to use that iterator in a foreach loop either.

The way I see it, your best bet is to do what StreamAsync does. Create a custom type IAsyncEnumerable<T> that returns an IAsyncEnumerator<T>, which in turn exposes an async Task<bool> MoveNextAsync() method and a Current property. The enumerable would wrap your query object, and the enumerator would pull each document from the document session's stream.

internal class AsyncDocumentEnumerable : IAsyncEnumerable<Document>
{
    private readonly YourQueryType _query;
    public AsyncDocumentEnumerable(YourQueryType query)
    {
        _query = query;
    }

    public IAsyncEnumerator<Document> GetEnumerator()
    {
        return new AsyncDocumentEnumerator(_query);
    }
}


internal class AsyncDocumentEnumerator : IAsyncEnumerator<Document>
{
    private readonly YourQueryType _query;
    // StreamAsync yields StreamResult<T> wrappers; the document is in .Document.
    private IAsyncEnumerator<StreamResult<Document>> _iter;

    public AsyncDocumentEnumerator(YourQueryType query)
    {
        _query = query;
    }

    // Assumes AsyncDocumentSession is reachable from this class (e.g. injected).
    public async Task<bool> MoveNextAsync()
    {
        // Open the underlying stream lazily, on the first call.
        if (_iter == null)
            _iter = await AsyncDocumentSession.Advanced.StreamAsync(_query);

        bool moved = await _iter.MoveNextAsync();

        if (moved)
            Current = _iter.Current.Document;
        return moved;
    }

    public Document Current{get; private set;}
}
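
Since `foreach` is off the table, callers have to drive such an enumerator by hand. A minimal sketch of what that loop could look like follows; `ISimpleAsyncEnumerator<T>`, `ListAsyncEnumerator<T>` and `ForEachAsync` are hypothetical names invented for this illustration (the in-memory enumerator just stands in for a real document stream):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical minimal interface, standing in for whatever async
// enumerator type you define or your client library provides.
public interface ISimpleAsyncEnumerator<T> : IDisposable
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

// In-memory stand-in for a real asynchronous source.
public sealed class ListAsyncEnumerator<T> : ISimpleAsyncEnumerator<T>
{
    private readonly IReadOnlyList<T> _items;
    private int _index = -1;

    public ListAsyncEnumerator(IReadOnlyList<T> items) { _items = items; }

    public T Current => _items[_index];

    public async Task<bool> MoveNextAsync()
    {
        await Task.Yield(); // simulate asynchronous work
        return ++_index < _items.Count;
    }

    public void Dispose() { }
}

public static class AsyncEnumeratorExtensions
{
    // Manual replacement for foreach: await each step, then hand the
    // current item to the callback. Disposes the enumerator when done.
    public static async Task ForEachAsync<T>(
        this ISimpleAsyncEnumerator<T> enumerator, Action<T> action)
    {
        using (enumerator)
            while (await enumerator.MoveNextAsync())
                action(enumerator.Current);
    }
}
```

With a helper like this, a caller would write something like `await enumerator.ForEachAsync(doc => Process(doc));`, where `Process` is whatever per-item work is needed.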
dcastro

It wasn't that hard after all. The solution was a formatter that could process the enumerator asynchronously and write JSON to the stream:

public class CustomJsonMediaTypeFormatter : JsonMediaTypeFormatter
{
    public override async Task WriteToStreamAsync(
           Type type, object value, Stream writeStream, HttpContent content,
           TransportContext transportContext, CancellationToken cancellationToken)
    {
        if (type.IsGenericType &&
            type.GetGenericTypeDefinition() == typeof(IAsyncEnumerator<>))
        {
            var writer = new JsonTextWriter(new StreamWriter(writeStream))
                         { CloseOutput = false };
            writer.WriteStartArray();
            await Serialize((dynamic)value, writer);
            writer.WriteEndArray();
            writer.Flush();
        }
        else
            await base.WriteToStreamAsync(type, value, writeStream, content,
                                          transportContext, cancellationToken);
    }

    async Task Serialize<T>(IAsyncEnumerator<StreamResult<T>> enumerator,
                            JsonTextWriter writer)
    {
        var serializer = JsonSerializer.Create(SerializerSettings);
        while (await enumerator.MoveNextAsync())
            serializer.Serialize(writer, enumerator.Current.Document);
    }
}

Now my WebApi method is even shorter than before:

public Task<IAsyncEnumerator<StreamResult<Foo>>> Get()
{
    var query = AsyncDocumentSession.Query<Foo, FooIndex>();
    return AsyncDocumentSession.Advanced.StreamAsync(query);
}
Diego Mijelshon

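C# 8 introduced async streams: a method that returns IAsyncEnumerable<T> can use both await and yield return, and callers consume it with await foreach: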

    async IAsyncEnumerable<int> GetBigResultsAsync()
    {
        await foreach (var result in GetResultsAsync())
        {
            if (result > 20) yield return result; 
        }
    }
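
To make that snippet self-contained, here is a small sketch with a stand-in source. `AsyncStreamDemo`, `CollectAsync` and the body of `GetResultsAsync` are assumptions made up for this example (a real source would be a database or network stream); it needs C# 8 and a runtime that ships `IAsyncEnumerable<T>`, such as .NET Core 3.0 or later:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncStreamDemo
{
    // Stand-in for an asynchronous data source: yields 0, 10, 20, 30, 40.
    public static async IAsyncEnumerable<int> GetResultsAsync()
    {
        for (int i = 0; i < 5; i++)
        {
            await Task.Delay(1); // simulate asynchronous I/O
            yield return i * 10;
        }
    }

    // await and yield return in the same method: an async iterator.
    public static async IAsyncEnumerable<int> GetBigResultsAsync()
    {
        await foreach (var result in GetResultsAsync())
        {
            if (result > 20) yield return result;
        }
    }

    // Consumes the filtered stream item by item.
    public static async Task<List<int>> CollectAsync()
    {
        var list = new List<int>();
        await foreach (var result in GetBigResultsAsync())
            list.Add(result);
        return list;
    }
}
```

Here `CollectAsync` ends up with the two values above 20, namely 30 and 40.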
Prasanth Louis

You can look at Reactive Extensions (Rx) for .NET, which is designed for exactly this kind of push-based sequence. The end result might look like this:

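public IObservable<Foo> Get()
{
    var locations = new Subject<Foo>();

    Task.Run(() =>
    {
        try
        {
            var query = DocumentSession.Query<Foo, FooIndex>();
            // Stream returns an enumerator, so iterate it manually.
            using (var enumerator = DocumentSession.Advanced.Stream(query))
                while (enumerator.MoveNext())
                    locations.OnNext(enumerator.Current.Document);
            locations.OnCompleted();
        }
        catch (Exception ex)
        {
            // Surface failures to subscribers instead of losing them.
            locations.OnError(ex);
        }
    });

    return locations;
}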
Stas Shusha
  • You are just wrapping a non-async call there. It defeats the purpose of async (not using threads while IO is happening) – Diego Mijelshon May 30 '14 at 15:47
  • Then do not wrap. I just made an example of working with a synchronous API. If you have an asynchronous API, you don't need any `Task.Run()` calls for sure. There are plenty of ways to create an `IObservable` sequence from any kind of API, synchronous and asynchronous. – Stas Shusha Jun 02 '14 at 09:10
  • I just realized that `AsyncDocumentSession` is something publicly known from the RavenDB client and not your custom stuff... Sorry, I've never seen this before, so I can't create a working prototype with `RX` easily, but I still believe that `IObservable` is an ideal fit for your problem. – Stas Shusha Jun 02 '14 at 09:25