19

How should you use C# 5's async to represent a sequence of asynchronous tasks? For example, if we wanted to download numbered files from a server and return each one as we get it, how can we implement a method such as this?

public async IEnumerable<File> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    for (int i = 0; ; i++) {
        yield return await DownloadFile(string.Format(format, i));
    }
}
configurator
  • @BoltClock http://msdn.microsoft.com/en-us/library/hh156499(v=vs.110).aspx – configurator Oct 03 '11 at 19:09
  • 2
    Even though the framework is .NET 4.5, I believe the official version will be C# 5. Much like the features associated with LINQ were added in C# 3, but the framework was .NET 3.5. – Joshua Rodgers Oct 03 '11 at 19:19
  • @JoshuaRodgers: I completely agree, but the only numbers MS has published so far are VS 11 and .Net 4.5 so I'll stick to those :) – configurator Oct 03 '11 at 19:22
  • Do you want to start all the downloads at once, or you want to do something smarter (similar to `Parallel.For()`)? – svick Oct 03 '11 at 19:25
  • 1
    I've seen Eric Lippert himself refer to it as C# 5 on his blog (http://blogs.msdn.com/b/ericlippert/archive/2010/10/28/asynchrony-in-c-5-part-one.aspx). Although I haven't seen any officially published version numbers. Also this is potentially a duplicate of: http://stackoverflow.com/questions/5061761/c-5-is-it-possible-to-await-yield-return-dosomethingasync – Joshua Rodgers Oct 03 '11 at 19:26
  • 2
    Anders has also said publicly that the next version of the language will be called version 5 but I hasten to add that this name has not been *officially* announced. If you write a book about "C# 5" and we decide before ship that it should be called something else, don't complain to me! – Eric Lippert Oct 03 '11 at 20:55
  • 7
    We had a *huge* debate about this question that lasted a long time. Every option has different pros and cons -- you could have a coarse-grained `Task<IEnumerable<T>>` or a fine-grained `IEnumerable<Task<T>>`, or both with something crazy like `Task<IEnumerable<Task<T>>>`. The RX guys like `IAsyncEnumerator<T>`, as Stephen points out in his answer. I am actually not sure what the WinFX team settled on as the standard pattern for their library. I'll look into it. – Eric Lippert Oct 03 '11 at 20:58
  • @EricLippert, I had wondered why there doesn't seem to be anything to support something like this. The fact that there is no solution clearly superior to the others explains that. But I would welcome some library types that make working with asynchronous collections easier. (Which could support the different options.) – svick Oct 03 '11 at 22:23

5 Answers

5

A true sequence doesn't work well directly with async/await, because tasks only return a single value. You need an actual enumerable type, such as IAsyncEnumerator<T> in Ix-Async (or AsyncEx). The design of IAsyncEnumerator<T> is described in this Channel9 video.
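To make the idea of an asynchronous enumerator concrete, here is a minimal sketch. The interface `ISimpleAsyncEnumerator<T>` and the class `GeneratedAsyncEnumerator<T>` are hypothetical names invented for this illustration; they are not the actual declarations from Ix-Async or AsyncEx, whose members may differ. The point is only that "move to the next item" becomes an awaitable operation, so a consumer can process each item as it arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical interface for illustration; not the Ix-Async or AsyncEx declaration.
public interface ISimpleAsyncEnumerator<T>
{
    // Completes with true when Current holds the next item, false at the end.
    Task<bool> MoveNextAsync();
    T Current { get; }
}

// Produces a fixed number of items by awaiting a factory for each index.
public sealed class GeneratedAsyncEnumerator<T> : ISimpleAsyncEnumerator<T>
{
    private readonly Func<int, Task<T>> m_produce;
    private readonly int m_count;
    private int m_next;

    public GeneratedAsyncEnumerator(int count, Func<int, Task<T>> produce)
    {
        m_count = count;
        m_produce = produce;
    }

    public T Current { get; private set; }

    public async Task<bool> MoveNextAsync()
    {
        if (m_next >= m_count)
            return false;
        Current = await m_produce(m_next++);
        return true;
    }
}

public static class AsyncEnumeratorDemo
{
    // Consumer loop: the asynchronous analogue of foreach.
    public static async Task<List<T>> ToListAsync<T>(ISimpleAsyncEnumerator<T> source)
    {
        var items = new List<T>();
        while (await source.MoveNextAsync())
            items.Add(source.Current);
        return items;
    }
}
```

In the question's scenario, the factory passed to the constructor would be something like `i => DownloadFile(string.Format(format, i))`, and the body of the `while` loop would handle each file instead of collecting it.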

Wiebe Tijsma
Stephen Cleary
5

It seems to me you want something very similar to BlockingCollection<T>, that uses Tasks and awaiting instead of blocking.

Specifically, something that you can add to without blocking or waiting. But when you try to remove an item when none is available at the moment, you can await until some item is available.

The public interface could look like this:

public class AsyncQueue<T>
{
    public bool IsCompleted { get; }

    public Task<T> DequeueAsync();

    public void Enqueue(T item);

    public void FinishAdding();
}

FinishAdding() is necessary, so that we know when to end dequeuing.

With this, your code could look like this (m_queue is AsyncQueue<File>):

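const string format = "http://example.com/files/{0}.png";

// Start all downloads; each one enqueues its result as soon as it completes.
var tasks = Enumerable.Range(0, 10)
    .Select(i => DownloadAndEnqueue(string.Format(format, i)))
    .ToArray();

// Once every download has finished, signal that no more items will arrive.
Task.WhenAll(tasks).ContinueWith(t => m_queue.FinishAdding());

…

static async Task DownloadAndEnqueue(string url)
{
    m_queue.Enqueue(await DownloadFile(url));
}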

It's not as nice as what you imagined could work, but it should work.

And the implementation of AsyncQueue<T>? There are two queues. One is for completed work, that hasn't been dequeued yet. The other is for Tasks (actually, TaskCompletionSource<T>) that were already dequeued, but that don't have any result yet.

When you dequeue and there is some completed work in the queue, just return work from there (using Task.FromResult()). If the queue is empty, create a new Task, add it to the other queue and return it.

When you enqueue some completed work and there are some Tasks in the queue, remove one and finish it using the result we have now. If the Task queue is empty, add the work to the first queue.

With this, you can dequeue and enqueue as many times as you want, and it will work correctly. When you know there won't be any new work, call FinishAdding(). Any Tasks still waiting at that point will fault with an exception.

In other words:

public class AsyncQueue<T>
{
    private readonly object m_lock = new object();

    private bool m_finishedAdding = false;

    private readonly Queue<T> m_overflowQueue = new Queue<T>();

    private readonly Queue<TaskCompletionSource<T>> m_underflowQueue =
        new Queue<TaskCompletionSource<T>>();

    public bool IsCompleted
    {
        get { return m_finishedAdding && m_overflowQueue.Count == 0; }
    }

    public Task<T> DequeueAsync()
    {
        Task<T> result;
        lock (m_lock)
        {
            if (m_overflowQueue.Count > 0)
                result = Task.FromResult(m_overflowQueue.Dequeue());
            else if (!m_finishedAdding)
            {
                var tcs = new TaskCompletionSource<T>();
                m_underflowQueue.Enqueue(tcs);
                result = tcs.Task;
            }
            else
                throw new InvalidOperationException();
        }
        return result;
    }

    public void Enqueue(T item)
    {
        lock (m_lock)
        {
            if (m_finishedAdding)
                throw new InvalidOperationException();

            if (m_underflowQueue.Count > 0)
            {
                var tcs = m_underflowQueue.Dequeue();
                tcs.SetResult(item);
            }
            else
                m_overflowQueue.Enqueue(item);
        }
    }

    public void FinishAdding()
    {
        lock (m_lock)
        {
            m_finishedAdding = true;

            while (m_underflowQueue.Count > 0)
            {
                var tcs = m_underflowQueue.Dequeue();
                tcs.SetException(new InvalidOperationException());
            }
        }
    }
}

If you wanted to limit the size of the work queue (and thus throttle producers that are too fast), you could make Enqueue() return a Task too, which would require another queue.
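On the consuming side, a loop over the AsyncQueue<T> above might look roughly like the following sketch. It is written against a `Func<Task<T>>` delegate so that it stands on its own; `AsyncQueueConsumer` and `DrainAsync` are names made up for this example. It assumes, as in the implementation above, that "finished and empty" is reported as an InvalidOperationException, either thrown directly by DequeueAsync() or delivered through the pending Task.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncQueueConsumer
{
    // dequeueAsync stands in for a call such as () => m_queue.DequeueAsync().
    public static async Task DrainAsync<T>(Func<Task<T>> dequeueAsync, Action<T> handle)
    {
        while (true)
        {
            T item;
            try
            {
                // Completes immediately if an item is buffered, otherwise waits.
                item = await dequeueAsync();
            }
            catch (InvalidOperationException)
            {
                // Assumption: this exception means no more items will arrive.
                break;
            }

            // Called outside the try block so that exceptions thrown by the
            // handler are not mistaken for the end of the queue.
            handle(item);
        }
    }
}
```

A call could then look like `await AsyncQueueConsumer.DrainAsync(() => m_queue.DequeueAsync(), file => Process(file));`, where `Process` is a placeholder for whatever the caller does with each file. Checking IsCompleted before each dequeue is not sufficient by itself, because FinishAdding() can be called between the check and the dequeue.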

configurator
svick
  • Actually, awaiting task.WhenAll would defeat the purpose of this method because the AsyncQueue would only be returned after the tasks are finished. But this is pretty much exactly what I was looking for so thanks! – configurator Oct 03 '11 at 22:56
  • You're right and I didn't mean it like that. You should start that code in the background and return the `AsyncQueue` immediately. The `WhenAll()` is there because of the necessary `FinishAdding()` that follows it. – svick Oct 03 '11 at 23:14
  • I've edited the `await` out and now I believe it's good. Agreed? – configurator Oct 04 '11 at 13:47
  • Yeah, this works too. And, yeah, it's probably better to do it this way. – svick Oct 04 '11 at 14:35
  • 2
    If somebody is reading this now: just use TPL Dataflow, it does exactly this (and much more). – svick Mar 05 '14 at 17:50
3

I know it's been a while, but I've written something to closely emulate "yield return" for async enumerables here. No complicated code needed.

You use it like:

public IAsyncEnumerable<File> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    return AsyncEnumerable.Create(async y =>
    {
        for (int i = 0; ; i++) {
            await y.YieldReturn(await DownloadFile(string.Format(format, i)));
        }
    });
}

I normally avoid advertising my own code here but this is a clearly needed feature in C# 6.0, so I hope you find it useful in C# 5.0 if you're still stuck on this.

Cory Nelson
-1

If you only had a finite number of URLs, you could do this:

    public async Task<IEnumerable<File>> DownloadPictures()
    {
        const string format = "http://example.com/files/{0}.png";
        var urls = Enumerable.Range(0, 999).Select(i => String.Format(format, i));
        var tasks = urls.Select(u => DownloadFile(u));
        var results = Task.WhenAll(tasks);
        return await results;
    }

The key is to get a list of Tasks and then call Task.WhenAll on that list.

Josh Mouch
  • But now you can't process even a single task until every single file has been downloaded, and it forces you to bring the entire collection into memory, instead of being able to stream the results, and process them as they come in, greatly improving both the memory footprint as well as the speed of the application. You even choose an example particularly poorly suited to this approach. If you had a very small result set, and all items tended to be computed at about the same time, those problems aren't noticeable. – Servy Jun 20 '14 at 21:09
  • I missed the "return each one as we get it" part of the question. This article came up for a google search I was performing, and the answer I gave was for that. – Josh Mouch Jun 20 '14 at 21:17
-1

The benefit of async is that the calling method can start several blocking operations in parallel and block only once a returned value is needed. The same capability is available in this scenario with yield return by using the return type IEnumerable<Task<File>>.

public IEnumerable<Task<File>> DownloadPictures() {
    const string format = "http://example.com/files/{0}.png";
    for (int i = 0; ; i++) {
        yield return DownloadFileAsync(string.Format(format, i));
    }
}

In a similar fashion to async/await the calling method can now continue to execute until it needs the next value at which point await/.Result can be called on the next task. The following extension method demonstrates this:

    public static IEnumerable<T> Results<T>(this IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks)
            yield return task.Result;
    }
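The extension method above blocks on `.Result`. As a sketch of the `await` alternative mentioned in the text, the following variant awaits each task instead; `TaskSequenceExtensions` and `ForEachAsync` are hypothetical names chosen for this example, not members of a standard library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TaskSequenceExtensions
{
    // Awaits each task in sequence order and hands its result to the callback.
    public static async Task ForEachAsync<T>(
        this IEnumerable<Task<T>> tasks, Action<T> handle)
    {
        foreach (var task in tasks)
            handle(await task);
    }
}
```

Because the sequence is enumerated lazily, each task is created only when the loop advances to it, so with an iterator such as DownloadPictures() the downloads run one after another. The calling thread is not blocked while a task is pending.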

If the calling method would like to ensure that all the IEnumerable Tasks are created and running in parallel then an extension method such as the following may be beneficial (this, and the aforementioned method, are likely already in a standard lib):

    public static IEnumerable<T> ResultsParallel<T>(this IEnumerable<Task<T>> tasks)
    {
        foreach (var task in tasks.ToArray())
            yield return task.Result;
    }

Notice how the responsibility for the concern of what runs in parallel is transferred to the calling method just as it was with async/await. In the event that there is concern regarding the creation of the Tasks blocking, an extension method such as the following could be created:

    public static Task<IEnumerable<T>> ResultsAsync<T>(this IEnumerable<Task<T>> tasks)
    {
        var startedTasks = new ConcurrentQueue<Task<T>>();
        var writerTask = new Task(() =>
            {
                foreach (var task in tasks)
                {
                    startedTasks.Enqueue(task);
                }
            });
        writerTask.Start();

        var readerTask = new Task<IEnumerable<T>>(() =>
        {
            return ResultsSequential(startedTasks, () => writerTask.IsCompleted);
        });
        readerTask.Start();
        return readerTask;
    }

    private static IEnumerable<T> ResultsSequential<T>(ConcurrentQueue<Task<T>> tasks, Func<bool> isDone)
    {
        while (true)
        {
            Task<T> task;
            if (isDone.Invoke())
            {
                if (tasks.TryDequeue(out task))
                {
                    yield return task.Result;
                }
                else
                {
                    yield break;
                }
            } else if (tasks.TryDequeue(out task))
            {
                yield return task.Result;
            }
        }
    }

This implementation is not very efficient. An efficient implementation is too large to fit in the margin.

Joshcodes
  • The implementation of your last method is a single method call. `return Task.WhenAll(tasks);` – Servy Nov 25 '14 at 16:11
  • Would WhenAll return the result of the first task before the last task is started? – Joshcodes Dec 02 '14 at 00:32
  • 1
    I just took a look at how you implemented it and realized that it's just the same as your serial implementation except that you wrap it in an already completed task. Nothing about it is asynchronous at all; it is entirely synchronous. This is of course a limitation of its signature; given its signature it's *impossible* for it to yield the results of each operation asynchronously. Your only options are to do everything synchronously, or only complete the task when you have all of the results. I assumed you did the latter, because it's really the only sensible implementation. – Servy Dec 02 '14 at 15:11
  • @Servy Note that, as stated above, "In the event that there is concern regarding the creation of the Tasks blocking" that the implementation does work differently. Here are some unit tests that demonstrate that: https://gist.github.com/joshcodes/77fa505468bf1135d611 Please reconsider your down-vote. – Joshcodes Dec 02 '14 at 19:42