6

Basically I have a procedure like

var results = await Task.WhenAll(
    from input in inputs
    select Task.Run(async () => await InnerMethodAsync(input))
);
.
.
.
private static async Task<Output> InnerMethodAsync(Input input)
{
    var x = await Foo(input);
    var y = await Bar(x);
    var z = await Baz(y);
    return z;
}

and I'm wondering whether there's a fancy way to combine this into a single LINQ query that's like an "async stream" (best way I can describe it).

johnny 5
user7127000

3 Answers

9

When you use LINQ, there are generally two parts to it: creation and iteration.

Creation:

var query = list.Select( a => a.Name);

These calls are always synchronous. But this code doesn't do much more than create an object that exposes an IEnumerable. The actual work isn't done till later, due to a pattern called deferred execution.

Iteration:

var results = query.ToList();

This code takes the enumerable and gets the value of each item, which typically involves invoking your callback delegates (in this case, a => a.Name). This is the part that is potentially expensive and could benefit from asynchrony, e.g. if your callback is something like async a => await httpClient.GetByteArrayAsync(a).

So it's the iteration part that we're interested in, if we want to make it async.

The issue here is that ToList() and most of the other methods that force iteration (like Any() or Last()) are not asynchronous, so your callback delegate will be invoked synchronously, and you'll end up with a list of tasks instead of the data you want.
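To make that concrete, here is a minimal sketch of what the types look like. `FetchLengthAsync` and `DeferredDemo` are hypothetical names invented for illustration; the delay simply stands in for real I/O such as an HTTP call.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DeferredDemo
{
    // Hypothetical async callback; Task.Delay stands in for real I/O.
    public static async Task<int> FetchLengthAsync(string s)
    {
        await Task.Delay(10);
        return s.Length;
    }

    public static async Task<int[]> RunAsync()
    {
        var names = new[] { "a", "bb", "ccc" };

        // Creation: nothing has run yet (deferred execution).
        IEnumerable<Task<int>> query = names.Select(n => FetchLengthAsync(n));

        // Iteration: ToList() invokes the callback synchronously for each item,
        // so the list holds already-started tasks rather than ints.
        List<Task<int>> tasks = query.ToList();

        // The results still have to be awaited in a separate step.
        return await Task.WhenAll(tasks);
    }
}
```

Note that the element type of the list is `Task<int>`, not `int`; something still has to await those tasks to get at the values.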

We can get around that with a piece of code like this:

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ExtensionMethods
{
    public static async Task<List<T>> ToListAsync<T>(this IEnumerable<Task<T>> This)
    {
        var tasks = This.ToList();     //Force LINQ to iterate and create all the tasks. Tasks returned by async methods are already running when created.
        var results = new List<T>();   //Create a list to hold the results (not the tasks)
        foreach (var item in tasks)
        {
            results.Add(await item);   //Await the result for each task and add to results list
        }
        return results;
    }
}

With this extension method, we can rewrite your code:

var results = await inputs.Select( async i => await InnerMethodAsync(i) ).ToListAsync();

That should give you the async behavior you're looking for, and, unlike your example, it avoids creating thread pool tasks with Task.Run.

Note: If you are using LINQ-to-entities, the expensive part (the data retrieval) isn't exposed to you. For LINQ-to-entities, you'd want to use the ToListAsync() that comes with Entity Framework instead.

Try it out and see the timings in my demo on DotNetFiddle.

John Wu
  • 3
    Note that *ToListAsync* is for `IQueryable<>`, not `IEnumerable<>`. – xanatos Jun 01 '18 at 06:44
  • 2
    @xanatos Indeed. Also is not generally applicable because is from `EntityFramework.dll` – Ivan Stoev Jun 01 '18 at 06:54
  • Thanks. Based on this feedback I've rewritten the answer. – John Wu Jun 01 '18 at 19:21
  • `var tasks = This.ToList();` in your solution would be a problem, its same as calling `ToList` on actual IEnumerable generated via Linq, when ideally you shall enumerate the IEnumerable, `foreach (var item in This)`, no List conversion required – Mrinal Kamboj Aug 28 '18 at 09:10
  • @MrinalKamboj I find your feedback frustrating, since you state "would be a problem" without specifying the problem, and offer an alternative which is "ideal" but don't say why. I suggest for technical discussions you avoid judgmental words like those and force yourself to be specific. As it is your comment is unactionable. – John Wu Aug 28 '18 at 09:37
  • @JohnWu I provided you the details. In the extension method you are explicitly doing `ToList()` on `IEnumerable`, which is no different than calling `ToList` on the Linq query, like `list.Select(async a => await httpClient.GetByteArrayAsync(a)).ToList()`, since the purpose is conversion of `IEnumerable<Task<T>>` to `Task<List<T>>`, which is feasible without `ToList` call on `IEnumerable`. As you are filling the data in separate `List` created locally. I hope this clarifies. – Mrinal Kamboj Aug 28 '18 at 10:34
  • @MrinalKamboj Sorry but still not clear....You are describing my solution and an alternative approach. You are not specifying why one is preferable to the other. For example, will the results be incorrect? Is there a performance issue? Do you believe one approach is more maintainable? Etc. Just so you understand, I view the ToList() call as indispensable since the tasks have to be iterated twice and I can't guarantee the caller didn't provide an IEnumerable that is mutable. – John Wu Aug 28 '18 at 17:08
  • I agree with you, it was a mistake on my part, since if `IEnumerable<Task<T>>` doesn't have a memory allocated, then `foreach` loop will sequentially execute each task, thus creating a performance overhead, as depicted by program [here](https://dotnetfiddle.net/xhkThp), to test the behavior by introducing a temporary delay. With explicit `ToList()`, they are executed in parallel. – Mrinal Kamboj Aug 29 '18 at 10:00
3

A rather obvious answer, but you have just used LINQ and async together - you're using LINQ's select to project, and start, a bunch of async Tasks, and then await on the results, which provides an asynchronous parallelism pattern.

Although you've likely just provided a sample, there are a couple of things to note in your code (I've switched to lambda syntax, but the same principles apply):

  • Since there's basically zero CPU bound work on each Task before the first await (i.e. no work done before var x = await Foo(input);), there's no real reason to use Task.Run here.
  • And since there's no work to be done in the lambda after the call to InnerMethodAsync, you don't need to wrap the InnerMethodAsync calls in an async lambda (but be wary of IDisposable)

i.e. You can just select the Task returned from InnerMethodAsync and await these with Task.WhenAll.

var tasks = inputs
    .Select(input => InnerMethodAsync(input)); // or just .Select(InnerMethodAsync)

var results = await Task.WhenAll(tasks);
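The IDisposable caveat mentioned above is easy to miss, so here is a small sketch of it. `Resource`, `ElisionDemo` and the method names are all hypothetical and exist only to show the ordering problem: returning a task from inside a using block without awaiting it lets the block dispose the resource while the task is still running.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical disposable resource, for illustration only.
public sealed class Resource : IDisposable
{
    public bool Disposed { get; private set; }
    public void Dispose() => Disposed = true;

    // Reports whether the resource was still alive once the async work resumed.
    public async Task<bool> IsAliveAfterDelayAsync()
    {
        await Task.Delay(10);
        return !Disposed;
    }
}

public static class ElisionDemo
{
    // No await: the using block exits, and disposes the resource,
    // as soon as the still-pending task is returned.
    public static Task<bool> WithoutAwait()
    {
        using (var r = new Resource())
        {
            return r.IsAliveAfterDelayAsync();
        }
    }

    // With await: disposal is postponed until the task has completed.
    public static async Task<bool> WithAwait()
    {
        using (var r = new Resource())
        {
            return await r.IsAliveAfterDelayAsync();
        }
    }
}
```

In this sketch, awaiting `WithoutAwait()` should report that the resource was already disposed, while `WithAwait()` should report that it was still alive. When nothing disposable is involved, as in the question, dropping the async lambda is safe.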

More complex patterns are possible with asynchrony and LINQ, but rather than reinventing the wheel, you should have a look at Reactive Extensions and the TPL Dataflow library, which have many building blocks for complex flows.
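As a rough idea of what the TPL Dataflow route might look like, here is a minimal sketch of a three-stage pipeline. The `Foo`, `Bar` and `Baz` bodies are hypothetical stand-ins (the question does not show them), the `int` and `string` types are assumptions, and the `System.Threading.Tasks.Dataflow` NuGet package is required.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

public static class DataflowDemo
{
    // Hypothetical stand-ins for Foo, Bar and Baz from the question.
    static async Task<int> Foo(int input) { await Task.Delay(10); return input + 1; }
    static async Task<int> Bar(int x) { await Task.Delay(10); return x * 2; }
    static async Task<string> Baz(int y) { await Task.Delay(10); return y.ToString(); }

    public static async Task<List<string>> RunAsync(IEnumerable<int> inputs)
    {
        // Propagate completion so finishing the first block eventually finishes the last.
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var foo = new TransformBlock<int, int>(x => Foo(x));
        var bar = new TransformBlock<int, int>(x => Bar(x));
        var baz = new TransformBlock<int, string>(y => Baz(y));

        foo.LinkTo(bar, link);
        bar.LinkTo(baz, link);

        foreach (var input in inputs)
            foo.Post(input);
        foo.Complete(); // signal that no more input is coming

        // Drain the final block until it reports that it is finished.
        var results = new List<string>();
        while (await baz.OutputAvailableAsync())
            results.Add(await baz.ReceiveAsync());
        return results;
    }
}
```

With default options each block handles one item at a time, so the stages overlap with one another but items within a stage do not; passing an `ExecutionDataflowBlockOptions` with a higher `MaxDegreeOfParallelism` to a block's constructor is the usual way to change that.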

StuartLC
  • 104,537
  • 17
  • 209
  • 285
2

Try using Microsoft's Reactive Framework. Then you can do this:

IObservable<Output> query =
    from input in inputs.ToObservable()
    from x in Observable.FromAsync(() => Foo(input))
    from y in Observable.FromAsync(() => Bar(x))
    from z in Observable.FromAsync(() => Baz(y))
    select z;

Output[] results = await query.ToArray();

Simple.

Just NuGet "System.Reactive" and add using System.Reactive.Linq; to your code.

Enigmativity
  • 113,464
  • 11
  • 89
  • 172
  • `select z` shall result in `IObservable` type, does that need an explicit `await` or would it work otherwise too. Does `await` ensures data streaming. – Mrinal Kamboj Jun 02 '18 at 16:25
  • 1
    Yes, this gives an `IObservable`. I realized a small mistake and have corrected it - I should have written `await query.ToArray()` rather than `await query` as the latter only returns the last item from an observable. Using `.ToArray()` it returns all values when awaited. Awaiting an observable performs a `.Subscribe` under the hood. – Enigmativity Jun 03 '18 at 05:53