We have recently migrated our ASP.NET Core API which uses Dapper to .NET Core 3.1. After the migration, we felt there was an opportunity to use the latest IAsyncEnumerable feature from C# 8 for one of our endpoints.

Here is the pseudocode before the changes:

public async Task<IEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
       return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
} 

private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
        foreach (var item in items)
        {
            yield return item;
        }
    }     
}

After IAsyncEnumerable code changes:

// Import NuGet package: System.Linq.Async

public async Task<IAsyncEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
        return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
} 

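// `async` is required here: `await foreach` and `yield return` only compile in an async iterator.
private async IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
        await foreach (var item in items.ToAsyncEnumerable())
        {
            yield return item;
        }
    }
}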

The above approach of using ToAsyncEnumerable is loosely inspired by this post, but I'm not 100% sure I'm using it in the right place/context.

Question:

  • The Dapper library only returns IEnumerable. Can we use ToAsyncEnumerable to convert it into an IAsyncEnumerable for async streaming, as above?

Note: This question looks similar to What happens with returning IEnumerable if used with async/await (streaming data from SQL Server with Dapper)? but I do not think that answers my question.

Daniel A. White
Ankit Vijay
  • 4
    If dapper doesn't expose `IAsyncEnumerable` APIs, what do you expect to gain by wrapping an `IEnumerable` API? – Paulo Morgado Jan 29 '20 at 00:29
  • Hi @PauloMorgado, the operation that calls `GetItems` does some further manipulation on the returned `items`. It performs a custom sort operation and translates them into a different DTO. In addition to this, my question was whether we can get some value by using `ToAsyncEnumerable` to return `IAsyncEnumerable` in this case, or whether it is of no use. – Ankit Vijay Jan 29 '20 at 02:10
  • 1
    That's all CPU work. Asynchrony will only make it worse, not better. – Paulo Morgado Jan 29 '20 at 07:21
  • 4
    You gain *nothing* by wrapping an `IEnumerable` in `IAsyncEnumerable` like this. `IAsyncEnumerable` allows you to return values as they arrive. What your code does, though, is retrieve everything and then return it with a fake async operation. Clients think they're getting results fast, when in reality they have to wait the same way they did before – Panagiotis Kanavos Jan 29 '20 at 08:21
  • Hi @PanagiotisKanavos and @Paulo, that is exactly what I was looking for. I was not 100% convinced I was solving it right. Your comments have validated my concern. We will wait for Dapper to support `IAsyncEnumerable` before we make this change on our side. If you post your comment as an answer, I'm happy to "Mark it as Answer" – Ankit Vijay Jan 29 '20 at 20:33
  • 4
    Hi @abatishchev, thanks for saying this. It has happened so many times that I have stopped asking why I was downvoted. I would spend 15 minutes to half an hour framing a question, and sometimes I have been downvoted within minutes. I guess the definition of a downvote is not very clear. For me, a question that is well formed and precise probably does not deserve a downvote, even if it does not earn an upvote. – Ankit Vijay Feb 09 '20 at 19:50
  • 3
    @AnkitVijay: Cheers! For some reason my comment was deleted (by a moderator?). Screw this. – abatishchev Feb 11 '20 at 20:29
  • Sad state of affairs – Ankit Vijay Feb 25 '20 at 13:02
  • Some of the downvotes you're receiving may be because your question should have involved more research. A more in-depth understanding would have made this question unnecessary, and it is now a top search result for IAsyncEnumerable Dapper. – svw Apr 27 '20 at 19:16
  • 8
    Hi @svw, if this question was the top of your search result, it just shows there is not much information available on the web on this topic. How do you expect someone to do more research in such a scenario? I think it is wrong to assume that no research was done before the question was posted on SO. I'm sure you will agree it is quite an effort to frame a question to receive a good response from the community. Anyways, downvotes don't really bother me anymore. :) – Ankit Vijay Apr 28 '20 at 08:23
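
The point made in the comments above can be checked without a database. What follows is a minimal sketch, not Dapper code: `BlockingSource` is a hypothetical stand-in for an unbuffered Dapper result whose `MoveNext` blocks on I/O, and `Wrap` is a hand-written equivalent of wrapping an `IEnumerable` as an `IAsyncEnumerable`, not the `ToAsyncEnumerable` library method itself. It shows that each `MoveNextAsync` call has already finished by the time it returns, so the calling thread was blocked for the whole wait.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class FakeAsyncDemo
{
    // Hypothetical stand-in for an unbuffered synchronous result set:
    // every MoveNext blocks the calling thread, as reader.Read() would.
    public static IEnumerable<int> BlockingSource(int count, int delayMs)
    {
        for (var i = 0; i < count; i++)
        {
            Thread.Sleep(delayMs);
            yield return i;
        }
    }

    // Wraps a synchronous sequence in an async iterator. There is no real
    // await inside, so nothing here ever yields the thread.
#pragma warning disable CS1998 // async method lacks 'await' operators
    public static async IAsyncEnumerable<T> Wrap<T>(IEnumerable<T> source)
    {
        foreach (var item in source)
        {
            yield return item;
        }
    }
#pragma warning restore CS1998

    // Returns true when the first MoveNextAsync had already completed by the
    // time the call returned, i.e. the wait happened synchronously.
    public static async Task<bool> FirstMoveCompletedSynchronously()
    {
        await using var enumerator = Wrap(BlockingSource(3, 50)).GetAsyncEnumerator();
        ValueTask<bool> pending = enumerator.MoveNextAsync(); // blocks here
        bool completedSynchronously = pending.IsCompleted;
        await pending;
        return completedSynchronously;
    }
}
```

A truly asynchronous source, such as one built on `DbDataReader.ReadAsync`, can instead return an incomplete task and free the thread while the next row is in flight.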

2 Answers

17

Update: I wasn't aware of async iterators when I first wrote this answer. Thanks to Theodor Zoulias for pointing it out. In light of that, a much simpler approach is possible:

using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();

// Consider draining any remaining result sets with reader.NextResultAsync() after the loop.
// See the GitHub issue referenced below for details.
while (await reader.ReadAsync()) {
    yield return rowParser(reader);
}

Ref: https://github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322

Original Answer:

Here's an IAsyncEnumerable wrapper I wrote that may help those who want to stream unbuffered data using async/await and also want the power of Dapper's type mapping:

public class ReaderParser<T> : IAsyncEnumerable<T> {
    public ReaderParser(SqlDataReader reader) {
        Reader = reader;
    }
    private SqlDataReader Reader { get; }
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) {
        return new ReaderParserEnumerator<T>(Reader);
    }
}
public class ReaderParserEnumerator<T> : IAsyncEnumerator<T> {
    public ReaderParserEnumerator(SqlDataReader reader) {
        Reader = reader;
        RowParser = reader.GetRowParser<T>();
    }
    public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);
    private SqlDataReader Reader { get; }
    private Func<IDataReader, T> RowParser { get; }
    public async ValueTask DisposeAsync() {
        await Reader.DisposeAsync();
    }
    public async ValueTask<bool> MoveNextAsync() {
        return await Reader.ReadAsync();
    }
}

Usage:

var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);

The System.Linq.Async package then adds basically all the nice IEnumerable extensions you know and love for IAsyncEnumerable, e.g. in my usage:

var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();
Ravi M Patel
Dave
  • 2
    AFAICS the `cancellationToken` parameter of the `GetAsyncEnumerator` method is ignored. Also why would you implement the interfaces explicitly, when you could just write an [async iterator](https://stackoverflow.com/questions/59689529/return-iasyncenumerable-from-an-async-method)? – Theodor Zoulias Mar 20 '21 at 16:45
  • 2
    It wasn't meant to be a robust solution, really. Admittedly, I didn't know about async iterators. I'll try and make the change and edit my answer appropriately. – Dave Mar 21 '21 at 18:15
  • For those finding this answer in the future - it works, but bear in mind disposing the reader will fetch (and ignore on client-side) the entire thing from the DB, even if the enumerator is disposed beforehand. I haven't found any way with Dapper to circumvent this, unfortunately – Bogey Nov 08 '21 at 18:26
  • In the update that you provided, how are you getting the method `ExecuteReaderAsync` to return a `DbDataReader` so that you can access the `ReadAsync` method? All I see is the standard `IDataReader`, which only has a `Read` method. – Adam Jan 15 '22 at 20:28
  • @Adam you may have been using an outdated version of Dapper. The change happened here: https://github.com/DapperLib/Dapper/commit/d2fa200bf22a6d3aecc8bdb6552e9a501cfe0f59 – Dave Jul 06 '22 at 13:24
  • Also for those finding this answer in the future, be _careful_ holding an open connection like this to your primary SQL database. The above can be great for read-only/non-transactional scenarios, but can cause a lot of issues if you rely heavily on transactions or use this strategy on your primary DB for heavy data ops such as an ETL pipeline. For those scenarios it's best either to use a read replica or another strategy to chunk the data more deterministically. – Dave Jul 06 '22 at 13:26
  • I have added my own [answer](https://stackoverflow.com/a/74550894/1178314) to summarize it. Cleaning up your answer through an edit looked like too many changes. @Dave, if you do so, please ping me through a comment on my answer so that I can delete it. (If you want to keep your first answer, I think it is better to write two answers rather than leaving both in a single post.) – Frédéric Nov 23 '22 at 17:38
  • Where are you closing the connection? Also, are you saying this is good for quick reads and small datasets but not big ETL queries? – causita Jan 29 '23 at 14:50
  • @causita the connection isn't being managed at the scope of this method, but an outer scope. For large datasets we were finding a lot of blocking in our regular transactional queries when inserting into, updating or deleting from the same tables. So we ended up only using this type of solution when querying read-only data (such as from a read-only replica). – Dave Jan 30 '23 at 16:50
  • Dapper now has native support of this feature, since its [2.0.138 release](https://github.com/DapperLib/Dapper/releases/tag/2.0.138). – Frédéric Jun 28 '23 at 08:33
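
On the ignored `cancellationToken` raised in the comments above: an async iterator can receive the token passed to `WithCancellation` through a parameter marked `[EnumeratorCancellation]`. The sketch below is an illustration under stated assumptions, not code from the answer. `ReadRows` is a hypothetical stand-in for a reader loop, and `Task.Delay` stands in for `reader.ReadAsync(cancellationToken)`, so no database is involved.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableStreamDemo
{
    // Hypothetical row source. The attribute makes the compiler flow the
    // token given to WithCancellation/GetAsyncEnumerator into this parameter.
    public static async IAsyncEnumerable<int> ReadRows(
        int total,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; i < total; i++)
        {
            // Stands in for: await reader.ReadAsync(cancellationToken)
            await Task.Delay(1, cancellationToken);
            yield return i;
        }
    }

    // Consumes rows, cancels after `stopAfter` of them, and reports how many
    // rows were observed before the cancellation took effect.
    public static async Task<int> ConsumeUntilCancelled(int stopAfter)
    {
        using var cts = new CancellationTokenSource();
        var seen = 0;
        try
        {
            await foreach (var row in ReadRows(1000).WithCancellation(cts.Token))
            {
                seen++;
                if (seen == stopAfter)
                {
                    cts.Cancel();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the next read observes the cancelled token.
        }
        return seen;
    }
}
```

In a real reader loop the same token would also be passed to the command, for example through Dapper's `CommandDefinition`, so that the query itself can be cancelled.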
7

This feature has been directly available in Dapper since its 2.0.138 release. Use its QueryUnbufferedAsync DbConnection extension, or the GridReader.ReadUnbufferedAsync method on batches.
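
As a rough sketch of how those two methods might be used, assuming Dapper 2.0.138 or later on a target framework that supports them: the `Item` class, the `Items` table, its columns, and the SQL text are all hypothetical, and the second method only mirrors the shape of the question's `GetItems` (an id check followed by a streamed result set).

```csharp
using System.Collections.Generic;
using System.Data.Common;
using Dapper; // assumption: Dapper 2.0.138 or later

// Hypothetical DTO used only for this illustration.
public sealed class Item
{
    public int Id { get; set; }
    public string Name { get; set; } = "";
}

public static class ItemQueries
{
    // Single result set: rows are materialized one by one as they are read.
    // Note that this takes a DbConnection, not an IDbConnection.
    public static IAsyncEnumerable<Item> StreamItems(DbConnection connection, int id) =>
        connection.QueryUnbufferedAsync<Item>(
            "select Id, Name from Items where ParentId = @Id", // hypothetical SQL
            new { Id = id });

    // Batch: read the id from the first result set, then stream the second.
    public static async IAsyncEnumerable<Item> StreamItemsFromBatch(
        DbConnection connection, string getItemsSql, int id)
    {
        using var grid = await connection.QueryMultipleAsync(getItemsSql, new { Id = id });

        var idFromDb = await grid.ReadSingleOrDefaultAsync<int?>();
        if (idFromDb == null)
        {
            yield break; // an empty stream instead of returning null
        }

        await foreach (var item in grid.ReadUnbufferedAsync<Item>())
        {
            yield return item;
        }
    }
}
```

A caller would consume either method with `await foreach`. The connection has to stay open until enumeration finishes, so its lifetime belongs to the caller.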

For previous versions of Dapper:

Dave's answer contains everything needed, but I did not find it straightforward enough.

Building on it, here is an extension method for obtaining an IAsyncEnumerable with Dapper:

/// <summary>
/// Asynchronously enumerates the results of a query.
/// </summary>
/// <typeparam name="T">The type of result to return.</typeparam>
/// <param name="cnn">The connection to query on.</param>
/// <param name="sql">The SQL to execute for the query.</param>
/// <param name="param">The parameters to pass, if any.</param>
/// <param name="transaction">The transaction to use, if any.</param>
/// <returns>An asynchronous enumerator of the results.</returns>
/// <remarks>See <see href="https://stackoverflow.com/a/66723553/1178314"/> and
/// <see href="https://github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322"/>.</remarks>
public static async IAsyncEnumerable<T> EnumerateAsync<T>(this DbConnection cnn, string sql, object param = null, IDbTransaction transaction = null)
{
    await using var reader = await cnn.ExecuteReaderAsync(sql, param, transaction).ConfigureAwait(false);
    var rowParser = reader.GetRowParser<T>();
    while (await reader.ReadAsync().ConfigureAwait(false))
    {
        yield return rowParser(reader);
    }
    while (await reader.NextResultAsync().ConfigureAwait(false)) { }
}

Note that you must use a DbConnection, not an IDbConnection. Otherwise, reader.ReadAsync will not be available.

Then in your code you can do:

var asyncEnumerable = connection.EnumerateAsync<YourDto>("select ...", yourParameters);
Frédéric
  • 1
    Beware if you saw this answer before June 28, 2023: a using was missing, causing a reader leak which could cause crashes. If you have code inspired by this answer, make sure it includes the previously missing using on the first line of the method. – Frédéric Jun 28 '23 at 08:21