9

I would like to use a .NET iterator with parallel Tasks/await. Something like this:

IEnumerable<TDest> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    Parallel.ForEach(
        source,
        s =>
        {
            // Ordering is NOT important;
            // items can be yielded as soon as they are done.
            // (Does not compile: yield return is not allowed inside a lambda.)
            yield return ExecuteOrDownloadSomething(s);
        });
}

Unfortunately, .NET cannot handle this natively. The best answer so far, by @svick, is to use AsParallel().

BONUS: Any simple async/await code that implements multiple publishers and a single subscriber? The subscriber would yield, and the pubs would process. (core libraries only)

Yuri Astrakhan

3 Answers

11

This seems like a job for PLINQ:

return source.AsParallel().Select(s => ExecuteOrDownloadSomething(s));

This will execute the delegate in parallel using a limited number of threads, returning each result as soon as it completes.
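
One caveat: by default PLINQ may collect results in an output buffer before handing them to the consumer (`ParallelMergeOptions.AutoBuffered`), so "as soon as it completes" holds only approximately. A minimal sketch that asks for unbuffered output instead; `PlinqSketch` and the body of `ExecuteOrDownloadSomething` are hypothetical stand-ins, with `int` used in place of the generic types:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class PlinqSketch
{
    // Hypothetical stand-in for the question's ExecuteOrDownloadSomething.
    static int ExecuteOrDownloadSomething(int s)
    {
        Thread.Sleep(10 * s); // simulate work of varying length
        return s * s;
    }

    public static IEnumerable<int> Foo(IEnumerable<int> source)
    {
        return source
            .AsParallel()
            // Hand each result to the consumer as soon as it is produced,
            // instead of letting PLINQ batch results in an output buffer.
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .Select(s => ExecuteOrDownloadSomething(s));
    }
}
```

The results still arrive in no particular order, which matches the question's requirement; adding `AsOrdered()` would restore the source order at the cost of waiting for earlier items.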

If the ExecuteOrDownloadSomething() method is IO-bound (e.g. it actually downloads something) and you don't want to waste threads, then using async-await might make sense, but it would be more complicated.

If you want to fully take advantage of async, you shouldn't return IEnumerable, because it's synchronous (i.e. it blocks if no items are available). What you need is some sort of asynchronous collection, and you can use ISourceBlock (specifically, TransformBlock) from TPL Dataflow for that:

ISourceBlock<TDest> Foo<TSrc, TDest>(IEnumerable<TSrc> source)
{
    var block = new TransformBlock<TSrc, TDest>(
        async s => await ExecuteOrDownloadSomethingAsync(s),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    foreach (var item in source)
        block.Post(item);

    block.Complete();

    return block;
}

If the source is "slow" (i.e. you want to start processing the results from Foo() before iterating source has completed), you might want to move the foreach and the Complete() call to a separate Task. An even better solution would be to make source an ISourceBlock<TSrc> too.
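
A rough sketch of the "separate Task" variant, together with one way to consume the block asynchronously. It assumes the System.Threading.Tasks.Dataflow package is referenced; `DataflowSketch`, `ConsumeAsync` and the body of `ExecuteOrDownloadSomethingAsync` are hypothetical, and `int` stands in for the generic types:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowSketch
{
    // Hypothetical stand-in for ExecuteOrDownloadSomethingAsync.
    static async Task<int> ExecuteOrDownloadSomethingAsync(int s)
    {
        await Task.Delay(10 * s); // simulate an IO-bound operation
        return s * s;
    }

    public static ISourceBlock<int> Foo(IEnumerable<int> source)
    {
        var block = new TransformBlock<int, int>(
            s => ExecuteOrDownloadSomethingAsync(s),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        // Feed the block on a separate task so the caller can start
        // receiving results before the source is fully enumerated.
        Task.Run(() =>
        {
            try
            {
                foreach (var item in source)
                    block.Post(item);
                block.Complete();
            }
            catch (Exception ex)
            {
                // Propagate a failing source to whoever consumes the block.
                ((IDataflowBlock)block).Fault(ex);
            }
        });

        return block;
    }

    public static async Task<List<int>> ConsumeAsync(ISourceBlock<int> block)
    {
        var results = new List<int>();
        // OutputAvailableAsync returns false once the block has completed
        // and no further output will be produced.
        while (await block.OutputAvailableAsync())
            results.Add(await block.ReceiveAsync());
        return results;
    }
}
```

Note that a TransformBlock emits its results in input order by default, so a slow early item holds back later ones. Newer versions of the Dataflow library expose an `EnsureOrdered` option on the block options that can be set to false if, as in the question, ordering does not matter.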

svick
  • Thanks, but could you give an example of how this can be solved with async/await? Thx! – Yuri Astrakhan Feb 11 '13 at 13:30
  • @Yurik Could you explain why do you want that? – svick Feb 11 '13 at 13:32
  • Mostly because I feel it will help me understand the new await syntax for a problem that's not "async 101", but a real world scenario. – Yuri Astrakhan Feb 11 '13 at 14:36
  • marked as accepted, but mostly for the AsParallel(). I guess what I was really looking for is an implementation of multiple pubs + single sub using async/await, and using core libraries only. But thanks! – Yuri Astrakhan Feb 14 '13 at 02:17
  • @Yurik Why do you care whether a library is core or not? – svick Feb 14 '13 at 02:35
  • Because if it uses core libs, it helps with learning the new basic concepts/patterns, whereas if it's a less known 3rd party lib, it makes me dependent on the lib, but doesn't help with the core language. – Yuri Astrakhan Feb 14 '13 at 03:27
1

So it appears what you really want to do is to order a sequence of tasks based on when they complete. This is not terribly complex:

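using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskExtensions
{
    // Returns tasks that complete in the order in which the input tasks finish.
    public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks)
    {
        var input = tasks.ToList();

        // Materialize the completion sources once; a lazy Select would create
        // a fresh set of TaskCompletionSources on every enumeration.
        var output = input.Select(task => new TaskCompletionSource<T>()).ToList();
        var collection = new BlockingCollection<TaskCompletionSource<T>>();
        foreach (var tcs in output)
            collection.Add(tcs);

        foreach (var task in input)
        {
            task.ContinueWith(t =>
            {
                // Take the next unclaimed source (FIFO) and copy the outcome of t.
                var tcs = collection.Take();
                switch (t.Status)
                {
                    case TaskStatus.Canceled:
                        tcs.TrySetCanceled();
                        break;
                    case TaskStatus.Faulted:
                        tcs.TrySetException(t.Exception.InnerExceptions);
                        break;
                    case TaskStatus.RanToCompletion:
                        tcs.TrySetResult(t.Result);
                        break;
                }
            }
            , CancellationToken.None
            , TaskContinuationOptions.ExecuteSynchronously
            , TaskScheduler.Default);
        }

        return output.Select(tcs => tcs.Task);
    }
}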

So here we create a TaskCompletionSource for each input task, then go through each of the tasks and attach a continuation which grabs the next completion source from a BlockingCollection and sets its result. The first task to complete grabs the first tcs that was returned, the second task to complete gets the second tcs that was returned, and so on.

Now your code becomes quite simple:

var tasks = collection.Select(item => LongRunningOperationThatReturnsTask(item))
    .Order();
foreach(var task in tasks)
{
    var result = task.Result;//or you could `await` each result
    //....
}
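
If what is ultimately wanted is a method that yields processed items as they finish, a similar idea can be sketched with core libraries only: each started operation acts as a publisher that adds its result to a `BlockingCollection`, and the iterator is the single subscriber. This is only a sketch under assumptions; `CompletionOrderSketch`, `Process` and `processAsync` are hypothetical names, and a failed operation is simply rethrown once the remaining results have been yielded:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CompletionOrderSketch
{
    public static IEnumerable<TDest> Process<TSrc, TDest>(
        IEnumerable<TSrc> source, Func<TSrc, Task<TDest>> processAsync)
    {
        var results = new BlockingCollection<TDest>();

        // Publishers: start every operation; each adds its result when done.
        var publishers = source
            .Select(async s => results.Add(await processAsync(s)))
            .ToList();

        // Once all publishers have finished (successfully or not), signal
        // that no more items will arrive so the subscriber's loop can end.
        var all = Task.WhenAll(publishers);
        all.ContinueWith(_ => results.CompleteAdding(), TaskScheduler.Default);

        // Single subscriber: blocks until an item is available, then yields it.
        foreach (var item in results.GetConsumingEnumerable())
            yield return item;

        // Rethrow the first failure, if any publisher faulted.
        all.GetAwaiter().GetResult();
    }
}
```

Because this is an iterator, nothing starts until the caller begins enumerating. The consuming loop blocks the calling thread while waiting, so this shape is unsuitable for a UI thread or any other context that the awaited operations need in order to resume.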
Servy
  • Thanks, but what I need is to get a stream of processed objects as yields from a method. What you offered is basically a rewrite of Parallel.ForEach(). – Yuri Astrakhan Feb 11 '13 at 22:50
  • @Yurik If you don't need to wait for all of the items to be done you can remove the `WhenAll`/`WaitAll`, but other than that I fail to see how `Select` doesn't do what you need in and of itself. You have a sequence of items, and you want to turn that into a sequence of tasks, one for each item. How does `Select(item=> LongRunningOperation(item))` not meet your needs as it returns a sequence of Tasks? – Servy Feb 12 '13 at 00:09
  • In that case the ordering of the items will be the same as original, which might be inefficient. I don't mind out of order yielding of items. – Yuri Astrakhan Feb 12 '13 at 01:13
0

In the asynchronous library made by the MS robotics team, they had concurrency primitives which allowed for using an iterator to yield asynchronous code.

The library (CCR) is now free (it did not use to be). A nice introductory article can be found here: Concurrent Affairs

Perhaps you can use this library alongside the .NET Task library, or it will inspire you to 'roll your own'.

Toad
  • Could you explain how exactly would you use CCR here? – svick Feb 11 '13 at 10:12
  • The article I quoted can explain it better than I could. If you take a look at it and check the figure 'Figure 6 SerialAsyncDemo', it has a code example which is almost exactly what the OP asks for: async operations using a .NET iterator to yield. I do admit that this iterator syntax, although clever for its time, is now mostly superseded by the async/await syntax – Toad Feb 11 '13 at 21:51