Here is the code involved:
private static async Task DoRunInOrderAsync<TTaskSeed>(SemaphoreSlim sem, IObservable<TTaskSeed> taskSeedSource, CreateTaskDelegate<TTaskSeed> createTask, OnTaskErrorDelegate<TTaskSeed> onFailed, OnTaskSuccessDelegate<TTaskSeed> onSuccess) where TTaskSeed : class
{
    var tasks = await taskSeedSource
        .Select(taskSeed => GetPendingOrRunningTask(taskSeed, createTask, onFailed, onSuccess, sem))
        .ToList()
        .ToTask();
    await Task.WhenAll(tasks);
}

private static async Task GetPendingOrRunningTask<T>(T taskSeed, CreateTaskDelegate<T> createTask, OnTaskErrorDelegate<T> onFailed, OnTaskSuccessDelegate<T> onSuccess,
    SemaphoreSlim sem) where T : class
{
    Exception exc = null;
    await sem.WaitAsync();
    try
    {
        var task = createTask(taskSeed);
        if (task != null)
        {
            await task;
        }
        onSuccess(task, taskSeed);
    }
    catch (Exception e)
    {
        exc = e;
    }
    sem.Release();
    if (exc != null)
    {
        onFailed(exc, taskSeed);
    }
}
Where:
- Select is IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TResult> selector) from System.Reactive.Linq.Observable
- ToList is IObservable<IList<TSource>> ToList<TSource>(this IObservable<TSource> source) from System.Reactive.Linq.Observable
- ToTask is Task<TResult> ToTask<TResult>(this IObservable<TResult> observable) from System.Reactive.Threading.Tasks.TaskObservableExtensions
- System.Reactive.Linq version is 2.2.5.0
As far as I can see, everything is built and there are no stale binaries around. The error occurs often, but not always.
For the life of me, I cannot understand how the tasks list can contain null if the GetPendingOrRunningTask method is async Task?
EDIT
So ToList injects null. How? Why? What am I doing wrong (besides programming for a living)?
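One hypothesis worth testing, stated as an assumption rather than a confirmed cause: if taskSeedSource calls OnNext from several threads at once, the notifications are not serialized as Rx expects, and ToList (which, as far as I can tell, accumulates into a plain List<T>) could end up with null slots from racing Add calls. The sketch below is not the code above; SynchronizeSketch, Collect, and the Subject<string> standing in for taskSeedSource are hypothetical names used only for illustration. It shows where a Synchronize() call would go so that ToList only ever sees one notification at a time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

static class SynchronizeSketch
{
    // Hypothetical helper: pushes `count` seeds from several threads at once
    // and collects the projected tasks the same way DoRunInOrderAsync does.
    public static IList<Task<string>> Collect(int count)
    {
        // Stand-in for taskSeedSource (an assumption, not the real source).
        var seeds = new Subject<string>();

        var listTask = seeds
            .Synchronize()                          // serialize concurrent OnNext calls
            .Select(seed => Task.FromResult(seed))  // stand-in for GetPendingOrRunningTask
            .ToList()
            .ToTask();                              // ToTask subscribes right away

        // Deliberately push values from multiple threads.
        Parallel.For(0, count, i => seeds.OnNext(i.ToString()));
        seeds.OnCompleted();

        // The sequence has completed, so the result is already available.
        return listTask.Result;
    }

    static void Main()
    {
        var tasks = Collect(10000);
        // With Synchronize() in place, every element should be a non-null task.
        Console.WriteLine("items: {0}, nulls: {1}",
            tasks.Count, tasks.Count(t => t == null));
    }
}
```

If removing the Synchronize() call from this sketch makes nulls (or a wrong count) show up intermittently, that would point at concurrent OnNext calls in the real source rather than at GetPendingOrRunningTask itself.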