43

We use IEnumerable<T> iterators to return huge datasets from the database:

public IEnumerable<Data> Read(...)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            yield return item;
        }
    }
}

Now we want to use async methods to do the same. However, there is no async equivalent of IEnumerable, so we have to collect the data into a list until the entire dataset is loaded:

public async Task<List<Data>> ReadAsync(...)
{
    var result = new List<Data>();
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(await reader.ReadAsync().ConfigureAwait(false))
        {
            // ...
            result.Add(item);
        }
    }
    return result;
}

This will consume a huge amount of resources on the server, because all the data must be in the list before the method returns. What is the best, easy-to-use async alternative to IEnumerable for working with large data streams? I would like to avoid storing all the data in memory while processing.

i3arnon
user1224129
  • *This will consume a huge amount of resources on server...* In this context, what is the server execution environment? (e.g. ASP.NET, WCF service, etc.) What is the client execution environment? (web browser, rich client .NET app, etc.) – noseratio Jul 28 '14 at 07:54
  • Reactive Extensions includes Async Enumerable's, which you should find helpful. – cwharris Jul 29 '14 at 02:06
  • @ChristopherHarris, do you mean `IAsyncEnumerable` from Interactive Extensions (Ix)? There's very little info available on it, like [these slides](http://qconlondon.com/dl/qcon-london-2011/slides/BartDeSmet_LINQTakeTwoRealizingTheLINQToEverythingDream.pdf) and [this blog](http://davesexton.com/blog/post/async-iterators.aspx). Was there any release beyond Ix Experimental? – noseratio Jul 29 '14 at 07:48

5 Answers

29

The easiest option is to use TPL Dataflow. All you need to do is configure an ActionBlock that handles the processing (in parallel if you wish) and "send" the items into it one by one, asynchronously.
I would also suggest setting a BoundedCapacity, which throttles the reader reading from the database when the processing can't keep up.

var block = new ActionBlock<Data>(
    data => ProcessDataAsync(data),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 1000,
        MaxDegreeOfParallelism = Environment.ProcessorCount
    });

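using(var connection = new SqlConnection(...))
{
    // ...
    while(await reader.ReadAsync().ConfigureAwait(false))
    {
        // ...
        await block.SendAsync(item);
    }
}

// Signal that no more items are coming, then wait for processing to finish
block.Complete();
await block.Completion;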

You can also use Reactive Extensions, but that's a more complicated and robust framework than you probably need.
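
To make the pieces above concrete, here is a minimal, self-contained sketch. It is not the answerer's code: the database reader is replaced by a plain loop over integers, and the names `DataflowSketch`, `ProduceAsync` and `RunAsync`, as well as the capacity of 10, are illustrative assumptions. It assumes the System.Threading.Tasks.Dataflow library is referenced (a NuGet package on .NET Framework). The point it tries to show is that `SendAsync` only completes once the block has room, which is how `BoundedCapacity` keeps the producer from running ahead of the consumer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowSketch
{
    // Hypothetical stand-in for the reader loop: produces items one at a time.
    private static async Task ProduceAsync(ITargetBlock<int> target, int count)
    {
        for (var item = 0; item < count; item++)
        {
            await Task.Yield(); // stands in for await reader.ReadAsync()

            // Completes only when the block accepts the item, so a full
            // buffer (BoundedCapacity) pauses this loop.
            await target.SendAsync(item).ConfigureAwait(false);
        }

        // Tell the block that no more items will arrive.
        target.Complete();
    }

    public static async Task<long> RunAsync(int count)
    {
        var processed = new ConcurrentQueue<int>();

        var block = new ActionBlock<int>(
            async item =>
            {
                await Task.Delay(1).ConfigureAwait(false); // simulated slow processing
                processed.Enqueue(item);
            },
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 10,       // at most 10 items buffered or in flight
                MaxDegreeOfParallelism = 4  // up to 4 items processed concurrently
            });

        await ProduceAsync(block, count).ConfigureAwait(false);

        // Wait until every accepted item has been processed.
        await block.Completion.ConfigureAwait(false);

        return processed.Sum(x => (long)x);
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync(100)); // 4950, the sum of 0..99
    }
}
```

With parallelism above 1 the items may finish out of order, which is why the sketch only checks the sum rather than the sequence.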

i3arnon
  • 1
    Good answer; other solutions, including ones I coded myself, have been much more complicated for no reason. +1 – Mark Alicz Jul 26 '14 at 00:34
  • 3
    Dataflow for the win. It's really surprising how versatile it is. I just wrote a radio server that really benefits from the bounding of buffers that it provides. Love it. +1 – spender Jul 26 '14 at 00:49
  • 1
    @l3arnon, could you clarify how it helps to address this: *This will consume a huge amount of resources on server, because all data must be in the list before return*. Whether or not you use TPL Dataflow on the server to throttle data reads asynchronously, the response will be sent to the client when all data has been fetched. Unless you use something like SignalR or WCF streaming. – noseratio Jul 26 '14 at 01:00
  • 3
    @Noseratio The OP hasn't mentioned a client and didn't address that in code. She/He asked about database retrieval and processing. A solution for a client server architecture depends on the technology used for the client and the required behavior. – i3arnon Jul 26 '14 at 01:07
  • 1
    Where is `item` defined? Please add more code. And how does `BoundedCapacity = 1000` come into play here? I'm struggling to follow: what is `ProcessDataAsync`, does its type have `SendAsync`, or is `SendAsync` how an ActionBlock is called? This is very confusing; could you add comments and make it a little easier to follow for those who are new to `TPL Dataflow`? – Seabizkit Jan 02 '20 at 12:49
11

Most of the time when dealing with async/await methods, I find it easier to turn the problem around, and use functions (Func<...>) or actions (Action<...>) instead of ad-hoc code, especially with IEnumerable and yield.

In other words, when I think "async", I try to forget the old concept of function "return value" that is otherwise so obvious and that we are so familiar with.

For example, if you change your initial sync code into this (processor is the code that will ultimately do whatever you do with one Data item):

public void Read(..., Action<Data> processor)
{
    using(var connection = new SqlConnection(...))
    {
        // ...
        while(reader.Read())
        {
            // ...
            processor(item);
        }
    }
}

Then, the async version is quite simple to write:

public async Task ReadAsync(..., Action<Data> processor)
{
    using(var connection = new SqlConnection(...))
    {
        // note you can use connection.OpenAsync()
        // and command.ExecuteReaderAsync() here
        while(await reader.ReadAsync())
        {
            // ...
            processor(item);
        }
    }
}

If you can change your code this way, you don't need any extension or extra library or IAsyncEnumerable stuff.
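
If the per-item work is itself asynchronous, the same idea extends to a `Func<T, Task>` callback that the read loop awaits. The sketch below is a hedged illustration rather than this answer's code: the reader is replaced by a loop over integers, and `CallbackSketch`, `rowCount` and `RunAsync` are hypothetical names introduced for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CallbackSketch
{
    // Hypothetical stand-in for the reader loop: hands each row to an async callback.
    public static async Task ReadAsync(int rowCount, Func<int, Task> asyncProcessor)
    {
        for (var row = 0; row < rowCount; row++)
        {
            await Task.Yield(); // stands in for await reader.ReadAsync()

            // The next row is not read until the callback has finished,
            // so only one item is in flight at a time.
            await asyncProcessor(row).ConfigureAwait(false);
        }
    }

    public static async Task<List<int>> RunAsync()
    {
        var seen = new List<int>();

        await ReadAsync(3, async row =>
        {
            await Task.Delay(1).ConfigureAwait(false); // simulated async work
            seen.Add(row);
        }).ConfigureAwait(false);

        return seen;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync())); // 0, 1, 2
    }
}
```

Because the loop awaits each callback before reading the next row, memory use stays flat, at the cost of processing strictly one item at a time.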

Simon Mourier
  • I would maybe add an option for an asyncProcessor: `public async void ReadAsync(..., Func<Data, Task> asyncProcessor)` and use it: `await asyncProcessor(item);` – i3arnon Jul 28 '14 at 21:23
  • +1, not sure if I'll use this solution but it looks like thinking outside the box and gives me ideas for solutions to other problems – Fat Shogun Feb 15 '15 at 13:49
9

This will consume a huge amount of resources on the server, because all the data must be in the list before the method returns. What is the best, easy-to-use async alternative to IEnumerable for working with large data streams? I would like to avoid storing all the data in memory while processing.

If you don't want to send all data to the client at once, you may consider using Reactive Extensions (Rx) (on the client) and SignalR (on both client and server) to handle this.

SignalR would allow you to send data to the client asynchronously. Rx would allow you to apply LINQ to the asynchronous sequence of data items as they arrive on the client. This would, however, change the whole code model of your client-server application.

Example (a blog post by Samuel Jack):

Related question (if not a duplicate):

noseratio
  • 1
    Certainly no downvote from me (I think the linked answer is great as it's the only solution which does not introduce external dependencies), but you seem to have answered *more* questions than is actually being asked. I can see how the word "server" might have tripped you up, but ultimately this whole thing seems to boil down to "how do I implement an asynchronous `yield return`" (which is fully addressed by your linked answer) as opposed to "how do I implement full-on client-server streaming". – Kirill Shlenskiy Jul 28 '14 at 08:25
  • @KirillShlenskiy, regardless of the client/server aspect, *asynchronous `yield return`* is exactly the problem Rx solves perfectly, IMO. `IObservable` is deeply dual to `IEnumerable` in almost every single way. – noseratio Jul 28 '14 at 08:25
  • 1
    ... actually, on second thought, there is one key aspect that might be important in the context of the current question which Rx does not address (due to the lack of `OnNextAsync` and consumer -> producer feedback mechanisms): back pressure. In cases where the consumer is slower than the producer, having a handover point with a bounded capacity can be invaluable to prevent the build-up of items in the intermediate cache ("*consuming a huge amount of resources*" as user1224129 put it), which would make TPL Dataflow, or your own linked solution, a better fit. – Kirill Shlenskiy Jul 28 '14 at 10:36
  • @KirillShlenskiy, Rx also has buffering for this (`Observable.Buffer`), and you can skip superfluous items, too. The main point is, with Dataflow you have to build a finite sequence before you can process it with a LINQ query. With Rx, you don't. Related: http://stackoverflow.com/a/24245474/1768303 – noseratio Jul 28 '14 at 11:07
  • @KirillShlenskiy The OP can easily exchange most (if not all) LINQ queries with an equivalent Dataflow block, if she/he has any interest in doing so. – i3arnon Jul 28 '14 at 21:08
7

As some of the other posters have mentioned, this can be implemented with Rx. With Rx, the function returns an IObservable<Data> which can be subscribed to, and it pushes data to the subscriber as it becomes available. IObservable also supports LINQ and adds some extension methods of its own.

Update

I added a couple of generic helper methods to make the usage of the reader reusable as well as support for cancellation.

public static class ObservableEx
{
    public static IObservable<T> CreateFromSqlCommand<T>(string connectionString, string command, Func<SqlDataReader, Task<T>> readDataFunc)
    {
        return CreateFromSqlCommand(connectionString, command, readDataFunc, CancellationToken.None);
    }

    public static IObservable<T> CreateFromSqlCommand<T>(string connectionString, string command, Func<SqlDataReader, Task<T>> readDataFunc, CancellationToken cancellationToken)
    {
        return Observable.Create<T>(
            async o =>
            {
                SqlDataReader reader = null;

                try
                {
                    using (var conn = new SqlConnection(connectionString))
                    using (var cmd = new SqlCommand(command, conn))
                    {
                        await conn.OpenAsync(cancellationToken);
                        reader = await cmd.ExecuteReaderAsync(CommandBehavior.CloseConnection, cancellationToken);

                        while (await reader.ReadAsync(cancellationToken))
                        {
                            var data = await readDataFunc(reader);
                            o.OnNext(data);
                        }

                        o.OnCompleted();
                    }
                }
                catch (Exception ex)
                {
                    o.OnError(ex);
                }

                return reader;
            });
    }
}

The implementation of ReadData is now greatly simplified.

private static IObservable<Data> ReadData()
{
    return ObservableEx.CreateFromSqlCommand(connectionString, "select * from Data", async r =>
    {
        return await Task.FromResult(new Data()); // sample code to read from reader.
    });
}

Usage

You can subscribe to the Observable by giving it an IObserver, but there are also overloads that take lambdas. As data becomes available, the OnNext callback gets called. If there is an exception, the OnError callback gets called. Finally, when there is no more data, the OnCompleted callback gets called.

If you want to cancel the observable, simply dispose of the subscription.

void Main()
{
    // This is an asynchronous call; it returns straight away
    var subscription = ReadData()
        .Skip(5)                        // Skip first 5 entries, supports LINQ
        .Delay(TimeSpan.FromSeconds(1)) // Rx operator to delay sequence 1 second
        .Subscribe(
            x =>
            {
                // Callback when a new Data is read
                // do something with x of type Data
            },
            e =>
            {
                // Optional callback for when an error occurs
            },
            () =>
            {
                // Optional callback for when the sequence is complete
            });

    // Keep the process alive while data arrives
    Console.ReadKey();

    // Dispose subscription when finished (disposing earlier cancels the read)
    subscription.Dispose();
}
NeddySpaghetti
2

I think Rx is definitely the way to go in this scenario, given an observable sequence is the formal dual to an enumerable one.

As mentioned in a previous answer, you could rewrite your sequence as an observable from scratch, but there are also a couple of ways to keep writing your iterator blocks and then just unwind them asynchronously.

1) Just convert the enumerable to an observable like so:

using System.Reactive.Linq;
using System.Reactive.Concurrency;

var enumerable = Enumerable.Range(0, 10); // Range takes (start, count)
var observable = enumerable.ToObservable();
var subscription = observable.Subscribe(x => Console.WriteLine(x));

This will make your enumerable behave like an observable by pushing its notifications into any downstream observers. In this case, when Subscribe is called, it will synchronously block until all data has been processed. If you want it to be fully asynchronous, you can move the subscription to a different thread by using:

var observable = enumerable.ToObservable().SubscribeOn(NewThreadScheduler.Default);

Now the unwinding of the enumerable will be done in a new thread and the subscribe method will return immediately.

2) Unwind the enumerable using another asynchronous event source:

var enumerable = Enumerable.Range(0, 10); // Range takes (start, count)
var observable = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
                           .Zip(enumerable, (t, x) => x);
var subscription = observable.Subscribe(x => Console.WriteLine(x));

In this case, I've set up a timer to fire every second, and whenever it fires it moves the iterator forward. The timer could easily be replaced by any event source to control exactly when the iterator moves forward.

I find myself enjoying the syntax and semantics of iterator blocks (e.g. what happens with try/finally blocks and dispose), so I use these designs occasionally even when designing asynchronous operations.
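
For readers on C# 8 or later (an assumption, since this thread predates it), the same iterator-block semantics are available directly for asynchronous code through async iterators: a method returning `IAsyncEnumerable<T>` can mix `await` and `yield return`, and it is consumed with `await foreach`. The sketch below is illustrative only: the database is replaced by a loop over integers, and `AsyncIteratorSketch`, `rowCount` and the `log` list are hypothetical names used to make the try/finally behavior visible.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncIteratorSketch
{
    // Hypothetical stand-in for the reader loop; try/finally plays the role
    // of the `using` block around the connection.
    public static async IAsyncEnumerable<int> ReadAsync(int rowCount, List<string> log)
    {
        log.Add("open");
        try
        {
            for (var row = 0; row < rowCount; row++)
            {
                await Task.Delay(1).ConfigureAwait(false); // stands in for await reader.ReadAsync()
                yield return row; // the consumer sees each row before the next one is read
            }
        }
        finally
        {
            log.Add("close"); // runs when the sequence ends or the consumer stops early
        }
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        await foreach (var row in ReadAsync(5, log))
        {
            log.Add("row " + row);
            if (row == 1)
            {
                break; // leaving the loop disposes the iterator, which runs its finally block
            }
        }

        return log;
    }

    public static async Task Main()
    {
        Console.WriteLine(string.Join(", ", await RunAsync()));
        // open, row 0, row 1, close
    }
}
```

Only one row is held at a time, and stopping early still releases the (simulated) connection, which mirrors what the synchronous `yield return` version in the question does.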

glopes