Here is the code involved:

private static async Task DoRunInOrderAsync<TTaskSeed>(SemaphoreSlim sem, IObservable<TTaskSeed> taskSeedSource, CreateTaskDelegate<TTaskSeed> createTask, OnTaskErrorDelegate<TTaskSeed> onFailed, OnTaskSuccessDelegate<TTaskSeed> onSuccess) where TTaskSeed : class
{
    var tasks = await taskSeedSource
        .Select(taskSeed => GetPendingOrRunningTask(taskSeed, createTask, onFailed, onSuccess, sem))
        .ToList()
        .ToTask();

    await Task.WhenAll(tasks);
}

private static async Task GetPendingOrRunningTask<T>(T taskSeed, CreateTaskDelegate<T> createTask, OnTaskErrorDelegate<T> onFailed, OnTaskSuccessDelegate<T> onSuccess,
    SemaphoreSlim sem) where T : class
{
    Exception exc = null;
    await sem.WaitAsync();
    try
    {
        var task = createTask(taskSeed);
        if (task != null)
        {
            await task;
        }
        onSuccess(task, taskSeed);
    }
    catch (Exception e)
    {
        exc = e;
    }

    sem.Release();

    if (exc != null)
    {
        onFailed(exc, taskSeed);
    }
}

Where:

  • Select is IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TResult> selector) from System.Reactive.Linq.Observable
  • ToList is IObservable<IList<TSource>> ToList<TSource>(this IObservable<TSource> source) from System.Reactive.Linq.Observable
  • ToTask is Task<TResult> ToTask<TResult>(this IObservable<TResult> observable) from System.Reactive.Threading.Tasks.TaskObservableExtensions
  • System.Reactive.Linq version is 2.2.5.0

As far as I can see, everything is built and there are no stale binaries around. The error occurs often, but not always.

For the life of me, I cannot understand how the tasks list can contain null when the GetPendingOrRunningTask method is async Task.

EDIT

So ToList injects null. How? Why? What am I doing wrong (besides programming for a living)?

mark
  • Are you aware of Rx's serialization contract? See §4.2 in the [Rx Design Guidelines](http://go.microsoft.com/fwlink/?LinkID=205219). The semaphore shouldn't be necessary because your `taskSeedSource` must not push overlapping notifications. If it is, then it's violating an important Rx contract which could cause race conditions within operators. – Dave Sexton Dec 11 '14 at 01:49
  • Note that if you want to run the tasks sequentially rather than concurrently, then project them into observables using `FromAsync` and call `Concat` rather than `ToList`. – Dave Sexton Dec 11 '14 at 01:54
  • It is sequentially only if the semaphore count is 1 initially. But it could be N, in which case I want to process at most N seeds concurrently until all the seeds are processed. The `InOrder` in the name is confusing a bit and is there for historic reasons. – mark Dec 11 '14 at 01:58
  • Have just read the Rx's serialization contract. So, seems like I need to use the Serialize operator, right? – mark Dec 11 '14 at 01:59
  • @DaveSexton - please, arrange your comment as an answer, I would like to credit you. – mark Dec 11 '14 at 02:27
  • Yes, `Synchronize` will work. I don't need credit :) – Dave Sexton Dec 11 '14 at 04:04
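The `Synchronize` suggestion from the comments can be sketched roughly as follows. This is a minimal sketch, not the original code: `ProcessAsync` is a hypothetical stand-in for `GetPendingOrRunningTask`, and a `Subject<string>` fed from `Parallel.For` is assumed here only to imitate a source that pushes overlapping notifications. It needs the Rx package (System.Reactive) that the question already uses.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

static class SynchronizeSketch
{
    // Hypothetical stand-in for GetPendingOrRunningTask.
    static async Task ProcessAsync(string seed)
    {
        await Task.Yield();
    }

    // Pushes seedCount seeds from many threads and returns how many
    // entries of the resulting task list are null.
    public static int CountNullTasks(int seedCount)
    {
        var seeds = new Subject<string>();

        // ToTask subscribes immediately, so build the pipeline before pushing.
        var listTask = seeds
            .Synchronize()                        // serialize overlapping OnNext calls
            .Select(seed => ProcessAsync(seed))
            .ToList()
            .ToTask();

        // Deliberately misbehaving source: concurrent OnNext calls.
        Parallel.For(0, seedCount, i => seeds.OnNext("seed" + i));
        seeds.OnCompleted();

        var tasks = listTask.GetAwaiter().GetResult();
        Task.WhenAll(tasks).GetAwaiter().GetResult();
        return tasks.Count(t => t == null);
    }

    static void Main()
    {
        Console.WriteLine("null tasks: " + CountNullTasks(10000));
    }
}
```

In the question's code, the equivalent change would be a single `.Synchronize()` call placed between `taskSeedSource` and `.Select(...)`. The semaphore can stay if it is still wanted as a concurrency limit.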

1 Answer

You may have a race condition.

.ToList() calls the List<T> constructor. The code is here.

If the number of elements in your taskSeedSource changes between the time the constructor starts and the time it finishes, you could end up with an inconsistent list. In particular, look at the CopyTo call (line 94 in the linked source), marked with a comment in the excerpt:

ICollection<T> c = collection as ICollection<T>;
if( c != null) {
    int count = c.Count;
    if (count == 0)
    {
        _items = _emptyArray;
    }
    else {
        _items = new T[count];
        c.CopyTo(_items, 0);  /* it's possible there are now fewer elements in c */
        _size = count;
    }
} 
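
To see how a race of this general kind can leave null slots in a `List<T>`, here is a small sketch that uses only the base class library. It is an illustration under an assumption: it fills the list through concurrent `Add` calls, which may or may not be the path that actually misbehaves in the question, rather than through the constructor quoted above. The `Summarize` helper and its names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class ListRaceSketch
{
    // Adds `count` non-null objects from many threads, optionally under a lock,
    // then reports what the list looks like afterwards.
    public static string Summarize(int count, bool useLock)
    {
        var list = new List<object>();
        var gate = new object();
        try
        {
            Parallel.For(0, count, i =>
            {
                if (useLock)
                {
                    lock (gate) { list.Add(new object()); }
                }
                else
                {
                    list.Add(new object());   // unsynchronized: not thread-safe
                }
            });
            return string.Format("Count={0}, nulls={1}",
                list.Count, list.Count(x => x == null));
        }
        catch (Exception e)
        {
            // A corrupted list may also throw instead of silently holding nulls.
            return "threw " + e.GetType().Name;
        }
    }

    static void Main()
    {
        Console.WriteLine("no lock: " + Summarize(100000, false));
        Console.WriteLine("lock:    " + Summarize(100000, true));
    }
}
```

The unsynchronized run is nondeterministic: it may report a short count, null entries, an exception, or nothing unusual at all. The locked run always reports the full count and no nulls, which matches the "often, but not always" behaviour described in the question.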
recursive