5

I've got two .net Task objects that I may wish to run in parellel or in sequence. In either case, I don't want to block a thread to wait for them. As it turns out, Reactive Extensions make the parallel story simply beautiful. But when I try to arrange the tasks in sequence, the code works but just feels awkward.

I'd like to know if anyone can show how to make the sequential version more concise or be coded as effortlessly as the parallel version. It is not necessary to use reactive extensions for the answer to this question.

For reference, here are my two solutions for both parallel and sequential processing.

Parallel Processing Version

This is pure joy:

    public Task<string> DoWorkInParallel()
    {
        var result = new TaskCompletionSource<string>();

        Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
        Task<bool> BravoTask = Task.Factory.StartNew(() => true);

        //Prepare for Rx, and set filters to allow 'Zip' to terminate early
        //in some cases.
        IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
        IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);

        Observable
            .Zip(
                AsyncAlpha,
                AsyncBravo,
                (x, y) => y.ToString() + x.ToString())
            .Timeout(TimeSpan.FromMilliseconds(200)).Subscribe(
                (x) => { result.TrySetResult(x); },
                (x) => { result.TrySetException(x.GetBaseException()); },
                () => { result.TrySetResult("Nothing"); });

        return result.Task;
    }

Sequential/Pipeline Processing Version

This works but is just clumsy:

    public Task<string> DoWorkInSequence()
    {
        var result = new TaskCompletionSource<string>();

        Task<int> AlphaTask = Task.Factory.StartNew(() => 4);

        AlphaTask.ContinueWith(x =>
        {
            if (x.IsFaulted)
            {
                result.TrySetException(x.Exception.GetBaseException());
            }
            else
            {
                if (x.Result != 5)
                {
                    Task<bool> BravoTask = Task.Factory.StartNew(() => true);
                    BravoTask.ContinueWith(y =>
                    {
                        if (y.IsFaulted)
                        {
                            result.TrySetException(y.Exception.GetBaseException());
                        }
                        else
                        {
                            if (y.Result)
                            {
                                result.TrySetResult(x.Result.ToString() + y.Result.ToString());
                            }
                            else
                            {
                                result.TrySetResult("Nothing");
                            }
                        }
                    });
                }
                else
                {
                    result.TrySetResult("Nothing");
                }
            }
        }
        );

        return result.Task;
    }

In the above sequential code, it has become a mess and I havn't even added the timeout capability to match the parallel version!

Requirements (UPDATED on 8/6)

For those answering, please be mindful that:

  1. The sequential scenario should permit the arrangement where the output of the first task feeds the input of the second. My sample "awkward" code above could easily have been arranged to achieve that.

  2. I'm interested in a .net 4.5 answer - but a .net 4.0 answer is equally or more important for me.

  3. Tasks 'Alpha' and 'Bravo' have a combined time-limit of 200ms in which to complete; they do not have 200ms each. This is true in the sequential case as well.

  4. The SourceCompletionTask must complete early, before both tasks complete, if either task returns an invalid result. An invalid result is either [AlphaTask:5] or [BravoTask:false] as indicated by the explicit tests in the sample code.
    Update 8/8: Clarification - In the sequential case, the BravoTask should not execute at all if AlphaTask is not successful or if the timeout has already occurred.

  5. Assume both AlphaTask and BravoTask cannot block. Not that it matters, but in my real-world scenario they are actually async WCF service calls.

Maybe there is an aspect of Rx I could have exploited to clean up the sequential version. But even just Task programming by itself should have a better story I'd imagine. We'll see.

ERRATA In both code samples I changed the return type to Task, as poster answers were quite correct that I should not have been returning a TaskCompletionSource.

Brent Arias
  • 29,277
  • 40
  • 133
  • 234

4 Answers4

4

If you can use async/await, Brandon has a nice answer. If you are still on VS2010, the first thing I would do to clean up the sequential version is to get an extension method like the Then method Stephen Toub described in a blog post. I would also implement a Task.FromResult method if you are not using .NET 4.5. With those, you could get:

public Task<string> DoWorkInSequence()
{
    return Task.FromResult(4)
           .Then(x => 
                 { if (x != 5)
                   {
                       return Task.FromResult(true)
                              .Then(y => 
                                    { if (y)
                                      {
                                          return Task.FromResult(x.ToString() + y.ToString());
                                      }
                                      else
                                      {
                                          return Task.FromResult("Nothing");
                                      }
                                    });
                    }
                    else
                    {
                        return Task.FromResult("Nothing");
                    }
                 });
}

Also, you should generally return a Task instead of a TaskCompletionSource (which you can get by calling .Task on the TaskCompletionSource), since you don't want the caller to set a result on the task you are returning to them.

Brandon's answer also gives a good way to implement the timeout functionality (adjusting for the lack of async/await keywords).

EDIT To reduce arrow code, we can implement more of the LINQ methods. A SelectMany implementation is provided in the previously linked blog post. The other methods we will need for LINQ are Select and Where. These should be fairly straightforward once you've done Then and SelectMany, but here they are:

public static Task<T> Where<T>(this Task<T> task, Func<T, bool> predicate)
{
    if (task == null) throw new ArgumentNullException("task");
    if (predicate == null) throw new ArgumentNullException("predicate");

    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
        {
            if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
            else if (completed.IsCanceled) tcs.TrySetCanceled();
            else
            {
                try
                {
                    if (predicate(completed.Result))
                        tcs.TrySetResult(completed.Result);
                    else
                        tcs.TrySetCanceled();
                }
                catch (Exception ex)
                {
                    tcs.TrySetException(ex);
                }
            }
        });
    return tcs.Task;
}

public static Task<TResult> Select<T, TResult>(this Task<T> task, Func<T, TResult> selector)
{
    if (task == null) throw new ArgumentNullException("task");
    if (selector == null) throw new ArgumentNullException("selector");

    var tcs = new TaskCompletionSource<TResult>();
    task.ContinueWith((completed) =>
    {
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetCanceled();
        else
        {
            try
            {
                tcs.TrySetResult(selector(completed.Result));
            }
            catch (Exception ex)
            {
                tcs.TrySetException(ex);
            }
        }
    });
    return tcs.Task;
}

After that, one final non-LINQ extension method allows use to return a default value when cancelled:

public static Task<T> IfCanceled<T>(this Task<T> task, T defaultValue)
{
    if (task == null) throw new ArgumentNullException("task");

    var tcs = new TaskCompletionSource<T>();
    task.ContinueWith((completed) =>
    {
        if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions);
        else if (completed.IsCanceled) tcs.TrySetResult(defaultValue);
        else tcs.TrySetResult(completed.Result);
    });
    return tcs.Task;
}

And the new and improved DoWork (sans timeout):

public static Task<string> DoWorkInSequence()
{
    return (from x in Task_FromResult(5)
            where x != 5
            from y in Task_FromResult(true)
            where y
            select x.ToString() + y.ToString()
           ).IfCanceled("Nothing");
}

The Timeout method from Brandon's answer (once rewritten, if needed without async/await) can be stuck on the end of the chain for an overall timeout and/or after each step in the chain if you want to keep further steps from running once the overall timeout is reached. Another possibility for the chain interruption would be to make all the individual steps take a cancellation token and modify the Timeout method to take the CancellationTokenSource and cancel it if a timeout occurs, as well as throwing the timeout exception.

EDIT (Brent Arias)

Taking fantastic ideas from what you have presented, I've devised what I think is the final answer from my POV. It is based on the .net 4.0 extension methods found in the nuget package of ParallelExtensionsExtras. The sample below adds a third task, to help illustrate the "feel" of programming for sequential tasks, given my stated requirements:

public Task<string> DoWorkInSequence()
{
    var cts = new CancellationTokenSource();

    Task timer = Task.Factory.StartNewDelayed(200, () => { cts.Cancel(); });

    Task<int> AlphaTask = Task.Factory
        .StartNew(() => 4 )
        .Where(x => x != 5 && !cts.IsCancellationRequested);

    Task<bool> BravoTask = AlphaTask
        .Then(x => true)
        .Where(x => x && !cts.IsCancellationRequested);

    Task<int> DeltaTask = BravoTask
        .Then(x => 7)
        .Where(x => x != 8);

    Task<string> final = Task.Factory
        .WhenAny(DeltaTask, timer)
        .ContinueWith(x => !DeltaTask.IsCanceled && DeltaTask.Status == TaskStatus.RanToCompletion
            ? AlphaTask.Result.ToString() + BravoTask.Result.ToString() + DeltaTask.Result.ToString(): "Nothing");

    //This is here just for experimentation.  Placing it at different points
    //above will have varying effects on what tasks were cancelled at a given point in time.
    cts.Cancel();

    return final;
}

There are a few key observations I've made in this discussion and joint effort:

  • Using the "Then" extension is nice in trivial cases, but has noteworthy limited applicability. For more complex cases it would be necessary to replace it with, for example, .ContinueWith(x => true, cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default). When replacing "Then" with "ContinueWith" for my stated scenario, it is critical to add the OnlyOnRanToCompletion option.
  • Using a Timeout extension ultimately just doesn't work in my scenario. This is because it will only cause a cancellation of the Task it is immediately attached to, instead of cancelling all the antecedant Task instances in the sequence. This is why I switched to the StartNewDelayed(...) tactic and added an express cancellation check in each Where clause.
  • Although the ParallelExtensionsExtras library has the LINQ to Tasks defined that you used, I concluded it is best to stay away from LINQ-ish appearances with Tasks. This is because Tasks with LINQ are highly esoteric; it would likely confuse the hell out of the average developer. It is hard enough to get them to understand asynchronous coding. Even the author of LINQ to Tasks said "How useful this LINQ implementation is in practice is arguable, but at the very least it provides for an interesting thought exercise." Yes agreed, an interesting thought exercise. Of course I must admit at least the "Where" LINQ to Tasks method, as it played a key role in the solution I listed above.
Community
  • 1
  • 1
Gideon Engelberth
  • 6,095
  • 1
  • 21
  • 22
  • +1 for `Then`. It is essential to staying sane if you are writing continuations and cannot use async/await. – Brandon Aug 06 '13 at 10:23
  • The "Then" extension, as shown in that blog post, appears to have a bug. At best, it always has the `next` task `t` hint at synchronous execution, which may not be desireable - such as when the `first` task is an I/O thread. At worst, it does not do anything to ensure that the task `t` of `var t = next(first.Result)` is subsequently started. The 'next' task `t` may then not execute at all. But even aside from the alleged bug, the code isn't significantly more concise than what I started with. For example, it must handle the "Nothing" result twice while it forms "Arrow Code." No timeout too – Brent Arias Aug 06 '13 at 21:51
  • @BrentArias The synchronous hint is easy enough to remove. If I understand your other point correctly, you are noting that the task returned from `next` could not be started when it is returned. While that is possible, it would go against the norm for task-returning functions. To deal with the arrow code, we just need... more extension methods! (See update) – Gideon Engelberth Aug 07 '13 at 02:23
  • @Gideon: I'll let my edition hang for a bit, to see if you want to make edits. After that I'll presume to mark your post as the answer. – Brent Arias Aug 10 '13 at 03:19
  • @BrentArias That looks like what I would expect for timeout and cancellation. All the LINQ syntax would be doing for you in this case would be creating the intermediate variables (in the from clauses), so whichever is easier to understand for the people that have to work on the code is the one to go with. – Gideon Engelberth Aug 12 '13 at 03:01
1
public Task<string> DoWorkInSequence()
{
    Task<int> AlphaTask = Task.Factory.StartNew(() => 4);
    Func<int> BravoFunc = x => 2 * x;

    //Prepare for Rx, and set filters to allow 'Zip' to terminate early
    //in some cases.
    IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);

    return AsyncAlpha
        .Do(x => Console.WriteLine(x))  //This is how you "Do WORK in sequence"
        .Select(BravoFunc)              //This is how you map results from Alpha
                                        //via a second method.
        .Timeout(TimeSpan.FromMilliseconds(200)).Subscribe(
            (x) => { result.TrySetResult(x); },
            (x) => { result.TrySetException(x.GetBaseException()); },
            () => { result.TrySetResult("Nothing"); }).ToTask();
}

Ultimately however I would actually just do it all in TPL if you want Tasks, or use Observable.ToTask(this IObservable<T> observable) as opposed to using TaskCompletionSource

Aron
  • 15,464
  • 3
  • 31
  • 64
  • Both `AlphaTask` and `BravoTask` need to asynchronous. In my real-world scenario they represent web service calls I don't want to block on. Can you adjust your sample to reflect that? – Brent Arias Aug 06 '13 at 21:51
  • Just switch the Func BravoFunc back the Task it was in the question and use selectMany instead. Aron has it basically right, this pattern is super common in Rx code doing network requests. – Lee Campbell Aug 08 '13 at 08:38
  • Oh and also Aron, you forgot to remove all the code from `.Subscribe(` up until `.ToTask()`. I have 'corrected' and extended your answer below, but up voted this answer. – Lee Campbell Aug 08 '13 at 09:07
1

First, I wound't return a TaskCompletionSource. That is a means to an end...an implementation detail of your method that should be hidden from the public API. Your method should instead return a Task (it should just return result.Task).

Anyway, if you are just working with tasks, you should just use TPL and not use Rx. Only use Rx if you actually need to integrate your tasks with other rx code. Even your DoWorkInParallel can be made much simpler if you do not mix in the Rx stuff. Rx works fabulously with complex Task stuff. But scenarios you are describing are relatively simple and can be solved simply with TPL.

Here is how to do both the parallel and sequential versions in TPL:

/// <summary>Extension methods for timing out tasks</summary>
public static class TaskExtensions
{
    /// <summary> throws an error if task does not complete before the timer.</summary>
    public static async Task Timeout(this Task t, Task timer)
    {
        var any = await Task.WhenAny(t, timer);
        if (any != t)
        {
           throw new TimeoutException("task timed out");
        }
    }

    /// <summary> throws an error if task does not complete before the timer.</summary>
    public static async Task<T> Timeout<T>(this Task<T> t, Task timer)
    {
        await Timeout((Task)t, timer);
        return t.Result;
    }

    /// <summary> throws an error if task does not complete in time.</summary>
    public static Task Timeout(this Task t, TimeSpan delay)
    {
        return t.IsCompleted ? t : Timeout(t, Task.Delay(delay));
    }

    /// <summary> throws an error if task does not complete in time.</summary>
    public static Task<T> Timeout<T>(this Task<T> t, TimeSpan delay)
    {
        return Timeout((Task)t, delay);
    }
}

// .. elsewhere ..
public async Task<string> DoWorkInParallel()
{
    var timer = Task.Delay(TimeSpan.FromMilliseconds(200));
    var alphaTask = Task.Run(() => 4);
    var betaTask = Task.Run(() => true);

    // wait for one of the tasks to complete
    var t = await Task.WhenAny(alphaTask, betaTask).Timeout(timer);

    // exit early if the task produced an invalid result
    if ((t == alphaTask && alphaTask.Result != 5) ||
        (t == betaTask && !betaTask.Result)) return "Nothing";

    // wait for the other task to complete
    // could also just write: await Task.WhenAll(alphaTask, betaTask).Timeout(timer);
    await ((t == alphaTask) ? (Task)betaTask : (Task)alphaTask).Timeout(timer);

    // unfortunately need to repeat the validation logic here.
    // this logic could be moved to a helper method that is just called in both places.
    var alpha = alphaTask.Result;
    var beta = betaTask.Result;
    return (alpha != 5 && beta) ? (alpha.ToString() + beta.ToString()) : "Nothing";
}

public async Task<string> DoWorkInSequence()
{
    var timer = Task.Delay(TimeSpan.FromMilliseconds(200));
    var alpha = await Task.Run(() => 4).Timeout(timer);
    if (alpha != 5)
    {
        var beta = await Task.Run(() => true).Timeout(timer);
        if (beta)
        {
            return alpha.ToString() + beta.ToString();
        }
    }

    return "Nothing";
}

If you need to do your work in .Net 4.0, then you can use the Microsoft.Bcl.Async nuget package which lets you use VS2012 compiler to target .Net 4.0 and still use async/await. See this SO question: Using async-await on .net 4

Edit: I've modified the code to quit early for both the parallel and sequential versions if the tasks produce invalid values and I've modified the timeout to be combined instead of per task. Although in the sequential case, this timer will also be counting the time between the 2 tasks.

Community
  • 1
  • 1
Brandon
  • 38,310
  • 8
  • 82
  • 87
  • +1 for the TaskExtensions helper. However you can just remove the async from the extension method and just return the Task straight up. – Aron Aug 06 '13 at 07:52
  • @Aron I'm not sure how. It needs to wait for WhenAny to resolve so it can either return failed task or a task with the result. – Brandon Aug 06 '13 at 10:17
  • This is *very* close to my answer. I've updated my question with clarified requirements. Can they be met with slight adjustments? – Brent Arias Aug 06 '13 at 22:10
  • @Brandon I meant the function I made public. – Aron Aug 07 '13 at 01:53
  • @Aron yes I know which function you meant. But I don't think your proposal would work. If it just returned the original task then it wouldn't be waiting for the timeout. – Brandon Aug 07 '13 at 14:51
1

Aron had it almost spot on

public Task<string> DoWorkSequentially()
{
   Task<int> AlphaTask = Task.Run(() => 4);    //Some work;
   Task<bool> BravoTask = Task.Run(() => true);//Some other work;

   //Prepare for Rx, and set filters to allow 'Zip' to terminate early
   //in some cases.
   IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5);
   IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y);

    return (from alpha in AsyncAlpha
           from bravo in AsyncBravo
           select bravo.ToString() + alpha.ToString())
       .Timeout(TimeSpan.FromMilliseconds(200))
       .Concat(Observable.Return("Nothing"))   //Return Nothing if no result
       .Take(1)
       .ToTask();
}

Here I have just put the BravoFunc back to a BravoTask. I have removed the TaskCompletionSource (just as Aron did). Finally you use the ToTask() operator to turn the Rx continuation back to a Task<string>.

Note that

    from alpha in AsyncAlpha
    from bravo in AsyncBravo
    select bravo.ToString() + alpha.ToString()

Can also be written as

    AsyncAlpha.SelectMany(a=>AsyncBravo.Select(b=> b.ToString() + a.ToString()))

The SelectMany operator is very handy for these types of continuations. It is even more convenient in the Query Comprehension Syntax as you still have access to the bravo and alpha in the final select clause.

As you can see this becomes extremely helpful once you have many continuations. For example consider an example where you need 3 or 4 continuations

    from a in Alpha
    from b in Bravo
    from c in Charlie
    from d in Delta
    select a+b+c+d

This has alsorts of real world applications. I see this as a common pattern. Some examples include; Waiting for the server to be connected, then getting a session token to pass to a service client.

    from isConnected in _server.ConnectionState.Where(c=>c)
    from session in _server.GetSession()
    from customer in _customerServiceClient.GetCustomers(session)
    select customer;

or perhaps in a Social Media feed where we need to authenticate, find the contact, get a list of their emails and then pull down the first 20 headers of these emails.

    from accessToken in _oauth.Authenticate()
    from contact in _contactServiceClient.GetContact(emailAddress, accessToken)
    from imapMessageId in _mailServiceClient.Search(contact).Take(20)
    from email in _mailServiceClient.GetEmailHeaders(imapMessageId)
    select email;
Lee Campbell
  • 10,631
  • 1
  • 34
  • 29
  • 1
    your answer is executing the tasks in parallel, not sequentially. It also doesn't return `"Nothing"` if either of the tasks produces the "invalid" value. – Brandon Aug 08 '13 at 14:52
  • It is executing in sequential order. The `from bravo in AsyncBravo` line only effectively runs when `from alpha in AsyncAlpha` prduces a value i.e. in serial or sequential manner. That is what SelectMany does. – Lee Campbell Aug 08 '13 at 15:25
  • Updated to have the "Nothing" feature. – Lee Campbell Aug 08 '13 at 15:29
  • 1
    Your code starts both tasks at the beginning of the method (`Task.Factory.StartNew`). They are now running in parallel. Your linq expression just waits for their results in sequence. – Brandon Aug 08 '13 at 15:50
  • And see the OP's requirement #1 before just wrapping them in `Observable.FromAsync()`. The bravo task effectively needs to be defined after the result of the alpha task is known to satisfy that requirement. – Brandon Aug 08 '13 at 15:54
  • LOL. Correct. I had ignored the implmentation of the tasks. Lets assume that the person wont actually be returning `4` from a task and that the tasks are actually IO or compute bound and that they have not yet started. Sorry I thought that was all implied. Else the question itself is pointless. returning 4 doesn't need any asynchrony at all – Lee Campbell Aug 08 '13 at 16:16