
Given a collection of Tasks:

var americanAirlines = new FlightPriceChecker("AA");
...
var runningTasks = new List<Task<IList<FlightPrice>>>
{
    americanAirlines.GetPricesAsync(from, to),
    delta.GetPricesAsync(from, to),
    united.GetPricesAsync(from, to)
};

I would like to process the results of GetPricesAsync() in whatever order they arrive. Currently, I'm using a while loop to achieve this:

while (runningTasks.Any())
{
    // Wait for any task to finish
    var completed = await Task.WhenAny(runningTasks);
    // Remove from running list   
    runningTasks.Remove(completed);
    // Process the completed task (updates a property we may be binding to)
    UpdateCheapestFlight(completed.Result);
}

Is this a problem that can be solved more elegantly using Rx? I tried something like the code below but got stuck: somewhere I would have to await each getFlightPriceTask, which would block and only then start the next one, instead of taking whichever task finishes first and then waiting for the next:

runningTasks
  .ToObservable()
  .Select(getFlightPriceTask => .???.)
Krumelur
  • If you just want to `foreach` them then you can just [order the tasks by completion](https://stackoverflow.com/questions/16938999/sort-tasks-into-order-of-completition) rather than creating an observable of them. – Servy Jul 26 '18 at 14:49
  • Maybe you can introduce a small pipeline with TPL Dataflow, and use the last block as Observable? In that case you'll get all what you need, in order of execution – VMAtm Jul 26 '18 at 16:23

3 Answers


Try this:

runningTasks
  .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
  .Merge()
  .Subscribe(flightPrices => UpdateCheapestFlight(flightPrices));
Shlomo
  • Mind commenting on what's going on in that code? Why the `Merge()`, and why does it pick the tasks in the order they complete? – Krumelur Jul 26 '18 at 15:33
  • I've added my own answer which comments on above solution. It helped me solve the problem. – Krumelur Jul 26 '18 at 18:21

@Shlomo's answer helped me a lot (using Merge() was the trick!) and I'd like to comment on it and also present an alternative solution.

Comments on Shlomo's solution

This solution is very simple and expresses the elegance of Rx. The only problem is that it cannot be awaited for completion. This is typically not a problem in production code, where we would only care about updating a property that is then bound to the UI. The other comment I have is that the calculation is done in Subscribe(); some like to keep subscriptions super lightweight, but I think that's mostly personal preference.

runningTasks
  // Get all tasks and turn them into Observables.
  .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
  // Merge all tasks (in my case 3) into one "lane". Think of cars trying
  // to leave a three lane highway and going for a one lane exit.
  .Merge()
  // For every task "leaving the highway" calculate the minimum price.
  .Subscribe(flightPrices => UpdateCheapestFlight(flightPrices));
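
To make the "highway exit" picture concrete, here is a minimal sketch that shows results leaving Merge() in completion order rather than list order. It assumes the System.Reactive NuGet package is referenced. FakeAirlineAsync and its delays are hypothetical stand-ins for GetPricesAsync(), and ToList() is only used here to collect the order for inspection.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class MergeOrderDemo
{
    // Hypothetical stand-in for GetPricesAsync(): returns its airline code after a delay.
    private static async Task<string> FakeAirlineAsync(string code, int delayMs)
    {
        await Task.Delay(delayMs);
        return code;
    }

    public static async Task<IList<string>> RunAsync()
    {
        // The tasks are already running once they are in the list.
        var runningTasks = new List<Task<string>>
        {
            FakeAirlineAsync("AA", 600),
            FakeAirlineAsync("DL", 100),
            FakeAirlineAsync("UA", 300)
        };

        // Each task becomes a one-element observable; Merge() forwards each
        // element as soon as its task completes. With the delays above the
        // collected order is expected to be DL, UA, AA.
        return await runningTasks
            .Select(task => task.ToObservable())
            .Merge()
            .ToList();
    }
}
```

Calling `await MergeOrderDemo.RunAsync()` and printing the list should show the fastest fake airline first, even though it was second in the original list.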

Alternative 1: using Do()

This version does not call Subscribe() explicitly, which is kind of against the idea of Rx, but it can be awaited and therefore behaves like the original version.

await runningTasks
    .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
    .Merge()
    // Process result of each task.
    .Do(flightPrices => UpdateCheapestFlight(flightPrices))
    // Taking all elements will only complete if all three tasks have completed.
    .Take(runningTasks.Count);

Alternative 2: eliminating UpdateCheapestFlight()

Finally, I think a more Rx-style way to do this is to not use the original helper method at all and tell an "Rx-story" that is easy to read.

var minFlightPrice = await runningTasks
    // Get all the tasks and turn them into Observables 
    .Select(getFlightPriceTask => getFlightPriceTask.ToObservable())
    // Merge all three into one "lane".
    .Merge()
    // Get local minimum value of each airline
    .Select(x => x.Min())
    // Take all the local minimums...
    .Take(runningTasks.Count)
    // ...and find the minimum of them.
    .Min();
Krumelur

Here is another solution:

await runningTasks
    .ToObservable()
    .Merge()
    .Do(result => UpdateCheapestFlight(result))
    .DefaultIfEmpty();

It looks similar to Shlomo's solution, but there is a subtle difference: the tasks are not projected to a nested observable (IObservable<IObservable<TResult>>), but instead to an observable of tasks (IObservable<Task<TResult>>). Rx contains overloads of the Merge operator that work on both of these structures. The latter is slightly more efficient because it avoids creating throw-away wrappers around the tasks. The former is more powerful when we start with asynchronous delegates instead of already materialized tasks, because it allows controlling the level of concurrency (by not starting all the tasks at once), and because it can automatically cancel any pending tasks if the resulting observable is unsubscribed at any time for any reason (including an error occurring in any of the tasks).
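
As a rough illustration of the "asynchronous delegates" case, the sketch below wraps each delegate with Observable.FromAsync and uses the Merge overload that takes a maximum concurrency. It assumes the System.Reactive NuGet package. WorkAsync, the six work items, the 100 ms delay and the limit of 2 are all hypothetical, and the counter exists only to observe how many operations run at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledMergeDemo
{
    public static async Task<(IList<int> Results, int MaxRunning)> RunAsync()
    {
        var gate = new object();
        int running = 0, maxRunning = 0;

        // Hypothetical async operation; it records how many copies run concurrently.
        async Task<int> WorkAsync(int id, CancellationToken ct)
        {
            lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
            try
            {
                // The token is signalled if the subscription is disposed early.
                await Task.Delay(100, ct);
                return id;
            }
            finally
            {
                lock (gate) { running--; }
            }
        }

        var results = await Enumerable.Range(1, 6)
            // FromAsync is deferred: the delegate only runs when Merge subscribes.
            .Select(id => Observable.FromAsync<int>(ct => WorkAsync(id, ct)))
            // Subscribe to at most two inner observables at a time.
            .Merge(2)
            .ToList();

        return (results, maxRunning);
    }
}
```

Because nothing starts until Merge subscribes, the concurrency limit is actually enforced. With already started tasks, as in the question, there is nothing left to throttle.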

The Do operator is used to process the results of the tasks in the order of their completion, one result at a time.

The DefaultIfEmpty operator is needed at the end, in order to prevent an InvalidOperationException in case the initial list of tasks was empty. This is because the resulting observable is awaited, and awaiting an observable is required to return a value (the last value emitted).
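
The empty-sequence behavior can be checked in isolation with a small sketch, assuming the System.Reactive NuGet package. The class and method names are made up for illustration, and Observable.Empty<int>() stands in for the merged observable of an empty task list.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class AwaitEmptyDemo
{
    // Awaiting an observable that completes without emitting anything throws.
    public static async Task<bool> AwaitEmptyThrowsAsync()
    {
        try
        {
            await Observable.Empty<int>();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // DefaultIfEmpty() supplies default(int), so the await has a value to return.
    public static async Task<int> AwaitEmptyWithDefaultAsync()
    {
        return await Observable.Empty<int>().DefaultIfEmpty();
    }
}
```

For a reference type such as IList<FlightPrice>, the default value is null, so code that uses the awaited result should be prepared for that.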

Below are the signatures of the Rx operators used in the above example:

// Converts an enumerable sequence to an observable sequence.
public static IObservable<TSource> ToObservable<TSource>(
    this IEnumerable<TSource> source);

// Merges results from all source tasks into a single observable sequence.
public static IObservable<TSource> Merge<TSource>(
    this IObservable<Task<TSource>> sources);

// Invokes an action for each element in the observable sequence, and propagates
// all observer messages through the result sequence. This method can be used for
// debugging, logging, etc. of query behavior by intercepting the message stream
// to run arbitrary actions for messages on the pipeline.
public static IObservable<TSource> Do<TSource>(this IObservable<TSource> source,
    Action<TSource> onNext);

// Returns the elements of the specified sequence or the type parameter's default
// value in a singleton sequence if the sequence is empty.
public static IObservable<TSource> DefaultIfEmpty<TSource>(
    this IObservable<TSource> source);
Theodor Zoulias