Background. I am trying to write a function that takes multiple async delegates returning the same type and returns the first result that passes a filter (a predicate). Everything has to respond to cancellation as well. So I wrote the function and some tests. One of the tests, When_All_Tasks_Exceed_Specified_Time_Should_Throw, just hangs, and I cannot figure out why. Any help with this would be very much appreciated.

public static class TaskHelpers
{
    /// <summary>
    /// Takes in a collection of tasks and returns first one that completes.
    /// Evaluates each completed task against predicate, and if satisfied 
    /// returns completion result.
    /// Collects any exceptions raised by tasks, and if none have
    /// successfully completed, returns exceptions
    /// </summary>
    public static Task<T> GetFirst<T>(
        ICollection<Func<CancellationToken, Task<T>>> tasks,
        Predicate<T> predicate,
        CancellationToken cancellationToken)
        where T : class
    {
        if (tasks == null)
        {
            throw new ArgumentNullException(nameof(tasks));
        }

        if (predicate == null)
        {
            throw new ArgumentNullException(nameof(predicate));
        }

        var tcs = new TaskCompletionSource<T>(TaskCreationOptions
        .RunContinuationsAsynchronously);
        var completedCount = 0;
        var exceptions = new ConcurrentBag<Exception>();

        foreach (var task in tasks)
        {
            cancellationToken.ThrowIfCancellationRequested();

            task(cancellationToken)
                .ContinueWith(
                    t =>
                    {
                        cancellationToken.ThrowIfCancellationRequested();

                        if (t.Exception != null)
                        {
                            exceptions.Add(t.Exception.InnerException);
                        }
                        else if (predicate(t.Result))
                        {
                            tcs.TrySetResult(t.Result);
                        }

                        if (Interlocked.Increment(ref completedCount) == tasks.Count)
                        {
                            cancellationToken.ThrowIfCancellationRequested();

                            if (exceptions.Any())
                            {
                                tcs.TrySetException(exceptions);
                            }
                            else
                            {
                                tcs.TrySetResult(null);
                            }
                        }
                    },
                    cancellationToken);
        }

        cancellationToken.ThrowIfCancellationRequested();

        return tcs.Task;
    }
}

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using Xunit.Abstractions;

namespace Test.Unit
{
    public class TaskHelpersTests
    {
        private readonly ITestOutputHelper console;

        public TaskHelpersTests(ITestOutputHelper console)
        {
            this.console = console;
        }

        [Fact]
        public async Task When_All_Tasks_Exceed_Specified_Time_Should_Throw()
        {
            var delays = new int[] { 5 };

            var timeout = TimeSpan.FromSeconds(2);
            var cts = new CancellationTokenSource();
            cts.CancelAfter(timeout);
            var token = cts.Token;

            var funcs = delays.Select(
                delay => GetFunc(
                    funcId: delay.ToString(),
                    delay: TimeSpan.FromSeconds(delay),
                    satisfiesPredicate: true));

            /*await Assert.ThrowsAsync<OperationCanceledException>(
                async () => await GetFirst(
                    funcs.ToArray(),
                    d => d.SatisfiesPredicate,
                    token));*/
            try
            {
                var result = await GetFirst(
                    funcs.ToArray(),
                    d => d.SatisfiesPredicate,
                    token);
            }
            catch (Exception e)
            {
               e.GetType().ShouldBe(typeof(OperationCanceledException));
            }
            finally
            {
                cts.Dispose();
            }
        }

       
        private static Func<CancellationToken, Task<Dummy>> GetFunc(
            string funcId,
            TimeSpan delay,
            bool satisfiesPredicate)
        {
            return
                async ct =>
                {
                    ct.ThrowIfCancellationRequested();

                    await Task.Delay(delay, ct);

                    return
                        new Dummy()
                        {
                            Id = funcId,
                            SatisfiesPredicate = satisfiesPredicate,
                        };
                };
        }

        private class Dummy
        {
            public string Id { get; set; }

            public bool SatisfiesPredicate { get; set; }
        }
    }
}
Fildor
epitka
  • Java? Add tag. Please minimize the code that demonstrates the problem. The generic approach is to run the code in a debugger, then figure out where each thread is. With Java you can send a signal to the process to get a stack trace. The manual option is to print to figure out where the thread of execution is. "Hang" most often means a loop is not terminating, but it could also be a deadlock or livelock etc. – Allan Wind Feb 16 '21 at 08:11
  • You need an [mcve] – TheGeneral Feb 16 '21 at 08:17
  • At a quick read, it _seems_ like you're never actually awaiting the tasks being passed in, so they're never being run, so the TCS never completes because it's just waiting on any of the tasks to ever succeed or fail. – Martin Costello Feb 16 '21 at 08:27
  • Tasks passed to GetFirst() are never started! – cly Feb 16 '21 at 08:29
  • Could you edit the code and add some line-breaks, so that the code is readable without horizontal scrolling? – Theodor Zoulias Feb 16 '21 at 08:39
  • As a side note, the `ICollection<Func<CancellationToken, Task<T>>> tasks` is a bit misleading. This is not a collection of tasks, it's a collection of asynchronous delegates. – Theodor Zoulias Feb 16 '21 at 08:42
  • Related: [How to implement Task.WhenAny() with a predicate](https://stackoverflow.com/questions/38289158/how-to-implement-task-whenany-with-a-predicate). This implementation seems like an attempt to improve upon Ohad Schneider's [solution](https://stackoverflow.com/a/38291506/11178549). – Theodor Zoulias Feb 16 '21 at 10:44

1 Answer

This is more of a code review than an answer to the specific question. But it may help you find where the bug is.

  1. task(cancellationToken): this method is invoked without error handling. In case it fails, the whole GetFirst method will fail synchronously, and the exception will not be propagated through the Task return value. Any asynchronous operations that may have already started will be left behind unobserved, running in a fire-and-forget fashion.
  2. .ContinueWith(: this method returns a Task, which your code ignores, so it becomes a fire-and-forget task. If this task fails, you'll never know about it.
  3. .ContinueWith(: this method should be supplied with an explicit taskScheduler argument, preferably TaskScheduler.Default. Otherwise its behavior is unpredictable. Currently your code is at the mercy of whatever the TaskScheduler.Current (the default if you don't supply the argument) may be.
  4. var exceptions = new ConcurrentBag<Exception>();: The ConcurrentBag<T> is a very specialized collection. In your case what you need is a ConcurrentQueue<T>, in order to preserve the chronological order of the exceptions.
  5. else if (predicate(t.Result)): Just because you checked for an .Exception earlier, doesn't mean that a .Result is available. There is a third possibility: the task may be in a Canceled state.
  6. if (Interlocked.Increment(ref completedCount) == tasks.Count): one could argue that you are putting too much faith in the stability of the Count property of the supplied ICollection<T>. But in practice I would say that it's unlikely to cause problems.
  7. cancellationToken.ThrowIfCancellationRequested();: in case the cancellationToken becomes signaled after materializing the tasks, the GetFirst method will fail synchronously just like in the first point of this list, and all running tasks will become fire-and-forget tasks.
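
The points above can be sketched in code. What follows is a minimal, hypothetical rewrite, not the original author's method: the names TaskHelpersSketch and GetFirstAsync are invented for illustration, and it swaps the TaskCompletionSource plus ContinueWith approach for an async method that loops over Task.WhenAny. A likely contributor to the hang is that a continuation attached with a CancellationToken is itself canceled when that token fires, so its body never runs and the TaskCompletionSource is never completed. The sketch avoids that by not using ContinueWith at all, and it checks the task status before reading Result.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class TaskHelpersSketch
{
    // Hypothetical rewrite for illustration only (the name is an assumption).
    public static async Task<T> GetFirstAsync<T>(
        IEnumerable<Func<CancellationToken, Task<T>>> asyncDelegates,
        Predicate<T> predicate,
        CancellationToken cancellationToken)
        where T : class
    {
        if (asyncDelegates == null) throw new ArgumentNullException(nameof(asyncDelegates));
        if (predicate == null) throw new ArgumentNullException(nameof(predicate));

        // Points 1 and 6: invoke each delegate inside try/catch and keep our own
        // list of started tasks instead of relying on the caller's Count.
        var pending = new List<Task<T>>();
        foreach (var asyncDelegate in asyncDelegates)
        {
            try { pending.Add(asyncDelegate(cancellationToken)); }
            catch (Exception ex) { pending.Add(Task.FromException<T>(ex)); }
        }

        // Point 4: a plain list is enough here, because only this method adds
        // to it, one completed task at a time, in completion order.
        var exceptions = new List<Exception>();

        while (pending.Count > 0)
        {
            // Points 2 and 3: no ContinueWith, so there is no ignored
            // continuation task and no dependency on TaskScheduler.Current.
            Task<T> completed = await Task.WhenAny(pending).ConfigureAwait(false);
            pending.Remove(completed);

            // Point 5: a task can be Faulted, Canceled, or RanToCompletion.
            if (completed.IsFaulted)
            {
                exceptions.AddRange(completed.Exception.InnerExceptions);
            }
            else if (completed.Status == TaskStatus.RanToCompletion
                && predicate(completed.Result))
            {
                return completed.Result; // the remaining tasks are left running
            }
            // Canceled tasks fall through; cancellation is reported below.
        }

        // Point 7: this throw happens inside an async method, so it surfaces
        // through the returned Task instead of failing synchronously.
        cancellationToken.ThrowIfCancellationRequested();
        if (exceptions.Count > 0) throw new AggregateException(exceptions);
        return null; // nothing satisfied the predicate
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Mirrors the hanging test with shorter times: the only delegate
        // takes 5 seconds, but the token is canceled after 200 ms.
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
        var delegates = new Func<CancellationToken, Task<string>>[]
        {
            async ct => { await Task.Delay(TimeSpan.FromSeconds(5), ct); return "slow"; },
        };

        try
        {
            var result = await TaskHelpersSketch.GetFirstAsync(
                delegates, s => s.Length > 0, cts.Token);
            Console.WriteLine($"Result: {result}");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Canceled instead of hanging.");
        }
    }
}
```

One design choice to be aware of: when a result satisfies the predicate, the sketch returns immediately and does not observe the tasks that are still running. If that matters, the caller can cancel the token after the method returns. The repeated Task.WhenAny call is also quadratic in the number of delegates, which should be acceptable for small collections but is an assumption about the intended usage.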
Theodor Zoulias