
I have an async predicate method like this:

private async Task<bool> MeetsCriteria(Uri address)
{
    //Do something involving awaiting an HTTP request.
}

Say I have a collection of Uris:

var addresses = new[]
{
    new Uri("http://www.google.com/"),
    new Uri("http://www.stackoverflow.com/") //etc.
};

I want to filter addresses using MeetsCriteria, and I want to do this asynchronously: multiple calls to the predicate should run concurrently, and I then want to wait for all of them to complete and produce the filtered result set. Unfortunately, LINQ doesn't appear to support asynchronous predicates, so something like this doesn't work:

var filteredAddresses = addresses.Where(MeetsCriteria);

Is there a similarly convenient way to do this?

Sam
  • What do you expect to happen if this were supported? Especially when iterating `filteredAddresses`, which is when `MeetsCriteria` is actually called. – Daniel Hilgarth Feb 15 '13 at 07:50
  • @DanielHilgarth: Thanks; that's a good point. This doesn't really seem to fit in with LINQ. – Sam Feb 17 '13 at 22:27

5 Answers


I think one of the reasons nothing like this is in the framework is that there are lots of possible variations, and each choice will be the right one under certain circumstances:

  • Should the predicates execute in parallel, or in series?
    • If they execute in parallel, should they all execute at once, or should the degree of parallelism be limited?
    • If they execute in parallel, should the results be in the same order as the original collection, in the order of completion, or in undefined order?
      • If they should be returned in the order of completion, should there be some way to (asynchronously) get the results as they complete? (This would require changing the return type from Task<IEnumerable<T>> to something else.)
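The last variation, getting results asynchronously as they complete, maps naturally onto C# 8 async streams. Below is a minimal sketch of that variation; it is not part of this answer's solution, and the name WhereAsCompleted is hypothetical. It starts every predicate at once and calls Task.WhenAny on a shrinking list, which costs O(n²) overall: fine for a handful of URLs, less so for thousands.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CompletionOrderExtensions
{
    // Hypothetical helper: yields matching items in the order their predicates finish.
    public static async IAsyncEnumerable<T> WhereAsCompleted<T>(
        this IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        // Start every predicate up front, pairing each result with its item.
        var pending = source
            .Select(async x => (Item: x, Keep: await predicate(x)))
            .ToList();

        while (pending.Count > 0)
        {
            // Wait for whichever predicate finishes next.
            var finished = await Task.WhenAny(pending);
            pending.Remove(finished);

            // Awaiting the finished task rethrows any exception from the predicate.
            var (item, keep) = await finished;
            if (keep)
                yield return item;
        }
    }
}
```

Usage would look like `await foreach (var address in addresses.WhereAsCompleted(MeetsCriteria)) { ... }`, with each matching address arriving as soon as its own check completes.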

You said you want the predicates to execute in parallel. In that case, the simplest choice is to execute them all at once and return them in the order of completion:

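using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Extension methods must live in a non-generic static class; the class name is arbitrary.
public static class AsyncWhereExtensions
{
    // Runs every predicate at once; matching items are queued as their predicates complete.
    public static async Task<IEnumerable<T>> Where<T>(
        this IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        var results = new ConcurrentQueue<T>();
        var tasks = source.Select(
            async x =>
            {
                if (await predicate(x))
                    results.Enqueue(x);
            });
        await Task.WhenAll(tasks);
        return results;
    }
}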

You could then use it like this:

var filteredAddresses = await addresses.Where(MeetsCriteria);
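If the degree of parallelism should be limited (the second question in the list above), a common pattern is to gate each predicate call with a SemaphoreSlim. This is a sketch rather than part of the original answer; WhereThrottled and its maxConcurrency parameter are hypothetical names. Because it collects results with Task.WhenAll, it returns matches in the original order rather than in completion order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledWhereExtensions
{
    // Hypothetical helper: runs at most maxConcurrency predicates at a time.
    public static async Task<IEnumerable<T>> WhereThrottled<T>(
        this IEnumerable<T> source, Func<T, Task<bool>> predicate, int maxConcurrency)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);

        var checks = source.Select(async x =>
        {
            // Wait for a free slot before starting this predicate.
            await gate.WaitAsync();
            try
            {
                return (Item: x, Keep: await predicate(x));
            }
            finally
            {
                gate.Release();
            }
        });

        // Task.WhenAll returns the pairs in the same order as source.
        var results = await Task.WhenAll(checks);
        return results.Where(r => r.Keep).Select(r => r.Item).ToList();
    }
}
```

For example, `var filteredAddresses = await addresses.WhereThrottled(MeetsCriteria, maxConcurrency: 3);`.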
svick
  • I'd use a different method name, so the different semantics (in particular the reordering) become clear. – CodesInChaos Feb 15 '13 at 13:02
  • @CodesInChaos Yeah, possibly, but I'm not sure what would be a good name. `AsyncParallelWhereOrderedByCompletion()` would describe what the method does, but it's a terrible name. – svick Feb 15 '13 at 13:08
  • Maybe a name like `ConcurrentlyFilterAsync` would be suitable. – Sam Feb 17 '13 at 22:30

Given the newer versions of the framework and the adoption of the IAsyncEnumerable<T> interface, I would no longer suggest any of the other, highly custom answers here, as they are largely unnecessary.

Async versions of LINQ are available through the System.Linq.Async NuGet package.

This is the syntax for making async checks:

var filteredAddresses = addresses
    .ToAsyncEnumerable()
    .WhereAwait(async x => await MeetsCriteria(x));

filteredAddresses will be of type IAsyncEnumerable<Uri>, which can be either:

  • materialized with ToListAsync, FirstAsync, etc., or
  • iterated with await foreach.

To pass MeetsCriteria as a method group, as in the question, you can change its return type to ValueTask<bool>:

private async ValueTask<bool> MeetsCriteria(Uri address)
{
    //Do something involving awaiting an HTTP request.
}

...

var filteredAddresses = addresses
    .ToAsyncEnumerable()
    .WhereAwait(MeetsCriteria);

I wouldn't recommend switching to ValueTask just to save a few characters, though; that choice should be benchmarked and made for performance or memory reasons.
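If you'd rather keep MeetsCriteria returning Task<bool>, another option is to wrap its task in a ValueTask<bool> instead of writing an async lambda. The sketch below assumes a reference to the System.Linq.Async package, whose operators live in the System.Linq namespace; the MeetsCriteria body is a stand-in for the real HTTP check, and AddressFilter/FilterAsync are hypothetical names. It also materializes the filtered sequence with ToListAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Linq; // System.Linq.Async puts ToAsyncEnumerable/WhereAwait/ToListAsync here
using System.Threading.Tasks;

public static class AddressFilter
{
    // Stand-in for the question's predicate; the real one awaits an HTTP request.
    private static async Task<bool> MeetsCriteria(Uri address)
    {
        await Task.Delay(10);
        return address.Host.EndsWith("stackoverflow.com", StringComparison.OrdinalIgnoreCase);
    }

    public static async Task<List<Uri>> FilterAsync(IEnumerable<Uri> addresses) =>
        await addresses
            .ToAsyncEnumerable()
            // Wrap the existing Task<bool> instead of adding another async lambda.
            .WhereAwait(address => new ValueTask<bool>(MeetsCriteria(address)))
            // Materialize the lazy IAsyncEnumerable<Uri> into a list.
            .ToListAsync();
}
```

Because the result is materialized once, MeetsCriteria runs once per address; enumerating the IAsyncEnumerable<Uri> itself several times would rerun the checks each time.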

julealgon

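I think this is simpler than the accepted answer, and it doesn't need a ConcurrentQueue. Because Task.WhenAll returns its results in the same order as the input tasks, it also keeps the original order of source.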

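using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
// Declare inside a non-generic static class, as with any extension method.
public static async Task<IEnumerable<T>> Where<T>(this IEnumerable<T> source, Func<T, Task<bool>> predicate)
{
    var results = await Task.WhenAll(source.Select(async x => (Item: x, Keep: await predicate(x))));
    return results.Where(x => x.Keep).Select(x => x.Item);
}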
Uwe Keim
shtse8

First approach: issue all the requests up front, one after the other, then wait until all of them come back, and then filter the result. (svick's code also did this, but here I'm doing it without the intermediate ConcurrentQueue.)

// First approach: massive fan-out
var tasks = addresses.Select(async a => new { A = a, C = await MeetsCriteriaAsync(a) });
var addressesAndCriteria = await Task.WhenAll(tasks);
var filteredAddresses = addressesAndCriteria.Where(ac => ac.C).Select(ac => ac.A);

Second approach: do the requests one after the other. This takes longer, but it makes sure not to hammer the web service with a huge onslaught of requests (assuming that MeetsCriteriaAsync goes out to a web service...).

// Second approach: one by one
var filteredAddresses = new List<Uri>();
foreach (var a in addresses)
{
  if (await MeetsCriteriaAsync(a)) filteredAddresses.Add(a);
}

Third approach: as for the second, but using a hypothetical C# 8 feature, "asynchronous streams". C# 8 isn't out yet, and asynchronous streams aren't designed yet, but we can dream! The IAsyncEnumerable type already exists in Rx, and hopefully they'll add more combinators for it. The nice thing about IAsyncEnumerable is that we can start consuming the first few filteredAddresses as soon as they arrive, rather than waiting for everything to be filtered first.

// Third approach: ???
IEnumerable<Uri> addresses = {...};
IAsyncEnumerable<Uri> filteredAddresses = addresses.WhereAsync(MeetsCriteriaAsync);

Fourth approach: maybe we don't want to hammer the web service with all requests at once, but we're happy to issue more than one request at a time. Maybe we did experiments and found that "three at a time" was a happy medium. NOTE: this code assumes a single-threaded execution context, such as in UI programming or (classic) ASP.NET. If it's run in a multi-threaded execution context, it needs thread-safe collections instead, such as a ConcurrentQueue for the input and a ConcurrentBag for the results (the framework has no ConcurrentList).

// Fourth approach: throttle to three-at-a-time requests
var addresses = new Queue<Uri>(...);
var filteredAddresses = new List<Uri>();
var worker1 = FilterAsync(addresses, filteredAddresses);
var worker2 = FilterAsync(addresses, filteredAddresses);
var worker3 = FilterAsync(addresses, filteredAddresses);
await Task.WhenAll(worker1, worker2, worker3);

async Task FilterAsync(Queue<Uri> q, List<Uri> r)
{
  while (q.Count > 0)
  {
    var item = q.Dequeue();
    if (await MeetsCriteriaAsync(item)) r.Add(item);
  }
}
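For a multi-threaded context (for example a console app or ASP.NET Core, neither of which has a single-threaded synchronization context by default), the workers above could share thread-safe collections instead. Below is a sketch under that assumption: the input becomes a ConcurrentQueue drained with TryDequeue, which also closes the gap between the Count check and Dequeue, and results go into a ConcurrentBag, which does not preserve order. ThreadSafeWorkers and the workerCount parameter are hypothetical; the predicate is passed in so the snippet stands alone.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ThreadSafeWorkers
{
    // Runs workerCount workers that pull addresses from a shared ConcurrentQueue.
    public static async Task<List<Uri>> FilterAsync(
        IEnumerable<Uri> addresses, Func<Uri, Task<bool>> meetsCriteriaAsync, int workerCount = 3)
    {
        var queue = new ConcurrentQueue<Uri>(addresses);
        var results = new ConcurrentBag<Uri>();

        async Task Worker()
        {
            // TryDequeue atomically checks for and removes the next item.
            while (queue.TryDequeue(out var item))
            {
                if (await meetsCriteriaAsync(item))
                    results.Add(item);
            }
        }

        await Task.WhenAll(Enumerable.Range(0, workerCount).Select(_ => Worker()));
        return results.ToList(); // order is not preserved
    }
}
```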

There are also ways to do the fourth approach using the TPL Dataflow library.
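One possible Dataflow version of the three-at-a-time filter uses a TransformBlock with MaxDegreeOfParallelism = 3. By default a TransformBlock emits its outputs in input order, so the matches come back in the original order. This is a sketch assuming a reference to the System.Threading.Tasks.Dataflow package; DataflowFilter and WhereDataflowAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowFilter
{
    public static async Task<List<T>> WhereDataflowAsync<T>(
        IEnumerable<T> source, Func<T, Task<bool>> predicate, int maxDegreeOfParallelism = 3)
    {
        // Runs up to maxDegreeOfParallelism predicates at once; output keeps input order by default.
        var check = new TransformBlock<T, (T Item, bool Keep)>(
            async x => (x, await predicate(x)),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        foreach (var item in source)
            check.Post(item);
        check.Complete(); // signal that no more input is coming

        var results = new List<T>();
        while (await check.OutputAvailableAsync())
        {
            while (check.TryReceive(out var result))
            {
                if (result.Keep)
                    results.Add(result.Item);
            }
        }

        await check.Completion; // surfaces any exception thrown by the predicate
        return results;
    }
}
```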

tm1
Lucian Wischik

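I would use the approach below instead of a ConcurrentBag or ConcurrentQueue. Note that it awaits each predicate before starting the next, so the checks run one at a time, but matching items are streamed to the caller as soon as they are found.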

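using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    // Awaits each predicate in turn and yields matching items as soon as they are found.
    public static async IAsyncEnumerable<T> WhereAsync<T>(this IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        foreach (var item in source)
        {
            if (await predicate(item))
            {
                yield return item;
            }
        }
    }
}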

For example:

var result = numbers.WhereAsync(async x => await IsEvenAsync(x));
await foreach (var x in result)
{
    Console.Write($"{x},");
}
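For a quick end-to-end try of that example: numbers and IsEvenAsync are not defined in the answer, so the versions below are hypothetical stand-ins, and the extension method is the one above with its namespaces and enclosing static class added (C# 8 or later is needed for async streams). Running AsyncFilterDemo.RunAsync() writes 2,4,6, to the console.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncFilterDemo
{
    public static async IAsyncEnumerable<T> WhereAsync<T>(
        this IEnumerable<T> source, Func<T, Task<bool>> predicate)
    {
        foreach (var item in source)
        {
            if (await predicate(item))
                yield return item;
        }
    }

    // Hypothetical stand-in for a predicate that has to await something.
    private static async Task<bool> IsEvenAsync(int x)
    {
        await Task.Delay(1);
        return x % 2 == 0;
    }

    public static async Task RunAsync()
    {
        int[] numbers = { 1, 2, 3, 4, 5, 6 };
        var result = numbers.WhereAsync(IsEvenAsync);
        await foreach (var x in result)
        {
            Console.Write($"{x},");
        }
    }
}
```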
AfshinZavvar