Background. I am trying to write a function that takes multiple async delegates that return same type and that return result is subjected to some filtering. All have to respond to cancellation as well. So I wrote a func and tests. One of the tests When_All_Tasks_Exceed_Specified_Time_Should_Throw
just hangs and I cannot figure out why. Any help with this would be very much appreciated.
public static class TaskHelpers
{
/// <summary>
/// Takes in a collection of tasks and returns first one that completes.
/// Evaluates each completed task against predicate, and if satisfied
/// returns completion result.
/// Collects any exceptions raised by tasks, and if none have
/// successfully completed, returns exceptions
/// </summary>
public static Task<T> GetFirst<T>(
ICollection<Func<CancellationToken, Task<T>>> tasks,
Predicate<T> predicate,
CancellationToken cancellationToken)
where T : class
{
if (tasks == null)
{
throw new ArgumentNullException(nameof(tasks));
}
if (predicate == null)
{
throw new ArgumentNullException(nameof(predicate));
}
var tcs = new TaskCompletionSource<T>(TaskCreationOptions
.RunContinuationsAsynchronously);
var completedCount = 0;
var exceptions = new ConcurrentBag<Exception>();
foreach (var task in tasks)
{
cancellationToken.ThrowIfCancellationRequested();
task(cancellationToken)
.ContinueWith(
t =>
{
cancellationToken.ThrowIfCancellationRequested();
if (t.Exception != null)
{
exceptions.Add(t.Exception.InnerException);
}
else if (predicate(t.Result))
{
tcs.TrySetResult(t.Result);
}
if (Interlocked.Increment(ref completedCount) == tasks.Count)
{
cancellationToken.ThrowIfCancellationRequested();
if (exceptions.Any())
{
tcs.TrySetException(exceptions);
}
else
{
tcs.TrySetResult(null);
}
}
},
cancellationToken);
}
cancellationToken.ThrowIfCancellationRequested();
return tcs.Task;
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Xunit;
using Xunit.Abstractions;
namespace Test.Unit
{
public class TaskHelpersTests
{
private readonly ITestOutputHelper console;
public TaskHelpersTests(ITestOutputHelper console)
{
this.console = console;
}
[Fact]
public async Task When_All_Tasks_Exceed_Specified_Time_Should_Throw()
{
var delays = new int[] { 5 };
var timeout = TimeSpan.FromSeconds(2);
var cts = new CancellationTokenSource();
cts.CancelAfter(timeout);
var token = cts.Token;
var funcs = delays.Select(
delay => GetFunc(
funcId: delay.ToString(),
delay: TimeSpan.FromSeconds(delay),
satisfiesPredicate: true));
/*await Assert.ThrowsAsync<OperationCanceledException>(
async () => await GetFirst(
funcs.ToArray(),
d => d.SatisfiesPredicate,
token));*/
try
{
var result = await GetFirst(
funcs.ToArray(),
d => d.SatisfiesPredicate,
token);
}
catch (Exception e)
{
e.GetType().ShouldBe(typeof(OperationCanceledException));
}
finally
{
cts.Dispose();
}
}
private static Func<CancellationToken, Task<Dummy>> GetFunc(
string funcId,
TimeSpan delay,
bool satisfiesPredicate)
{
return
async ct =>
{
ct.ThrowIfCancellationRequested();
await Task.Delay(delay, ct);
return
new Dummy()
{
Id = funcId,
SatisfiesPredicate = satisfiesPredicate,
};
};
}
private class Dummy
{
public string Id { get; set; }
public bool SatisfiesPredicate { get; set; }
}
}
}