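My goal is simple: I want to make asynchronous I/O calls (using async/await), but:

  • Without a DataFlow dependency
  • Without middle buffers
  • The projector function should be sent as an argument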

Ok.

Here is my current code, whose job is to read from the database and project each row through a Func<>:

public IEnumerable<T> GetSomeData<T>(string sql, Func<IDataRecord, T> projector)
{
    using (SqlConnection _conn = new SqlConnection(@"Data Source=..."))
    {
        using (SqlCommand _cmd = new SqlCommand(sql, _conn))
        {
            _conn.Open();
            _cmd.CommandTimeout = 100000;
            using (IDataReader rdr = _cmd.ExecuteReader())
            {
                while (rdr.Read())
                    yield return projector(rdr);
            }
        }
    }
}

So, what is projector?

Each class has a static function that takes a record (IDataRecord) and creates an entity:

Example:

public class MyClass
{
    public static MyClass MyClassFactory(IDataRecord record)
    {
        return new MyClass
        {
            Name = record["Name"].ToString(),
            Datee = DateTime.Parse(record["Datee"].ToString()),
            val = decimal.Parse(record["val"].ToString())
        };
    }
    public string Name { get; set; }
    public DateTime Datee { get; set; }
    public decimal val { get; set; }
}

So here, MyClassFactory would be the Func.

So how do I currently run it?

var sql = @"SELECT TOP 1000 [NAME], [datee], [val] FROM [WebERP].[dbo].[t]";
var a = GetSomeData<MyClass>(sql, MyClass.MyClassFactory).Where(...); // notice the Func

All ok.

The problems start now:

Adding async to the method yields an error (yes, I know that IEnumerable is a synchronous interface, hence the problem):

public async Task<IEnumerable<T>> GetSomeData<T>(string sql, Func<IDataRecord, T> projector)

cannot be an iterator block because 'System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<T>>' is not an iterator interface type

But this guy here did it:

[image: screenshot of the other code, not reproduced here]

That code DOES compile.

Question

How can I convert my code to support fully asynchronous I/O calls?

(Under these conditions: no DataFlow dependency, the projector function passed as an argument, no middle buffers.)

Royi Namir
  • For such cases it would be nice if C# supported `IAsyncEnumerator`. If you are OK with returning an eagerly filled list in a fully async way the question becomes much easier. – usr May 25 '14 at 10:05
  • `ToArray` does create a middle buffer. – i3arnon May 25 '14 at 10:08
  • Your code iterates. The code that compiles does not. IEnumerable is not the problem. Iterating (ie: `yield return`) is the problem. What's the problem with a buffer? – J... May 25 '14 at 10:09
  • How about http://asyncenum.codeplex.com – Yuval Itzchakov May 25 '14 at 10:22
  • * (please don't make me switch to NODEJS) :-) hahaha – Royi Namir May 25 '14 at 10:37
  • @I3arnon Yes, but as you can see, it's not my code :-). Indeed, ToArray is a buffer. I'd hate to put the result in a buffer and then iterate over it again. Isn't it possible to iterate over it just once? – Royi Namir May 25 '14 at 10:41
  • @usr Is there any possible solution in which I don't have to create a middle buffer? I mean, is creating a middle buffer the only solution? – Royi Namir May 25 '14 at 10:47
  • @RoyiNamir you'd have to expose per-element asynchronicity to the caller. IEnumerable cannot do that. Taking an element is always synchronous. You need to use an async model like IAsyncEnumerator. There seem to be reasonable libraries around that idea (https://asyncenum.codeplex.com/). Nothing built-in. Also, the performance hit taken for buffering is so small compared to all the ADO.NET and SQL work that avoiding buffering does not have any meaningful effect on throughput. It would still make sense for streaming huge data sets. – usr May 25 '14 at 11:29
  • @usr: I believe `Ix-Async` still has an async enumerator. – Stephen Cleary May 26 '14 at 14:50
  • Have you considered using Rx or TPL Dataflow? – svick May 26 '14 at 19:31
  • You can also do this with Rx - http://stackoverflow.com/a/24968048/1239433 – NeddySpaghetti Jul 26 '14 at 06:00

2 Answers

I want to do Asynchronous I/O calls (using async await) - but :

  • Without using DataFlow dependency (like in this answer)
  • Without middle buffers (not like this answer)
  • The Projector function should be sent as an argument (not like this answer)

You may want to check Stephen Toub's "Tasks, Monads, and LINQ" for some great ideas on how to process asynchronous data sequences.

It's not (yet) possible to combine yield and await, but I'm going to be a verbalist here: the quoted requirements didn't list IEnumerable and LINQ. So, here's a possible solution shaped as two coroutines (almost untested).

Data producer routine (corresponds to IEnumerable with yield):

public async Task GetSomeDataAsync<T>(
    string sql, Func<IDataRecord, T> projector, ProducerConsumerHub<T> hub)
{
    using (SqlConnection _conn = new SqlConnection(@"Data Source=..."))
    {
        using (SqlCommand _cmd = new SqlCommand(sql, _conn))
        {
            await _conn.OpenAsync();
            _cmd.CommandTimeout = 100000;
            using (var rdr = await _cmd.ExecuteReaderAsync())
            {
                while (await rdr.ReadAsync())
                    await hub.ProduceAsync(projector(rdr));
            }
        }
    }
}

Data consumer routine (corresponds to foreach or a LINQ expression):

public async Task ConsumeSomeDataAsync(string sql)
{
    var hub = new ProducerConsumerHub<IDataRecord>();
    var producerTask = GetSomeDataAsync(sql, rdr => rdr, hub);

    while (true)
    {
        var nextItemTask = hub.ConsumeAsync();
        await Task.WhenAny(producerTask, nextItemTask);

        if (nextItemTask.IsCompleted)
        {
            // process the next data item
            Console.WriteLine(await nextItemTask);
        }

        if (producerTask.IsCompleted)
        {
            // process the end of sequence
            await producerTask;
            break;
        }
    }
}

Coroutine execution helper (can also be implemented as a pair of custom awaiters):

public class ProducerConsumerHub<T>
{
    TaskCompletionSource<Empty> _consumer = new TaskCompletionSource<Empty>();
    TaskCompletionSource<T> _producer = new TaskCompletionSource<T>();

    // TODO: make thread-safe
    public async Task ProduceAsync(T data)
    {
        _producer.SetResult(data);
        await _consumer.Task;
        _consumer = new TaskCompletionSource<Empty>();
    }

    public async Task<T> ConsumeAsync()
    {
        var data = await _producer.Task;
        _producer = new TaskCompletionSource<T>();
        _consumer.SetResult(Empty.Value);
        return data;
    }

    struct Empty { public static readonly Empty Value = default(Empty); }
}

This is just an idea. It might be overkill for a simple task like this, and it could be improved in some areas (such as thread safety, race conditions, and handling the end of the sequence without touching producerTask). Yet it illustrates how asynchronous data retrieval and processing could be decoupled.
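
As a rough sketch of one way the end-of-sequence and thread-safety points above could be approached, the hub below swaps the TaskCompletionSource pair for two SemaphoreSlim instances and adds an explicit completion signal. This is a different technique from the answer's code, not a fix of it: HandoffHub, CompleteAsync, TryConsumeAsync and HandoffDemo are hypothetical names, a single producer and a single consumer are assumed, and the hub holds at most one pending item (a one-slot hand-off rather than a strict rendezvous).

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical alternative to ProducerConsumerHub<T>: a one-slot hand-off
// built on SemaphoreSlim, with an explicit end-of-sequence signal.
public sealed class HandoffHub<T>
{
    private readonly SemaphoreSlim _slotFree = new SemaphoreSlim(1, 1);
    private readonly SemaphoreSlim _itemReady = new SemaphoreSlim(0, 1);
    private T _item;
    private bool _completed;

    public async Task ProduceAsync(T item)
    {
        await _slotFree.WaitAsync();   // wait until the previous item was taken
        _item = item;
        _itemReady.Release();
    }

    public async Task CompleteAsync()
    {
        await _slotFree.WaitAsync();   // let the last item be consumed first
        _completed = true;
        _itemReady.Release();
    }

    // Returns (false, default) once the producer has signalled completion.
    public async Task<(bool HasItem, T Item)> TryConsumeAsync()
    {
        await _itemReady.WaitAsync();
        if (_completed)
        {
            _itemReady.Release();      // keep signalling the end to later calls
            return (false, default(T));
        }
        T item = _item;
        _slotFree.Release();
        return (true, item);
    }
}

public static class HandoffDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var hub = new HandoffHub<int>();
        var producer = Task.Run(async () =>
        {
            try { for (int i = 1; i <= 3; i++) await hub.ProduceAsync(i); }
            finally { await hub.CompleteAsync(); }  // always end the sequence
        });

        var seen = new List<int>();  // collected only so the demo can return them
        while (true)
        {
            var next = await hub.TryConsumeAsync();
            if (!next.HasItem) break;
            seen.Add(next.Item);
        }
        await producer;              // observe any producer exception
        return seen;
    }
}
```

Calling CompleteAsync in a finally block matters in this sketch: if the producer throws without completing, the consumer would wait forever. Awaiting the producer task afterwards is still needed to surface its exception.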

noseratio
  • Thanks. I don't understand how `GetSomeDataAsync` is used with `ConsumeSomeDataAsync`. Do both take the sql command? – Royi Namir May 26 '14 at 11:41
  • @RoyiNamir, `ConsumeSomeDataAsync` calls `GetSomeDataAsync` to start the sequence and simply passes it the `sql` string. – noseratio May 26 '14 at 11:47
  • Would I be able to do something like `ConsumeSomeDataAsync().Where(...)`? (Currently not.) – Royi Namir May 26 '14 at 11:50
  • @RoyiNamir, here is a simple fiddle showing the concept: https://dotnetfiddle.net/8gQVt2. No, you won't be able to use standard LINQ. – noseratio May 26 '14 at 11:57
  • Thank you. Can I refer to this as "async bursts of data"? – Royi Namir May 26 '14 at 11:58
  • @RoyiNamir, I'm not sure about the terminology :) – noseratio May 26 '14 at 12:00
  • No middle buffers, pure async fetch, and spitting out data as soon as it is available. – Royi Namir May 26 '14 at 12:02
  • @RoyiNamir, I guess you could say so. Figuratively speaking, this is a special case of the producer/consumer pattern, with a queue size of 1 element and without blocking any threads. TPL Dataflow allows for much more complex scenarios. Then, there's also Rx. – noseratio May 26 '14 at 12:05
  • Shouldn't `_conn.Open();` be `await _conn.OpenAsync();`? – Royi Namir May 27 '14 at 06:18
  • @RoyiNamir, yes that's what it should be, fixed. We don't want any blocking code. I did that for `rdr.Read()` but overlooked `_conn.Open` when copied your code. Note, I didn't test the SQL stuff. I only tested [this](https://dotnetfiddle.net/8gQVt2). – noseratio May 27 '14 at 06:22
  • You need to pass in a cancellationtoken and check for it inside the while loop – Robert Slaney Jan 25 '17 at 02:50
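
The cancellation suggestion in the last comment could look roughly like the sketch below. It is not the answer author's code: ForEachRowAsync and DemoAsync are hypothetical names, and a DataTable reader stands in for a real SqlDataReader so the example needs no database. The token is passed to ReadAsync and also checked explicitly inside the loop.

```csharp
using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableRead
{
    // Hypothetical helper: invokes onRow for each record and returns the row count.
    public static async Task<int> ForEachRowAsync(
        DbDataReader reader, Func<IDataRecord, Task> onRow, CancellationToken token)
    {
        int count = 0;
        while (await reader.ReadAsync(token))      // ReadAsync observes the token
        {
            token.ThrowIfCancellationRequested();  // explicit check inside the loop
            await onRow(reader);
            count++;
        }
        return count;
    }

    // Returns true if reading was stopped by the cancellation token.
    public static async Task<bool> DemoAsync()
    {
        // In-memory stand-in for a SQL result set (assumption for the demo).
        var table = new DataTable();
        table.Columns.Add("Name", typeof(string));
        table.Rows.Add("a");

        using (var cts = new CancellationTokenSource())
        using (DbDataReader reader = table.CreateDataReader())
        {
            cts.Cancel(); // cancel before reading starts
            try
            {
                await ForEachRowAsync(reader, r => Task.CompletedTask, cts.Token);
                return false;
            }
            catch (OperationCanceledException)
            {
                return true;
            }
        }
    }
}
```

In the producer routine from the answer, the same token would also be passed to OpenAsync and ExecuteReaderAsync, both of which have overloads that accept a CancellationToken.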

This Medium article describes another solution, which is to use the Dasync/AsyncEnumerable library.

The library is open source, available on NuGet and GitHub, and provides a readable syntax to use now, for IAsyncEnumerable, until C# 8.0 comes out and provides its own implementation and language support in the form of async ... yield return and await foreach.
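
For reference, the C# 8.0 form mentioned above might look like the following minimal sketch. It assumes a runtime that provides IAsyncEnumerable<T> (.NET Core 3.0 or later, or the Microsoft.Bcl.AsyncInterfaces package); AsyncRows, ReadAllAsync and Demo are hypothetical names, and a DataTable reader stands in for a real SqlDataReader so the example runs without a database.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;

public static class AsyncRows
{
    // Streams projected rows one at a time; no intermediate buffer is built.
    public static async IAsyncEnumerable<T> ReadAllAsync<T>(
        DbDataReader reader, Func<IDataRecord, T> projector)
    {
        while (await reader.ReadAsync())
            yield return projector(reader);
    }
}

public static class Demo
{
    public static async Task<List<string>> RunAsync()
    {
        // In-memory stand-in for a SQL result set (assumption for the demo).
        var table = new DataTable();
        table.Columns.Add("Name", typeof(string));
        table.Rows.Add("a");
        table.Rows.Add("b");

        // The list exists only so the demo can return what it saw;
        // real code would process each item inside the loop.
        var names = new List<string>();
        using (DbDataReader reader = table.CreateDataReader())
        {
            await foreach (var name in AsyncRows.ReadAllAsync(reader, r => r["Name"].ToString()))
                names.Add(name);
        }
        return names;
    }
}
```

With a real database, the reader would come from `await cmd.ExecuteReaderAsync()` inside the async iterator itself, so the connection and command stay open for as long as the caller keeps enumerating.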

(I have no connection with the library; I came across it as a possible very useful solution to - what I think is! - the same problem as yours, on a project I'm developing.)

MikeBeaton