
I have a sequence of tasks, where each one depends on the output of the previous one. I'd like to represent this as a single Task object, whose result is the output of the last task in the sequence. (If the tasks didn't depend on one another, I could run them in parallel and use TaskFactory.ContinueWhenAll.)

I'd like to be able to implement this method:

static Task<TState> AggregateAsync<T, TState>(
    IEnumerable<T> items,
    TState initial,
    Func<TState, T, Task<TState>> makeTask);

How can I efficiently run the tasks one after another in sequence? I'm using C# 4.0 so I can't use async/await to do it for me.

Edit: I could write AggregateAsync like this:

static Task<TState> AggregateAsync<T, TState>(IEnumerable<T> items, TState initial, Func<TState, T, Task<TState>> makeTask)
{
    var initialTask = Task.Factory.StartNew(() => initial);
    return items.Aggregate(
        initialTask,
        (prevTask, item) =>
            {
                prevTask.Wait(); // synchronous blocking here?
                return makeTask(prevTask.Result, item);
            });
}

But surely I'll get a batch of tasks, each of which blocks synchronously waiting for the one before it?

Tim Robinson
  • The [Microsoft.Bcl.Async](https://www.nuget.org/packages/Microsoft.Bcl.Async) NuGet package adds support for `async` / `await` in .NET 4.0 projects, if that's an option for you. – ChrisK Dec 06 '13 at 10:58
  • Would the `ContinueWith` method help? http://msdn.microsoft.com/en-us/library/system.threading.tasks.task.continuewith%28v=vs.110%29.aspx – Gusdor Dec 06 '13 at 11:00

2 Answers

The easy way (using Microsoft.Bcl.Async):

static async Task<TState> AggregateAsync<T, TState>(
    this IEnumerable<T> items,
    TState initial,
    Func<TState, T, Task<TState>> makeTask)
{
  var state = initial;
  foreach (var item in items)
    state = await makeTask(state, item);
  return state;
}

The hard way:

static Task<TState> AggregateAsync<T, TState>(
    this IEnumerable<T> items,
    TState initial,
    Func<TState, T, Task<TState>> makeTask)
{
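  // Start from an already-completed task holding the initial state
  // (Task.FromResult is not available on .NET 4.0).
  var tcs = new TaskCompletionSource<TState>();
  tcs.SetResult(initial);
  Task<TState> ret = tcs.Task;
  foreach (var item in items)
  {
    // Copy the loop variable so each continuation captures its own item.
    var localItem = item;
    // ContinueWith yields Task<Task<TState>>; Unwrap flattens it to Task<TState>.
    ret = ret.ContinueWith(t => makeTask(t.Result, localItem)).Unwrap();
  }
  return ret;
}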

Note that error handling is more awkward with the "hard" way; an exception from the first item will be wrapped in an AggregateException by each successive item. The "easy" way does not wrap exceptions like this.
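
For anyone who wants to see the "hard" way in use, here is a minimal sketch that should compile against .NET 4.0. The `AggregateAsync` body is the one above, repeated so the block stands alone; `AppendAsync`, `AggregateExample` and `ConcatSequentially` are hypothetical names made up for illustration, and the final `.Result` is only there to show the value.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class AggregateExample
{
    // Same "hard way" implementation as above, repeated so the sketch is self-contained.
    public static Task<TState> AggregateAsync<T, TState>(
        this IEnumerable<T> items,
        TState initial,
        Func<TState, T, Task<TState>> makeTask)
    {
        var tcs = new TaskCompletionSource<TState>();
        tcs.SetResult(initial);
        Task<TState> ret = tcs.Task;
        foreach (var item in items)
        {
            var localItem = item;
            ret = ret.ContinueWith(t => makeTask(t.Result, localItem)).Unwrap();
        }
        return ret;
    }

    // Hypothetical asynchronous step: appends one item to the running state
    // on a thread-pool thread.
    public static Task<string> AppendAsync(string state, string item)
    {
        return Task.Factory.StartNew(() => state + item);
    }

    public static string ConcatSequentially()
    {
        var items = new[] { "a", "b", "c" };

        // Each step starts only after the previous step's task has completed.
        Task<string> result = items.AggregateAsync(
            "", (state, item) => AppendAsync(state, item));

        // Blocking here only for demonstration; real code would attach a continuation.
        return result.Result;
    }
}
```

Because every step receives the previous step's result, the items are appended strictly in order, and no thread sits waiting between steps.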

Stephen Cleary
  • Thanks Stephen - it looks like `TaskExtensions.Unwrap` is the magic method. And the [example on MSDN](http://msdn.microsoft.com/en-us/library/dd780917(v=vs.100).aspx) is similar to my situation. – Tim Robinson Dec 06 '13 at 13:03
  • Thanks for the `.Unwrap` "trick". Also, can't the first three lines of your "hard way" be replaced with `Task.FromResult(initial)`? Is there any difference? – dcastro Dec 06 '13 at 13:04
  • @dcastro: Yes, except that `FromResult` isn't available on .NET 4.0. But that is logically what it's doing. – Stephen Cleary Dec 06 '13 at 13:05
  • I'm confused by the requirement for `Unwrap`. Seems like silly design on MS' part. – Gusdor Dec 09 '13 at 10:46
  • @Gusdor: It makes sense to me; if you have a task returning a task of state, the correct type *is* `Task<Task<TState>>`, which you can `Unwrap` into a plain `Task<TState>`. There are certain situations where the `Task<Task<TState>>` is exactly what you want. The reason it's a bit awkward is because we're doing asynchronous programming in .NET 4.0 (when the `Task` type was primarily intended for parallel programming). – Stephen Cleary Dec 09 '13 at 14:03
  • @StephenCleary right, I get it now. Unwrap returns a `Task<TState>` (instead of a `Task<Task<TState>>`) but still waits for the unwrapped `Task` to finish. That is handy! – Gusdor Dec 10 '13 at 08:09
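
To make the types in this exchange concrete, here is a small sketch of what `Unwrap` does, assuming only the .NET 4.0 Task Parallel Library. `UnwrapExample` and `Run` are hypothetical names, and the values 20 and 21 are arbitrary.

```csharp
using System;
using System.Threading.Tasks;

static class UnwrapExample
{
    public static int Run()
    {
        Task<int> first = Task.Factory.StartNew(() => 20);

        // The continuation itself returns a task, so ContinueWith yields a nested task.
        // The outer task completes as soon as the inner task has been created.
        Task<Task<int>> nested = first.ContinueWith(
            t => Task.Factory.StartNew(() => t.Result + 1));

        // Unwrap returns a proxy that completes only when the inner task completes.
        Task<int> flat = nested.Unwrap();

        // Blocking here only to show the value.
        return flat.Result;
    }
}
```

The outer `Task<Task<int>>` only says that the inner task has been started; the unwrapped `Task<int>` is the one that carries the final result.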

You can use Task.ContinueWith. The task parameter in the code below represents the previous (completed) task; you can read its result to start the second task, and so on.

T item1 = default(T);
T item2 = default(T);
Task<TState> task1 = makeTask(initial, item1);

//create second task
task1.ContinueWith(task => makeTask(task.Result, item2).Result,
                     TaskContinuationOptions.OnlyOnRanToCompletion);

Edit

Sorry, I missed this part

I'd like to represent this as a single Task object, whose result is the output of the end of the sequence.

To do that, you just have to return the task returned by the last ContinueWith call.

Task<TState> aggregate = task1.ContinueWith(
                                  task => makeTask(task.Result, item2).Result,
                                  TaskContinuationOptions.OnlyOnRanToCompletion);

var res = aggregate.Result; //wait synchronously for the result of the sequence
dcastro
  • Having done this in F# and JS I wanted a `ContinueWith` that can create a new task - that is, `Func<Task<T>, Task<TResult>>`. But there's only `Func<Task<T>, TResult>`. – Tim Robinson Dec 06 '13 at 12:02
  • What do you mean? I may have misunderstood you.. I'm positive you can easily implement a method with the given signature using the sample I provided, and your own implementation of `makeTask`. – dcastro Dec 06 '13 at 12:04
  • Just chain all your tasks with `.ContinueWith` and the result of that will be a single Task instance you can wait for (for example using Task.Result as dcastro shows). The tasks will be done in order. That is - `task1.ContinueWith(...).ContinueWith(...).ContinueWith(...)` or something similar. – Luaan Dec 06 '13 at 12:42
  • How would your second example apply to an arbitrary sequence? If I `return secondTask;`, I get a `Task<Task<TState>>`; if I `return thirdTask;` I get `Task<Task<Task<TState>>>`, and so on. I updated the question with code that I think achieves what I want, but with blocking. – Tim Robinson Dec 06 '13 at 12:43
  • @TimRobinson my bad, I've fixed my examples to return `Task<TState>` instead of `Task<Task<TState>>`. In this code, there is no blocking. By the time you call `.Result` on the previous task, the task has already run to completion. – dcastro Dec 06 '13 at 12:52
  • @TimRobinson: How so? `task.ContinueWith (_ => task.Result).ContinueWith (_ => 12).ContinueWith (_ => 43)` returns a `Task<int>`. – Luaan Dec 06 '13 at 12:54
  • @Luaan in my previous code, my lambda was returning `Task<TState>` which in turn would make ContinueWith return a `Task<Task<TState>>`. I changed my lambdas to simply return `TState` instead. – dcastro Dec 06 '13 at 12:55
  • @Luaan `firstTask.ContinueWith(_ => CreateSecondTask(firstTask.Result)).ContinueWith(secondTask => CreateThirdTask(secondTask.Result))` given `Task<int> CreateSecondTask(int firstResult)` and `Task<int> CreateThirdTask(int secondResult)`. – Tim Robinson Dec 06 '13 at 12:57
  • @dcastro I think this blocks synchronously, the same as `Wait` in the question: `makeTask(task.Result, item2).Result` – Tim Robinson Dec 06 '13 at 13:12
  • @TimRobinson Give it a try. The first `Result` in that expression does not block, because the result is already available. The second `Result` only blocks the thread on which the `ContinueWith` task is running, which doesn't really matter. – dcastro Dec 06 '13 at 13:20
  • @dcastro Go from a single async sequence in the example to 1000 of these async sequences, running in parallel. I can't afford to have 1000 threads blocking. – Tim Robinson Dec 06 '13 at 13:25
  • Can anyone post an example of using this pattern please? I am trying to understand it. Thanks. – Frank Silano May 20 '20 at 18:06