13

I am using Dapper to stream data from a very large set in SQL Server. It works fine with returning IEnumerable and calling Query(), but when I switch to QueryAsync(), it seems that the program tries to read all of the data from SQL Server instead of streaming.

According to this question, it should work fine with buffered: false, which I am doing, but the question says nothing about async/await.

Now according to this question, it's not straightforward to do what I want with QueryAsync().

Do I understand correctly that enumerables are iterated when the context is switched for async/await?

Another question: will this become possible once the new C# 8 async streams are available?

Ilya Chernomordik
  • `async` / `await` does not cause `IEnumerable` to get iterated. Though it may make some iteration patterns "tricky". – Bradley Uffner Apr 05 '19 at 13:40
  • 1
    `async/await` has nothing to do with how QueryAsync behaves. If `QueryAsync`'s implementation reads everything before returning the IEnumerable, there's nothing you can do. The second question isn't about Dapper so it doesn't apply. Dataflows are a great way to create processing pipelines no matter how Dapper works either – Panagiotis Kanavos Apr 05 '19 at 13:52
  • As for async streams, they won't have any effect unless QueryAsync is coded to return an IAsyncEnumerable – Panagiotis Kanavos Apr 05 '19 at 13:55
  • 1
    Whoever voted to close deserves ... whatever Marc Gravell decides. I suspect he's the one who will have to implement async streaming. – Panagiotis Kanavos Apr 05 '19 at 14:05
  • 1
    Well, I got two downvotes on another question today without any explanation, so I really think people want to close everything if it's a bit more complex question or not one where one can run example code :) @PanagiotisKanavos I also like that close reason is "This question does not appear to be about programming". If that is not related to programming, I don't know what is :) – Ilya Chernomordik Apr 05 '19 at 14:09

3 Answers

13

Update March 2020

.NET Core 3.0 (and 3.1) have come out now, with full support for async streams. The Microsoft.Bcl.AsyncInterfaces package adds support for them to .NET Standard 2.0 and .NET Framework 4.6.1+, although 4.7.2 should be used for sanity reasons. As the docs on .NET Standard implementation support explain:

While NuGet considers .NET Framework 4.6.1 as supporting .NET Standard 1.5 through 2.0, there are several issues with consuming .NET Standard libraries that were built for those versions from .NET Framework 4.6.1 projects.

For .NET Framework projects that need to use such libraries, we recommend that you upgrade the project to target .NET Framework 4.7.2 or higher.

Original Answer

If you check the source code, you'll see that your suspicion is almost correct. When buffered is false, QueryAsync will stream synchronously.

if (command.Buffered)
{
    var buffer = new List<T>();
    var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
    while (await reader.ReadAsync(cancel).ConfigureAwait(false))
    {
        object val = func(reader);
        if (val == null || val is T)
        {
            buffer.Add((T)val);
        }
        else
        {
            buffer.Add((T)Convert.ChangeType(val, convertToType, CultureInfo.InvariantCulture));
        }
    }
    while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
    command.OnCompleted();
    return buffer;
}
else
{
    // can't use ReadAsync / cancellation; but this will have to do
    wasClosed = false; // don't close if handing back an open reader; rely on the command-behavior
    var deferred = ExecuteReaderSync<T>(reader, func, command.Parameters);
    reader = null; // to prevent it being disposed before the caller gets to see it
    return deferred;
}

As the comment explains, it's not possible to use ReadAsync when the return type is expected to be IEnumerable. That's why C# 8's async enumerables had to be introduced.

The code for ExecuteReaderSync is:

private static IEnumerable<T> ExecuteReaderSync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
    using (reader)
    {
        while (reader.Read())
        {
            yield return (T)func(reader);
        }
        while (reader.NextResult()) { /* ignore subsequent result sets */ }
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}

It uses Read instead of ReadAsync.
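To make the "streams, but synchronously" behaviour concrete, here is a minimal self-contained sketch. It does not use Dapper: `FakeReader` and `Stream` are hypothetical stand-ins for the data reader and for `ExecuteReaderSync`, and the row counter exists only to show when rows are actually pulled.

```csharp
using System.Collections.Generic;

public static class DeferredStreamingSketch
{
    // Hypothetical stand-in for a data reader; counts how many rows were pulled.
    public sealed class FakeReader
    {
        private readonly int _total;
        public int RowsRead { get; private set; }
        public FakeReader(int total) => _total = total;

        // Mirrors the shape of IDataReader.Read(): a real reader would block
        // here until the next row has arrived from the server.
        public bool Read()
        {
            if (RowsRead >= _total) return false;
            RowsRead++;
            return true;
        }

        public int Current => RowsRead;
    }

    // Same shape as ExecuteReaderSync: an iterator, so nothing is read
    // until the caller starts enumerating, and then one row per MoveNext.
    public static IEnumerable<int> Stream(FakeReader reader)
    {
        while (reader.Read())
        {
            yield return reader.Current;
        }
    }
}
```

Calling `Stream(new FakeReader(1000))` reads nothing by itself; breaking out of a `foreach` at the third row leaves `RowsRead` at 3. The rows are streamed, but each step of the loop waits inside the synchronous `Read()`.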

C# 8 async streams will allow rewriting this to return an IAsyncEnumerable. Simply changing the language version won't solve the problem.

Given the current docs on async streams this could look like:

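// Sketch only. Needs System.Data.Common, System.Runtime.CompilerServices and System.Threading.
// The reader must be a DbDataReader: IDataReader has no ReadAsync/NextResultAsync.
private static async IAsyncEnumerable<T> ExecuteReaderAsync<T>(DbDataReader reader, Func<IDataReader, object> func, object parameters, [EnumeratorCancellation] CancellationToken cancel = default)
{
    using (reader)
    {
        // Each row is awaited instead of blocking the calling thread
        while (await reader.ReadAsync(cancel).ConfigureAwait(false))
        {
            yield return (T)func(reader);
        }

        while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
        (parameters as IParameterCallbacks)?.OnCompleted();
    }
}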
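For readers who want to try the async-stream shape without a database, the following is a minimal self-contained sketch. The names `StreamAsync` and `SumAsync` are made up for illustration, and `Task.Delay` merely stands in for the time `ReadAsync` would spend waiting on the network. It assumes a runtime where `IAsyncEnumerable<T>` is available (.NET Core 3.0+, or the Microsoft.Bcl.AsyncInterfaces package mentioned in the update).

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncStreamingSketch
{
    // Hypothetical row source: produces rows 1..rowCount, awaiting before each one.
    public static async IAsyncEnumerable<int> StreamAsync(
        int rowCount,
        [EnumeratorCancellation] CancellationToken cancel = default)
    {
        for (var row = 1; row <= rowCount; row++)
        {
            // Placeholder for awaiting the next row from the server
            await Task.Delay(1, cancel).ConfigureAwait(false);
            yield return row;
        }
    }

    // Consumer: await foreach awaits MoveNextAsync() for every row,
    // so the thread is free while the next row is in flight.
    public static async Task<int> SumAsync(int rowCount, CancellationToken cancel = default)
    {
        var sum = 0;
        await foreach (var row in StreamAsync(rowCount, cancel))
        {
            sum += row;
        }
        return sum;
    }
}
```

`await AsyncStreamingSketch.SumAsync(4)` yields 10. The point is the caller's side: the loop has to become `await foreach`, which is why a language upgrade alone cannot change what `QueryAsync` returns.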

Buuuuuut async streams is one of the things that can only work on .NET Core, and probably isn't implemented yet. When I tried to write one in Sharplab.io, Kaboom. [connection lost, reconnecting…]

Panagiotis Kanavos
  • I see, so it's not async that enumerates the enumerable, rather it's not possible to do `yield return` with async/await at all, so `buffered:false` is actually a bit of a lie for QueryAsync :) – Ilya Chernomordik Apr 05 '19 at 14:07
  • 1
    @IlyaChernomordik not exactly. The results *are* streamed, synchronously. `ExecuteReaderSync` is an iterator that returns one item at a time, as soon as it arrives. That operation though runs synchronously – Panagiotis Kanavos Apr 05 '19 at 14:09
  • @IlyaChernomordik sharplab.io crashed when I tried to write a method with IAsyncEnumerable so I suspect the feature isn't quite there yet – Panagiotis Kanavos Apr 05 '19 at 14:15
  • A bit confused about the difference between sync and async streaming: will asynchronous streaming mean that each element of an IEnumerable is awaitable in a way, while sync streaming means that we block while we iterate over the whole enumerable? – Ilya Chernomordik Apr 05 '19 at 14:16
  • 1
    @IlyaChernomordik You don't need to wait for the whole enumerable, right now, but requesting the next item will block. The next item will only be requested when you explicitly ask for it. In a `foreach` loop for example, it will be requested every time your code loops. The problem is that returning a big row over a slow connection may block `Read()` for a relatively long time. – Panagiotis Kanavos Apr 05 '19 at 14:22
  • 1
    @IlyaChernomordik in C# 8 your own code will have to change to use async streams, eg `await foreach(var item in dapperResults){..}`. In this case, iterating *won't* block each time you try to read the next item. – Panagiotis Kanavos Apr 05 '19 at 14:24
  • Yes, I see the difference now, I think. Thanks a lot for the explanation, can't vote more than once unfortunately :) It's a bit of a complicated topic to understand fully, though – Ilya Chernomordik Apr 05 '19 at 14:28
  • If I understood correctly, what happens in Dapper with buffered and QueryAsync is that it asynchronously performs the actions required to start streaming (opening the connection, sending the SQL, etc.) and then returns an IEnumerable (non-blocking so far), but when we iterate it, that part will be blocking – Ilya Chernomordik Apr 05 '19 at 14:31
  • 2
    @IlyaChernomordik if you're **buffered**, then it will **asynchronously** fill the buffer, and only return (async) it to you *when full*; when you enumerate it it is **OK** to be sync, precisely because it is already all buffered locally. The problem scenario is non-buffered; it is when *non-buffered* that it gets hairy - then it is async until it gets data, then it switches to sync *while still reading data from the DB* – Marc Gravell Apr 05 '19 at 15:10
  • "async streams is one of the things that can only work on .NET Core" - not true; the pattern is implemented by the compiler / duck-type (just like regular `foreach`), so custom async enumerable types work fine with `await foreach`, and don't even need to implement `IAsyncEnumerable`. But: there's also `System.Linq.Async` which plugs the gap for older runtimes by declaring the interface - it isn't an official library, but ... it is OK for this. – Marc Gravell Apr 05 '19 at 15:13
  • As @MarcGravell said, it doesn't need to be .NET Core to have async streams. I used it in .NET Framework 4.8 by simply editing the csproj and setting the language version to 8.0 specifically. It probably works in lower .NET Framework versions too (I would guess any version that supports Tasks) as long as you set the language version to 8.0 – Thanasis Ioannidis Mar 11 '20 at 07:45
  • @ThanasisIoannidis that's not enough, you need to use the [Microsoft.Bcl.AsyncInterfaces](https://www.nuget.org/packages/Microsoft.Bcl.AsyncInterfaces/) package which came out *after* this question was asked (May 2019). Without it the type itself isn't available. Marc Gravell talks about something very different - System.Linq.Async is a package providing LINQ operators for IAsyncEnumerable. I suspect your project got Microsoft.Bcl.AsyncInterfaces as an intermediate dependency – Panagiotis Kanavos Mar 11 '20 at 08:20
12

In the context of dapper specifically, yes: it needs a different API as explained by the excellent answer by @Panagiotis. What follows isn't an answer as such, but is additional context that implementors facing the same challenges may wish to consider.

I haven't "spiked" this for dapper yet (although I have for SE.Redis), and I'm torn between various options:

  1. add a new API for .NET Core only, returning an appropriate async-enumerable type
  2. completely smash the existing API as a breaking change (a "major" etc), changing it to return an async-enumerable type

We'll probably go with "1", but I have to say, the second option is unusually tempting, for good reasons:

  • the existing API probably doesn't do what people expect it to do
  • we'd want new code to start using it

But the odd thing is the .NET Core 3.0-ness of IAsyncEnumerable<T> - as obviously Dapper doesn't just target .NET Core 3.0; we could:

  1. limit the feature to .NET Core 3.0, and return IAsyncEnumerable<T>
  2. limit the library to .NET Core 3.0, and return IAsyncEnumerable<T>
  3. take a dependency on System.Linq.Async (which isn't "official", but is official-enough for our purposes) for the previous frameworks, and return IAsyncEnumerable<T>
  4. return a custom enumerable type that isn't actually IAsyncEnumerable<T> (but which implements IAsyncEnumerable<T> when available), and manually implement the state machine - the duck-typed nature of foreach means this will work fine as long as our custom enumerable type provides the right methods

I think we'll probably go with option 3, but to reiterate: yes, something needs to change.
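Option 4 above relies on `await foreach` being pattern-based. A minimal sketch of that idea follows; `DuckTypedRows` and its `Enumerator` are hypothetical types written for illustration and deliberately implement no interface at all. This is not Dapper code, only a demonstration that the compiler accepts any type with a suitable `GetAsyncEnumerator()`, `MoveNextAsync()` and `Current`.

```csharp
using System.Threading.Tasks;

// Deliberately does NOT implement IAsyncEnumerable<int>.
public sealed class DuckTypedRows
{
    private readonly int _count;
    public DuckTypedRows(int count) => _count = count;

    // The compiler looks for this method by name, not via an interface.
    public Enumerator GetAsyncEnumerator() => new Enumerator(_count);

    public sealed class Enumerator
    {
        private readonly int _count;
        public Enumerator(int count) => _count = count;

        public int Current { get; private set; }

        public async ValueTask<bool> MoveNextAsync()
        {
            if (Current >= _count) return false;
            await Task.Yield(); // placeholder for real asynchronous I/O
            Current++;
            return true;
        }
    }
}

public static class DuckTypedDemo
{
    public static async Task<int> SumAsync(DuckTypedRows rows)
    {
        var sum = 0;
        await foreach (var row in rows) // binds to the pattern above
        {
            sum += row;
        }
        return sum;
    }
}
```

With `new DuckTypedRows(3)` the loop sees 1, 2 and 3, so `SumAsync` returns 6. A real implementation would also want a `DisposeAsync()` so the underlying reader is released when the loop exits early.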

Marc Gravell
  • 1
    I actually figured out that for my purposes what the library does is what I need: you get a synchronous stream, but you get it in an async way, if that makes sense? :) So if you guys switch to IAsyncEnumerable, I suppose there will be a big overhead compared to synchronous streaming since it's more complicated, so probably two different methods can be implemented? Especially as a buffered async enumerable does not make sense (if I even understood all that correctly) – Ilya Chernomordik Apr 05 '19 at 15:01
  • 1
    @IlyaChernomordik yeah, the current approach works OK in the buffered case, but the non-buffered case doesn't properly respect async all the way through - and FWIW I think a buffered async enumerable *does* make sense - it may even be desirable for simplicity. It could well be that we just need an extra `IAsyncEnumerable<T>` method just for the non-buffered case. We've *also* discussed breaking the API in the next major to split buffered/non-buffered completely, so that people can see the `List<T>` - or possibly an arena-allocated `ReadOnlySequence<T>` – Marc Gravell Apr 05 '19 at 15:04
  • @MarcGravell channels would be a good option 5 here, although they're a bit "heavier" than an IAsyncEnumerable. – Panagiotis Kanavos Apr 05 '19 at 15:13
  • 1
    @PanagiotisKanavos yeah, we went that route with SE.Redis for pub/sub subscriptions - but... channels is a pneumatic-hammer to crack a walnut here; great for producer/consumer scenarios, or scenarios with multiple readers and/or multiple writers, but... for this case? `IAsyncEnumerable` is a more obvious fit – Marc Gravell Apr 05 '19 at 15:15
  • Sounds good with different API for buffered/non-buffered. Since I got IEnumerable so I thought it is not buffered by default actually only to figure out later that there is an option. – Ilya Chernomordik Apr 05 '19 at 15:18
  • 2
    @IlyaChernomordik indeed, and the default is `true` (to buffer), because most of the time people are reading 20 rows, etc, not 2 million - and if we'd defaulted to false, we'd have caused people to see errors like: "I returned the enumerable from the method, passing the `using` block on the connection on the way", "I enumerated it 7 times, and bad things happened", "it keeps saying I have an open reader". Much easier to default to buffered. – Marc Gravell Apr 05 '19 at 15:21
  • Completely agree that default true is very reasonable, it's rather an edge-case as you say that you don't want that. But an explicit IEnumerable vs IReadOnlyCollection can be a nice feature if you guys are willing to do it :) – Ilya Chernomordik Apr 05 '19 at 15:24
  • @MarcGravell I was thinking of a dataflow case here, where asynchronously streaming thousands of rows from a reader is desirable. It's lighter than a TransformManyBlock. – Panagiotis Kanavos Apr 05 '19 at 15:25
  • @PanagiotisKanavos well, you can always `Task.Run` something that just dequeues things from an `IAsyncEnumerable<T>` and pushes it into a `Channel<T>` :) Actually, it feels like maybe the channel writer should have a `public static async Task TryWrite<T>(this ChannelWriter<T> writer, IAsyncEnumerable<T> source) { await foreach(var value in source) await writer.WriteAsync(value); }` – Marc Gravell Apr 05 '19 at 15:32
  • I don't use Dapper but my application code is driver independent. Which means that I'm using IDataReader similarly as in Dapper. The problem is that IDataReader doesn't have ReadAsync method, only Read. What is your approach to workaround this? Probably IDataReader should be treated as obsolete and start using DbDataReader, I'm not sure. – robsosno Jul 22 '20 at 08:57
  • @robsosno yes, that, basically; when we next "major" dapper, we'll probably change to `DbDataReader`; right now we do a type check internally – Marc Gravell Jul 22 '20 at 09:06
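The extension method floated in the comments above, pumping an async stream into a channel, could be sketched roughly as follows. `WriteAllAsync` and `Numbers` are hypothetical names; neither is part of System.Threading.Channels or Dapper, and whether the helper should also complete the writer is a design choice left open here.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelWriterSketch
{
    // Hypothetical helper: forwards every item of an async stream to a channel.
    // It does not call writer.Complete(); the caller decides when the channel ends.
    public static async Task WriteAllAsync<T>(
        this ChannelWriter<T> writer,
        IAsyncEnumerable<T> source,
        CancellationToken cancel = default)
    {
        await foreach (var value in source)
        {
            // WriteAsync waits if a bounded channel is full (back-pressure)
            await writer.WriteAsync(value, cancel);
        }
    }

    // Hypothetical source used only to exercise the helper.
    public static async IAsyncEnumerable<int> Numbers(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Yield(); // placeholder for real asynchronous work
            yield return i;
        }
    }
}
```

A caller would typically create the channel with `Channel.CreateBounded<T>(...)`, start `WriteAllAsync` on one side, and read from `channel.Reader` on the other, completing the writer once the pump task finishes.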
2

(This is supposed to be a comment // not enough reputation, so far)

Marc Gravell mentions in his reply that IAsyncEnumerable<T> would be preferable, but because of the dependency on .NET Core 3.0, it might be better to take a dependency on System.Linq.Async (which could be considered "official-enough")...

In this context, https://github.com/Dasync/AsyncEnumerable came to my mind (MIT license): It aims to help

... to (a) create an element provider, where producing an element can take a lot of time due to dependency on other asynchronous events (e.g. wait handles, network streams), and (b) a consumer that processes those element as soon as they are ready without blocking the thread (the processing is scheduled on a worker thread instead).

One more quote, RE: "What happens when C# 8.0 is released?" (FAQ)

C# 8.0 should have a feature of Async Streams. When the version of the language is finally realeased, it should be a straight-forward upgrade path for your application.

Ghetti Spa