2

I have the following code:

var tasks = await taskSeedSource
    .Select(taskSeed => GetPendingOrRunningTask(taskSeed, createTask, onFailed, onSuccess, sem))
    .ToList()
    .ToTask();

if (tasks.Count == 0)
{
    return;
}

if (tasks.Contains(null))
{
    tasks = tasks.Where(t => t != null).ToArray();
    if (tasks.Count == 0)
    {
        return;
    }
}

await Task.WhenAll(tasks);

Where taskSeedSource is a Reactive Observable. It could be that this code have many problems, but I see at least two:

  1. I am collecting tasks whereas I could do without it.
  2. Somehow, the returned tasks list may contain nulls, even though GetPendingOrRunningTask is an async method and hence never returns null. I failed to understand why it happens, so I had to defend against it without understanding the cause of the problem.

I would like to use the AsyncCountdownEvent from the AsyncEx framework instead of collecting the tasks and then awaiting on them.

So, I can pass the countdown event to GetPendingOrRunningTask which will increment it immediately and signal before returning after awaiting for the completion of its internal logic. However, I do not understand how to integrate the countdown event into the monad (that is the Reactive jargon, isn't it?).

What is the right way to do it?

EDIT

Guys, let us forget about the mysterious nulls in the returned list. Suppose everything is green and the code is

var tasks = await taskSeedSource
    .Select(taskSeed => GetPendingOrRunningTask(taskSeed, ...))
    .ToList()
    .ToTask();

await Task.WhenAll(tasks);

Now the question is how do I do it with the countdown event? So, suppose I have:

var c = new AsyncCountdownEvent(1);

and

async Task GetPendingOrRunningTask<T>(AsyncCountdownEvent c, T taskSeed, ...)
{
  c.AddCount();
  try
  {
    await ....
  }
  catch (Exception exc)
  {
    // The exception is handled
  }
  c.Signal();  
}

My problem is that I no longer need the returned task. These tasks where collected and awaited to get the moment when all the work items are over, but now the countdown event can be used to indicate when the work is over.

My problem is that I am not sure how to integrate it into the Reactive chain. Essentially, the GetPendingOrRunningTask can be async void. And here I am stuck.

EDIT 2

Strange appearance of a null entry in the list of tasks

Community
  • 1
  • 1
mark
  • 59,016
  • 79
  • 296
  • 580
  • 2
    Contrary to your statement, you won't ever have null tasks, and do not need to code against them. There is also no reason to special case zero tasks. `WhenAll` will simply be completed right away if there are no tasks in the collection. `AsyncCountdownEvent` would make the code more complex, not less complex. – Servy Dec 09 '14 at 18:51
  • I believe `ToTask` in your code is typo. Otherwise that won't compile isn't it? – Sriram Sakthivel Dec 09 '14 at 18:54
  • You may be right about `WhenAll`, but I am not delusional about the nulls. My unit tests failed because the list contained nulls. I cannot explain it, but I do not invent it. – mark Dec 09 '14 at 18:54
  • @SriramSakthivel - I am using the System.Reactive.Linq library. This is a Reactive extension and it compiles just fine. – mark Dec 09 '14 at 18:55
  • @mark So what is the type of `tasks`? That is task or list? – Sriram Sakthivel Dec 09 '14 at 18:56
  • @mark `ToTask` will work just fine, but you use the returned value as if it's a list, not a single task. It's the subsequent usage that will fail. – Servy Dec 09 '14 at 18:56
  • 4
    @mark If you have null values then something is very wrong, and you should figure out what that is and fix the root cause, rather than trying to code around it. – Servy Dec 09 '14 at 18:58
  • Yes, @servy got the point. That's what I'm asking. Your code just won't compile. – Sriram Sakthivel Dec 09 '14 at 18:59
  • Guys, my code compiles now already. More than that, it runs very well, if I ignore the possibility of nulls (which only occurred in unit tests). `ToList` returns `IObservable>`, calling `ToTask` returns `IList`. @SriramSakthivel - are you compiling with the Reactive libraries? – mark Dec 09 '14 at 19:03
  • Where does `ToTask` returns `IList`? I don't see [here](http://msdn.microsoft.com/en-us/library/system.reactive.threading.tasks.taskobservableextensions.totask%28v=vs.103%29.aspx) (Not having a Visual studio right now). As name suggests `ToTask` returns a Task only unless am missing something. Also I believe servy has answered your question with his first and last comment(at the moment). – Sriram Sakthivel Dec 09 '14 at 19:09
  • `System.Reactive.Threading.Tasks.TaskObservableExtensions.ToTask` - `Task ToTask(this IObservable observable)`. My mistake in the previous comment, `ToTask` returns `Task>`, but the outer task is stripped by the `await` keyword. Sorry, servy's is reply is not satisfactory. I guess this is why it is a comment and not an answer. – mark Dec 09 '14 at 19:20
  • 1
    @mark: Sounds like your mocking framework may be returning a `null` task. Try upgrading your mocking framework (no modern versions will do this - except VS stubs). – Stephen Cleary Dec 10 '14 at 00:26
  • I wish it was my mocking framework. I use moq and I return `Task.Run` from it. Anyway, I left it as a mystery of mstest. What I really want is to get rid of the task list at all and use AsyncCountdownEvent instead. – mark Dec 10 '14 at 00:30
  • @StephenCleary - mystery resolved. http://stackoverflow.com/questions/27412560/strange-appearance-of-a-null-entry-in-the-list-of-tasks, but I still want to know if I could use your AsyncCountdownEvent here. – mark Dec 11 '14 at 02:36
  • @mark: I don't see a benefit from doing so. Dave Sexton's answer looks good to me. – Stephen Cleary Dec 11 '14 at 13:38

1 Answers1

1

@Servy is correct that you need to solve the null Task problem at the source. Nobody wants to answer a question about how to workaround a problem that violates the contracts of a method that you've defined yourself and yet haven't provided the source for examination.

As for the issue about collecting tasks, it's easy to avoid with Merge if your method returns a generic Task<T>:

await taskSeedSource
  .Select(taskSeed => GetPendingOrRunningTask(taskSeed, createTask, onFailed, onSuccess, sem))
  .Where(task => task != null)  // According to you, this shouldn't be necessary.
  .Merge();

However, unfortunately there's no official Merge overload for the non-generic Task but that's easy enough to define:

public static IObservable<Unit> Merge(this IObservable<Task> sources)
{
  return sources.Select(async source =>
  {
    await source.ConfigureAwait(false);
    return Unit.Default;
  })
  .Merge();
}
Dave Sexton
  • 2,562
  • 1
  • 17
  • 26
  • So, basically you are saying I do not need any countdown event at all. – mark Dec 10 '14 at 02:07
  • Yea, did you try my example? (You only need the new `Merge` extension that I've shown if you're using a non-generic `Task`) – Dave Sexton Dec 10 '14 at 02:21
  • Which Merge method are you using? The reason I am asking is because I do not see a Merge extension method without arguments. – mark Dec 10 '14 at 03:27
  • It's an extension method for `IObservable>`. I believe it was introduced in Rx 2.0. What version of Rx are you using? – Dave Sexton Dec 10 '14 at 03:36
  • 2.2.5.0 Could you give me the full type name of the extension class containing this method? – mark Dec 10 '14 at 03:50
  • It's with all of the other extension methods. `System.Reactive.Linq.Observable` – Dave Sexton Dec 10 '14 at 07:46
  • OK, found it. I will try this approach. Though, I have a feeling using the countdown event might yield a more efficient code. – mark Dec 10 '14 at 15:01
  • I am still working on it, but I definitely need to understand what is going on with the `null` - http://stackoverflow.com/questions/27412560/strange-appearance-of-a-null-entry-in-the-list-of-tasks – mark Dec 10 '14 at 23:11
  • What do you mean by "Merge locks inside"? – Dave Sexton Dec 11 '14 at 04:02
  • Could it be that a solution using `AsyncCountdownEvent` is going to result in less lock congestion than the one using `Merge`? – mark Dec 11 '14 at 04:25
  • Based on your comment in http://stackoverflow.com/questions/27415098/why-is-it-possible-to-await-an-rx-observable the current solution is expected to throw if the source is empty. – mark Dec 11 '14 at 04:30
  • How would you suggest to resolve the empty observable issue? Right now the code indeed throws. – mark Dec 11 '14 at 17:34
  • You could use the `DefaultIfEmpty` operator. – Dave Sexton Dec 11 '14 at 17:36