
The below method doesn't compile. Alternatives?

public static async Task<IEnumerable<object[]>> GetRecordsAsync(
    this Transaction transaction,
    string commandText,
    params SqlParameter[] parameters)
{
    // Get a SqlDataReader
    var reader = await transaction.GetReaderAsync(commandText, parameters);
    var fieldCount = -1;
    // Begin iterating through records asynchronously
    while (await reader.ReadAsync()) // Note we don't loop until .ReadAsync returns a boolean
    {
        // Grab all the field values out
        if (fieldCount < 0)
            fieldCount = reader.FieldCount;
        var fields = new object[fieldCount];
        reader.GetValues(fields);
        // Yield return the field values from this record
        yield return fields;
    }
}

Error message:

The body of 'TransactionExtensions.GetRecordsAsync(Transaction, string, params SqlParameter[])' cannot be an iterator block because 'Task<IEnumerable<object[]>>' is not an iterator interface type

I don't see a way to adapt this answer to a similar-sounding (but different) question, because I don't know a priori how many times the loop will iterate.


Matt Thomas
  • `IEnumerable` itself doesn't support that. Use Reactive Extensions. – SLaks Mar 19 '17 at 01:58
  • You can use *ObservableCollection* to monitor elements as they are added. Create one and pass it to *GetRecordsAsync*, which then only needs to return *Task*, and add to it whenever you are ready to yield `fields`. Now that I think about it, simply passing an "on fields received" delegate to the method would also work. – IS4 Mar 19 '17 at 02:02
  • @IllidanS4 I think that would boil down to the comment given by SLaks. Both are good ideas, but Reactive Extensions brings a lot of other goodies to bear – Matt Thomas Mar 19 '17 at 14:56
  • @MattThomas, also check [Using async / await with DataReader ? ( without middle buffers!)](http://stackoverflow.com/questions/23854102/using-async-await-with-datareader-without-middle-buffers) for some alternative ideas. – noseratio Mar 20 '17 at 04:29
  • @Noseratio thanks for the link. The best option I took from that was Rx. To me the answer felt like something that Rx (or more generally, pub-sub) does – Matt Thomas Mar 20 '17 at 11:52
  • @SLaks if you post that as an answer then I'll accept it. Thanks! – Matt Thomas Mar 20 '17 at 11:55
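
The delegate idea from IS4's comment above can be sketched without any extra library. This is only an illustration under assumptions: `ForEachRecordAsync`, `readAsync`, `getFields` and `onFields` are hypothetical names, and the in-memory queue stands in for a real `SqlDataReader` so the sketch runs without a database.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CallbackDemo
{
    // Hypothetical helper: pushes each record to onFields instead of yielding it.
    // readAsync stands in for reader.ReadAsync, getFields for reader.GetValues.
    public static async Task ForEachRecordAsync(
        Func<Task<bool>> readAsync,
        Func<object[]> getFields,
        Action<object[]> onFields)
    {
        while (await readAsync())
            onFields(getFields());
    }

    public static async Task Main()
    {
        // Fake data source (an assumption) so the sketch runs without a database
        var rows = new Queue<object[]>(new[]
        {
            new object[] { 1, "a" },
            new object[] { 2, "b" },
        });
        object[] current = null;

        await ForEachRecordAsync(
            readAsync: async () =>
            {
                await Task.Yield(); // Simulates asynchronous I/O
                if (rows.Count == 0) return false;
                current = rows.Dequeue();
                return true;
            },
            getFields: () => current,
            onFields: fields => Console.WriteLine(string.Join(", ", fields)));
    }
}
```

The caller gives up `foreach` and LINQ over the results, which is the main trade-off compared with an observable or an async stream.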

3 Answers


Based on @SLaks's comment to the question, here's a general alternative using Reactive Extensions:

/// <summary>
/// Turns the given asynchronous functions into an IObservable
/// </summary>
static IObservable<T> ToObservable<T>(
    Func<Task<bool>> shouldLoopAsync,
    Func<Task<T>> getAsync)
{
    return Observable.Create<T>(
        observer => Task.Run(async () =>
            {
                while (await shouldLoopAsync())
                {
                    var value = await getAsync();
                    observer.OnNext(value);
                }
                observer.OnCompleted();
            }
        )
    );
}

Example usage, tailored to solve the question's specific case:

/// <summary>
/// Asynchronously processes each record of the given reader using the given handler
/// </summary>
static async Task ProcessResultsAsync(this SqlDataReader reader, Action<object[]> fieldsHandler)
{
    // Set up async functions for the reader
    var shouldLoopAsync = (Func<Task<bool>>)reader.ReadAsync;
    var getAsync = new Func<SqlDataReader, Func<Task<object[]>>>(_reader =>
    {
        var fieldCount = -1;
        return () => Task.Run(() =>
        {
            Interlocked.CompareExchange(ref fieldCount, _reader.FieldCount, -1);
            var fields = new object[fieldCount];
            _reader.GetValues(fields);
            return fields;
        });
    })(reader);

    // Turn the async functions into an IObservable
    var observable = ToObservable(shouldLoopAsync, getAsync);

    // Process the fields as they become available.
    // A TaskCompletionSource lets us await completion without blocking a thread,
    // and surfaces errors instead of waiting forever if the observable fails.
    var finished = new TaskCompletionSource<bool>();
    using (observable.Subscribe(
        onNext: fieldsHandler, // Invoke the handler for each set of fields
        onError: ex => finished.TrySetException(ex), // Propagate failures to the awaiting caller
        onCompleted: () => finished.TrySetResult(true) // Signal when the observable completes
    )) // Don't forget best practice of disposing IDisposables
        // Asynchronously wait for the observable to finish
        await finished.Task;
}

(Note that getAsync could be simplified in the above code block, but I like how explicit it is about the closure that's being created)

...and finally:

// Get a SqlDataReader
var reader = await transaction.GetReaderAsync(commandText, parameters);
// Do something with the records
await reader.ProcessResultsAsync(fields => { /* Code here to process each record */ });
Matt Thomas

Don't return a Task<IEnumerable<T>> and don't even use Task at all for this; instead, return an IAsyncEnumerable<T>. No need for third-party libraries or other workarounds, no need to even alter the body of your original method.

public static async IAsyncEnumerable<object[]> GetRecordsAsync(
    this Transaction transaction,
    string commandText,
    params SqlParameter[] parameters)
{
    // Get a SqlDataReader
    var reader = await transaction.GetReaderAsync(commandText, parameters);
    var fieldCount = -1;
    // Begin iterating through records asynchronously
    while (await reader.ReadAsync()) // Note we don't loop until .ReadAsync returns a boolean
    {
        // Grab all the field values out
        if (fieldCount < 0)
            fieldCount = reader.FieldCount;
        var fields = new object[fieldCount];
        reader.GetValues(fields);
        // Yield return the field values from this record
        yield return fields;
    }
}
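
On the consuming side, an async stream is read with `await foreach`. The sketch below is a minimal illustration, assuming C# 8 or later: `GetFakeRecordsAsync` and `CountRecordsAsync` are hypothetical names, and the fake source stands in for `GetRecordsAsync` so the example runs without a database.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncStreamDemo
{
    // Hypothetical stand-in for GetRecordsAsync: yields three fake rows,
    // awaiting between them the way reader.ReadAsync() would.
    public static async IAsyncEnumerable<object[]> GetFakeRecordsAsync()
    {
        for (var id = 1; id <= 3; id++)
        {
            await Task.Yield(); // Simulates asynchronous I/O for the next row
            yield return new object[] { id, "row " + id };
        }
    }

    // Consumes the stream with await foreach; each row is handled as soon as it arrives
    public static async Task<int> CountRecordsAsync(IAsyncEnumerable<object[]> records)
    {
        var count = 0;
        await foreach (var fields in records)
        {
            Console.WriteLine(string.Join(", ", fields));
            count++;
        }
        return count;
    }

    public static async Task Main()
    {
        var count = await CountRecordsAsync(GetFakeRecordsAsync());
        Console.WriteLine("Rows read: " + count);
    }
}
```

With the real method, the loop would presumably read `await foreach (var fields in transaction.GetRecordsAsync(commandText, parameters))`.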
Mark
  • For those not using .NET Core 3.0 or later, this is available via the Microsoft.Bcl.AsyncInterfaces NuGet package. You also need C# 8 or later, so those of us still mucking around in .NET Framework have some manual project editing to do. – KeithS Mar 30 '23 at 20:17

I solved it without third-party extensions:

public async Task<IEnumerable<Item>> GetAllFromDb()
{
    OracleConnection connection = null;
    DbDataReader reader = null;
    try
    {
        connection = new OracleConnection(connectionString);
        var command = new OracleCommand(queryString, connection);
        connection.Open();

        reader = await command.ExecuteReaderAsync();

        return this.BuildEnumerable(connection, reader);
    }
    catch (Exception)
    {
        reader?.Dispose();
        connection?.Dispose();          
        throw;
    }
}

private IEnumerable<Item> BuildEnumerable(OracleConnection connection, DbDataReader reader)
{
    using (connection)
    using (reader)
    {
        while (reader.Read())
        {
            var item = new Item()
            {
                Prop = reader.GetString(0),
            };
            yield return item;
        }
    }
}

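This example uses the Oracle data reader, but the same approach applies whenever an asynchronous setup step has to be combined with yield return: await the setup in the outer method, and put the iterator in a separate, synchronous method.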

VeganHunter
  • But at the heart of this you call reader.Read(), the blocking method. It may compile and run but I don't think you are getting the real async benefits. – H H Apr 20 '18 at 05:49
  • Just wanted to point out that we do have an asynchronous operation: reader = await command.ExecuteReaderAsync(); so if implemented properly, this operation will unblock the thread. Regarding the Read operation, I agree with @Henk-Holterman: it is synchronous. But it is the best trade-off possible if you want to use standard C#, LINQ, foreach, etc. – VeganHunter Apr 20 '18 at 07:13