
I have two instances of a class that creates a UDP socket to receive data from UDP clients. If one of the instances throws an exception, I want to handle it immediately in a higher layer. In my program they're started with await Task.WhenAll(recv1.StartAsync(), recv2.StartAsync()). This, however, waits for all tasks to finish before the first exception is thrown. Any ideas on how to resolve this problem?

static async Task Main(string[] args)
{
  var udpReceiver1 = new UdpReceiver(localEndpoint1);
  var udpReceiver2 = new UdpReceiver(localEndpoint2);

  var cts = new CancellationTokenSource();

  try
  {
    await Task.WhenAll(udpReceiver1.StartAsync(cts.Token), udpReceiver2.StartAsync(cts.Token));
  }
  catch (Exception e)
  {
    // Handle Exception...

    cts.Cancel();
  }
}

class UdpReceiver
{
  public UdpReceiver(IPEndPoint endpoint)
  {
    udpClient = new UdpClient(endpoint);
  }

  public async Task StartAsync(CancellationToken cancellationToken)
  {
    while (!cancellationToken.IsCancellationRequested)
    {
      var result = await ReceiveAsync(cancellationToken);
      var message = Encoding.UTF8.GetString(result.Buffer);
      Trace.WriteLine($"UdpClient1 received message:{message}");

      // throw new Exception("UdpClient1 raising exception");
    }
  }

  private async Task<UdpReceiveResult> ReceiveAsync(CancellationToken cancellationToken)
  {
    var tcs = new TaskCompletionSource<UdpReceiveResult>();
    using (cancellationToken.Register(() => tcs.TrySetCanceled(), false))
    {
      var task = udpClient.ReceiveAsync();

      var completedTask = await Task.WhenAny(task, tcs.Task);

      var result = await completedTask.ConfigureAwait(false);

      return result;
    }
  }

  private UdpClient udpClient;
}

Update 1: Task.WhenAny would be a viable solution. Thanks @CamiloTerevinto

try
{
  await await Task.WhenAny(udpReceiver1.StartAsync(cts.Token), udpReceiver2.StartAsync(cts.Token));
}
catch (Exception e)
{
  // Handle Exception...

  cts.Cancel();
}
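
To illustrate why Update 1 needs the double await, here is a minimal sketch, not the code from the question. `Task.WhenAny` returns a `Task<Task>`: the first await only yields the task that finished first and never throws, and the second await is what rethrows that task's exception. The names `WhenAnyDemo`, `FailAsync` and `RunAsync` are made up for the example, and the second task is just a delay that never completes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class WhenAnyDemo
{
    // Hypothetical stand-in for a receiver that fails shortly after starting.
    static async Task FailAsync()
    {
        await Task.Delay(20);
        throw new InvalidOperationException("boom");
    }

    public static async Task<string> RunAsync()
    {
        Task failing = FailAsync();
        Task never = Task.Delay(Timeout.Infinite); // stand-in for a healthy receiver

        // First await: yields the task that completed first, without throwing.
        Task first = await Task.WhenAny(failing, never);
        bool sameTask = ReferenceEquals(first, failing);

        try
        {
            // Second await: propagates the exception of the completed task.
            await first;
            return "no exception";
        }
        catch (InvalidOperationException e)
        {
            return $"{sameTask}: {e.Message}";
        }
    }
}
```

Note that if the first task to finish completes without an exception, nothing is thrown while the other task may still be running, which is the "handle non-failures yourself" caveat from the comments.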

Update 2: For more fine-grained exception handling of all tasks I'd go with my own adapted implementation of Task.WhenAll, as proposed by @Servy.

elvetian
  • Well, `Task.WhenAll` exists precisely so that you wait for *all* tasks. You could use `Task.WhenAny`, but you would need to handle non-failures yourself – Camilo Terevinto Sep 24 '20 at 19:37
  • What do you want to happen in case an instance fails? Do you want to handle this error, and then kill the process ignoring the other running instance? – Theodor Zoulias Sep 25 '20 at 00:36
  • Both instances should be terminated gracefully in case of an exception – elvetian Sep 25 '20 at 07:45
  • I think that you should include in the question that you want both instances to terminate gracefully. It is important information for answering this question. – Theodor Zoulias Sep 25 '20 at 08:52

3 Answers

The behavior is different enough from the framework's WhenAll implementation that you're probably best off writing your own adapted version. Fortunately, it's not particularly hard to implement. Attach a continuation to every task: if any task is cancelled or faulted, the resulting task does the same; if a task succeeds, store its result; and if it was the last task to succeed, complete the resulting task with all of the stored results.

public static Task<IEnumerable<TResult>> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks)
{
    var listOfTasks = tasks.ToList();
    if (listOfTasks.Count == 0)
    {
        return Task.FromResult(Enumerable.Empty<TResult>());
    }
    var tcs = new TaskCompletionSource<IEnumerable<TResult>>();
    var results = new TResult[listOfTasks.Count];
    int completedTasks = 0;
    for (int i = 0; i < listOfTasks.Count; i++)
    {
        int taskIndex = i;
        Task<TResult> task = listOfTasks[i];
        task.ContinueWith(_ =>
        {
            if (task.IsCanceled)
                tcs.TrySetCanceled();
            else if (task.IsFaulted)
                tcs.TrySetException(task.Exception.InnerExceptions);
            else
            {
                results[taskIndex] = task.Result;
                if (Interlocked.Increment(ref completedTasks) == listOfTasks.Count)
                {
                    tcs.TrySetResult(results);
                }
            }
        });
    }
    return tcs.Task;
}

As with so many task-based generic operations, you also need a version without the results. If you don't want to deal with notable overhead, you really need to just copy-paste the result-based approach with all of the results ripped out, which isn't hard, just inelegant. Turning all of these tasks into tasks with a result would also work, but for an operation like this the overhead is likely problematic.

public static Task WhenAll(IEnumerable<Task> tasks)
{
    var listOfTasks = tasks.ToList();
    if (listOfTasks.Count == 0)
    {
        return Task.CompletedTask;
    }
    var tcs = new TaskCompletionSource<bool>();
    int completedTasks = 0;
    for (int i = 0; i < listOfTasks.Count; i++)
    {
        Task task = listOfTasks[i];
        task.ContinueWith(_ =>
        {
            if (task.IsCanceled)
                tcs.TrySetCanceled();
            else if (task.IsFaulted)
                tcs.TrySetException(task.Exception.InnerExceptions);
            else
            {
                if (Interlocked.Increment(ref completedTasks) == listOfTasks.Count)
                {
                    tcs.TrySetResult(true);
                }
            }
        });
    }
    return tcs.Task;
}
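
As a usage illustration, here is a minimal sketch of how a fail-fast wait could be combined with the cancellation token from the question. It is an assumption-laden stand-in rather than the exact code above: `WhenAllFailFast` is a hypothetical name for a compact variant of the non-generic method, and `WorkAsync` simulates a receiver with `Task.Delay` instead of a real `UdpClient`. It assumes C# 8 or later for the `using` declaration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class FailFastDemo
{
    // Hypothetical name; completes as soon as any task faults or is cancelled,
    // or once all tasks have succeeded.
    public static Task WhenAllFailFast(params Task[] tasks)
    {
        if (tasks.Length == 0) return Task.CompletedTask;
        var tcs = new TaskCompletionSource<bool>();
        int remaining = tasks.Length;
        foreach (var task in tasks)
        {
            task.ContinueWith(t =>
            {
                if (t.IsCanceled) tcs.TrySetCanceled();
                else if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
                else if (Interlocked.Decrement(ref remaining) == 0) tcs.TrySetResult(true);
            }, TaskScheduler.Default);
        }
        return tcs.Task;
    }

    // Simulated receiver: fails after failAfterMs, or runs until cancelled when negative.
    static async Task WorkAsync(int failAfterMs, CancellationToken token)
    {
        if (failAfterMs >= 0)
        {
            await Task.Delay(failAfterMs, token);
            throw new InvalidOperationException("receiver failed");
        }
        await Task.Delay(Timeout.Infinite, token);
    }

    public static async Task<string> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        Task t1 = WorkAsync(50, cts.Token);
        Task t2 = WorkAsync(-1, cts.Token);
        try
        {
            await WhenAllFailFast(t1, t2);
            return "completed";
        }
        catch (InvalidOperationException e)
        {
            // The first failure surfaces immediately, so the token can still be cancelled.
            cts.Cancel();
            // Optionally observe the remaining tasks before moving on.
            try { await Task.WhenAll(t1, t2); } catch { }
            return e.Message;
        }
    }
}
```

Whether to follow the fail-fast wait with a regular `Task.WhenAll`, as this sketch does, is exactly the design question debated in the comments below.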
Servy
  • IMHO the current name of this `WhenAll` method is misleading. A better name could be `WhenAllSuccessfulOrAnyFailed`. – Theodor Zoulias Sep 25 '20 at 00:39
  • @TheodorZoulias You're free to call it what you want in your own implementations. Personally I consider this the behavior that's virtually always preferable when waiting for a group of tasks to complete, so I prefer it as I have it. Honestly I wish the .NET implementation would have been implemented this way from the start (obviously a breaking change to change it isn't going to happen). After all, do you call foreach "ForEachOrAnyFailed"? Do you call List.RemoveAll "RemoveAllOrAnyFailed"? That an operation ends early in the face of an exception is typical, not atypical, behavior. – Servy Sep 25 '20 at 01:40
  • Good rationalizing attempt, but I am not buying it. :-) If the `WhenAll` was implemented this way, it would encourage firing-and-forgetting: I start 10 tasks, one fails, I forget the other 9. This is sloppy, it would result in less robust programs. – Theodor Zoulias Sep 25 '20 at 02:12
  • @TheodorZoulias You're not observing the result of the other tasks regardless of which behavior you use. Either way, the operation faulted, and so you're getting an exception. This is no different than literally every other operation that throws an exception in that, instead of observing the intended result, it stops prematurely and you get the exception. By your logic every single method that ever throws an exception encourages firing and forgetting that operation because you ignore the result when it throws. – Servy Sep 25 '20 at 02:36
  • @TheodorZoulias Of all of the times that you needed the results of 10 operations to finish to continue, and one of them failed, how often was it important for the correctness of your program for you to perform 9 more operations, wait for them to finish, and then ignore their results so you could throw the original exception? I expect in practice, close to every single time you needed the results of 10 operations to proceed with your method, you didn't care about the other 9 once one of them threw. – Servy Sep 25 '20 at 02:38
  • When a synchronous operation fails, nothing is forgotten. The result of the operation is the exception, and there is nothing else going on. On the other hand when you initiate multiple asynchronous operations, it becomes possible to leave some of them unobserved, if you so wish. But allowing asynchronous operations to run in the background forgotten and unobserved is not a nice situation, because each of them consumes resources. If you are not interested in them any more, you should cancel them (cooperatively), and observe their cancellation before continuing. – Theodor Zoulias Sep 25 '20 at 03:34
  • It is analogous to how `Parallel.ForEach` works. When a worker thread fails, the `Parallel.ForEach` does not return immediately, but waits for the other workers to complete their current processing before reporting failure. If you object to the behavior of `Task.WhenAll`, you should object to the behavior of `Parallel.ForEach` too. – Theodor Zoulias Sep 25 '20 at 03:34
  • @TheodorZoulias You've just explained **exactly** why this solution is desirable. If, upon failure of any one operation, you want to cancel a token so that the rest finish immediately, you want WhenAll to fault *immediately* so that you can then cancel that token. If WhenAll just waits for all of them to finish before telling you that there was an exception, you can no longer cancel them. – Servy Sep 25 '20 at 12:25
  • I am not saying that a custom `WhenAll` that fails fast has no uses. Actually I have [asked myself](https://stackoverflow.com/questions/57313252/how-can-i-await-an-array-of-tasks-and-stop-waiting-on-first-exception) for such a method in the past (as well as a [similar question](https://stackoverflow.com/questions/58330380/how-to-force-an-actionblock-to-complete-fast) about TPL Dataflow blocks). But after a year I am now much more skeptical about using such a method alone, without being followed by the standard `Task.WhenAll`. – Theodor Zoulias Sep 25 '20 at 12:46
  • @TheodorZoulias Honestly, anytime anyone is doing anything where they would feel compelled to wait for everything is a strong indication to me that they're doing something wrong. It would suggest that the operations are useful for their externally visible side effects, not their results (something to be avoided whenever possible). In a well designed solution the tasks wouldn't have any observable effects on any external code, and so the only effect of continuing on without them finishing is going to be CPU consumption. In the *exceptional* cases where that's not possible, one can wait. – Servy Sep 25 '20 at 12:50
  • @TheodorZoulias And honestly, even in the case where the operation is used for its side effects, only some would really need to be waited for, and that's cases where the thing(s) that the operation is affecting are things that you still expect to be usable when the operations are failing. Ideally that would be a small percent of cases. If they're failing, you *probably* want to just discard those things entirely. While there may be cases that you expect the things being mutated to stay in a stable state even in the face of these operations failing, that should be quite rare. – Servy Sep 25 '20 at 12:52
  • I haven't much to say about what is common and what is rare, because I think that it varies greatly per application type and business domain. Instead I would like to ask you about the specific case at hand, with the two `UdpReceiver` tasks. Would you say that observing the outcome of both tasks before exiting the program indicates that something wrong has been done? Because if you do, allow me to kindly disagree. I think that it's a good practice, that makes for a robust, maintainable and trustworthy application. – Theodor Zoulias Sep 25 '20 at 13:10
  • @TheodorZoulias Why do you think it would be helpful to you to see that one operation failed and the other operation was cancelled and that the program waited another X period of time rather than seeing that one operation faulted and knowing that the other operation was cancelled without it being reported? What are you going to do differently in either case? How will you have benefited from the program waiting for other operations to finish? You're not actually making a program that's any more maintainable or robust, unless it was extremely poorly implemented to begin with. – Servy Sep 25 '20 at 13:31
  • How would I know that the other operation was cancelled, without it being reported? I would *assume* that it was cancelled, I wouldn't *know* it. The difference between knowing and assuming is quite significant in my book. – Theodor Zoulias Sep 25 '20 at 13:41
  • @TheodorZoulias Okay, and what are you going to do with that information? How is that going to help you in maintaining that program? Why did you benefit from this UDP listener continuing to wait for packets before ending the program instead of cancelling it and then just ending the program? Looking through the code the actual socket operation doesn't accept the cancellation token, it's just that the async method was written to finish when either the token is cancelled or the operation finishes, so in this specific case, you're not even waiting for the socket to try to get its next packet. – Servy Sep 25 '20 at 13:58
  • Yeap, it seems that sadly the asynchronous API of the `UdpClient` class is not cancelable, so the OP is using the `await Task.WhenAny(task, tcs.Task)` [hack](https://stackoverflow.com/questions/19404199/how-to-to-make-udpclient-receiveasync-cancelable). So cancelling results in an unobserved task, which I don't like. I would research/experiment whether calling the [`UdpClient.Close`](https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.udpclient.close) method is a good substitute for cancellation. – Theodor Zoulias Sep 25 '20 at 15:18
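
For readers on a newer runtime, the non-cancelable receive discussed in the last two comments may no longer need a workaround. The following is a minimal sketch assuming .NET 6 or later, where `UdpClient` offers a `ReceiveAsync(CancellationToken)` overload; the class and method names (`CancellableUdp`, `ReceiveLoopAsync`) are made up for the example, and this is not the code from the question.

```csharp
using System;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

static class CancellableUdp
{
    // Assumes .NET 6+: the receive itself observes the token, so cancellation
    // does not leave a pending, unobserved ReceiveAsync task behind.
    public static async Task ReceiveLoopAsync(UdpClient udpClient, CancellationToken token)
    {
        while (true)
        {
            // Throws OperationCanceledException once the token is cancelled.
            UdpReceiveResult result = await udpClient.ReceiveAsync(token);
            Console.WriteLine(Encoding.UTF8.GetString(result.Buffer));
        }
    }
}
```

With this overload the loop ends in the cancelled state when the token fires, so a caller that awaits it should expect an `OperationCanceledException`.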

There's probably a way to do it but I can't think of one without making your code very messy. It'd be better to handle the exception in the actual task. If you need to handle it with common code, use a handler delegate.

static async Task Main(string[] args)
{
    var cts = new CancellationTokenSource();

    //This is our common error handler
    void HandleException(Exception ex)
    {
        Log("Exception!" + ex.Message);
        cts.Cancel();
    }

    var udpReceiver1 = new UdpReceiver(localEndpoint1);
    var udpReceiver2 = new UdpReceiver(localEndpoint2);

    //We pass the handler as one of the arguments
    await Task.WhenAll(udpReceiver1.StartAsync(cts.Token, HandleException), udpReceiver2.StartAsync(cts.Token, HandleException));
}

class UdpReceiver
{
  public async Task StartAsync(CancellationToken cancellationToken, Action<Exception> errorHandler)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              //Main logic goes here
          }
      }
      catch(Exception ex)
      {
          errorHandler(ex);  //Call common error handling code
      }
  }
}
John Wu
  • This doesn't result in the method continuing on after the first failure, which is what it's specifically asking for. In fact, the use case here isn't even to handle the exception, it's to propagate it to the caller immediately when reached, which this specifically prevents from happening *at all* by forcing it to be caught and handled by the method in question, and also forces it to wait until everything finishes. – Servy Sep 24 '20 at 20:03

You could await the tasks in two steps. In the first step await any of them to complete, and in case of failure initiate the cancellation. Don't handle the exception in this step. Delay the exception handling until the second step, after awaiting all of the tasks to complete. Both tasks may have failed, so you may want to handle the exception of each one separately.

Task task1 = udpReceiver1.StartAsync(cts.Token);
Task task2 = udpReceiver2.StartAsync(cts.Token);

// Await any task to complete
Task firstCompletedTask = await Task.WhenAny(task1, task2);
if (firstCompletedTask.IsFaulted) cts.Cancel();

try
{
    // Await them all to complete
    await Task.WhenAll(task1, task2);
}
catch
{
    if (task1.IsFaulted) HandleException(task1.Exception.InnerException);
    if (task2.IsFaulted) HandleException(task2.Exception.InnerException);
}
Theodor Zoulias
  • How would you intend to scale a solution like this up to support more than just two tasks? Even just trying to apply this approach to 4 or 5 tasks, let alone an unknown number of tasks, would scale poorly in both complexity of code and performance. – Servy Sep 25 '20 at 12:31
  • @Servy yeap, if the tasks were more than two I would certainly need to replace the `Task.WhenAny` with a custom `WhenAnyFailed` method, implemented similarly to [yours](https://stackoverflow.com/a/64053587/11178549) `WhenAll`. :-) – Theodor Zoulias Sep 25 '20 at 12:35
  • @Servy actually now that I am thinking of it, if I had multiple tasks I would scrap the `Task.WhenAny` step, and instead I would attach a continuation to each task, that on failure would cancel the token. I have implemented something similar in [this](https://stackoverflow.com/questions/61995084/firing-recurring-tasks-at-the-same-time/61996190#61996190) answer (the `OnErrorCancel` method). – Theodor Zoulias Sep 25 '20 at 13:28
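
The continuation-based idea from the last comment could look roughly like the sketch below. It is a guess at the shape of such a helper, not the linked answer's code: the extension method reuses the name `OnErrorCancel` from the comment, but its signature here is an assumption, and `WorkAsync` is a simulated receiver. Each task cancels the shared token when it faults, and a plain `Task.WhenAll` then observes every task.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class CancelOnFailure
{
    // Hypothetical helper: cancels the shared token source when the task faults.
    public static Task OnErrorCancel(this Task task, CancellationTokenSource cts)
    {
        task.ContinueWith(_ => cts.Cancel(), CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
        return task;
    }

    // Simulated receiver: fails after failAfterMs, or runs until cancelled when negative.
    static async Task WorkAsync(int failAfterMs, CancellationToken token)
    {
        if (failAfterMs >= 0)
        {
            await Task.Delay(failAfterMs, token);
            throw new InvalidOperationException("receiver failed");
        }
        await Task.Delay(Timeout.Infinite, token);
    }

    public static async Task<string> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        Task[] tasks =
        {
            WorkAsync(50, cts.Token).OnErrorCancel(cts),
            WorkAsync(-1, cts.Token).OnErrorCancel(cts),
            WorkAsync(-1, cts.Token).OnErrorCancel(cts),
        };

        // Wait for every task; the first failure cancels the rest via the token.
        try { await Task.WhenAll(tasks); } catch { }

        int faulted = tasks.Count(t => t.IsFaulted);
        int canceled = tasks.Count(t => t.IsCanceled);
        return $"{faulted} faulted, {canceled} canceled";
    }
}
```

This scales to any number of tasks without a `Task.WhenAny` step, at the cost of one extra continuation per task.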