
This is a rather educational, out-of-curiosity question. Consider the following snippet:

var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();

Here, `SubscribeDebug` subscribes a simple observer:

public class DebugObserver<T> : IObserver<T>
{
    public void OnCompleted()
    {
        Debug.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Debug.WriteLine("Error");
    }

    public void OnNext(T value)
    {
        Debug.WriteLine("Value: {0}", value);
    }
}

The output of this is:

Value: 0
Value: 1
Value: 2
Value: 3
Value: 4

And then it blocks. Can someone help me understand the underlying reason why this happens and why the observable does not complete? I have noticed that it does complete without the `Concat` call, but blocks with it.

Martin Zikmund
  • Does this behavior also exist when you concat a different observable which is already completed? – Progman Apr 05 '20 at 11:12
  • Your code is creating a deadlock due to the scheduler used. Try this instead: `.ToObservable(Scheduler.Default)`. That works with your code. I'll need to spend more time on it to give you the reason why. – Enigmativity Apr 05 '20 at 11:26
  • @Progman - You're on the wrong track. Each subscription to `enumerable.ToObservable()` starts the enumerable again, just as two `foreach` loops over an enumerable each start it again. The problem here is a deadlock caused by the `Scheduler.Immediate` scheduler. – Enigmativity Apr 05 '20 at 11:29
  • The problem does not seem to be with `Scheduler.Immediate`, because when I pass it to `ToObservable()`, both enumerations are iterated. When it is called without any scheduler, however, the code blocks. – Oguz Ozgul Apr 05 '20 at 12:06
  • @OguzOzgul Of all the static schedulers, it only deadlocks with `Scheduler.CurrentThread`, so I guess this is the default (when `ToObservable` is called without an argument). – Theodor Zoulias Apr 06 '20 at 11:29

1 Answer


I've looked at the source of `ToObservable` and distilled a minimal implementation that reproduces the behavior we're seeing:

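    // Requires the System.Reactive package; the class name is hypothetical.
    using System;
    using System.Collections.Generic;
    using System.Reactive.Concurrency;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;

    public static class ToObservableExtensions
    {
        public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
            ToObservableEx(enumerable, CurrentThreadScheduler.Instance);

        public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
            Observable.Create<T>
            (
                observer =>
                {
                    // One step of the loop: emit a single item, then schedule the next step.
                    IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
                    {
                        if (enumerator.MoveNext())
                        {
                            observer.OnNext(enumerator.Current);
                            inner.Schedule(enumerator, loopRec); //<-- culprit
                        }
                        else
                        {
                            observer.OnCompleted();
                        }

                        // ToObservable.cs Line 117
                        // We never allow the scheduled work to be cancelled.
                        return Disposable.Empty;
                    }

                    return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
                }
            );
    }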

With that out of the way, the crux of the problem lies in the behavior of `CurrentThreadScheduler`, which is the scheduler `ToObservable` uses by default.

If `Schedule` is called on `CurrentThreadScheduler` while a scheduled action is already running on that thread, the new action is not executed immediately: it is queued and runs only after the current action returns.

        CurrentThreadScheduler.Instance.Schedule(() =>
        {
            CurrentThreadScheduler.Instance.Schedule(() =>
                Console.WriteLine(1)
            );

            Console.WriteLine(2);
        });

This prints 2, then 1. This queuing behavior is our undoing.
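The queuing rule can be modeled in a few lines of plain C# without Rx. This is a hypothetical sketch, not `CurrentThreadScheduler`'s actual source: the `running` flag, the FIFO queue of `Action`s and the local `Schedule` function are assumptions chosen only to mirror the behavior described above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical trampoline: work scheduled while another work item is running
// on this thread is queued and only runs after the current item returns.
var queue = new Queue<Action>();
var running = false;
var output = new List<int>();

void Schedule(Action work)
{
    if (running)
    {
        // Already inside a work item: defer instead of running re-entrantly.
        queue.Enqueue(work);
        return;
    }

    running = true;
    try
    {
        work();
        // Drain everything that was queued while the first item was running.
        while (queue.Count > 0)
            queue.Dequeue()();
    }
    finally
    {
        running = false;
    }
}

Schedule(() =>
{
    Schedule(() => output.Add(1)); // queued, not run yet
    output.Add(2);                 // runs first
});

Console.WriteLine(string.Join(" ", output)); // prints "2 1"
```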

When `observer.OnCompleted()` is called for the first sequence, it causes `Concat` to subscribe to the second one and start the next enumeration. However, things are not the same as when we started out: we are still inside a running `loopRec` (which lives in the `observer => { }` block) when the second sequence tries to schedule its first step. So instead of executing immediately, that step gets queued.

Now `enumerator.MoveNext()` on the enumerator returned by `ToEnumerable` is caught in a deadlock. It can't move to the next item: `MoveNext` blocks until the next item arrives, and that item can only arrive when the `ToObservable` loop of the second sequence gets to run its queued step.

But the scheduler can only run that step (which would notify `ToEnumerable` and, in turn, the waiting `MoveNext()`) once the current `loopRec` exits. And it can't exit, because it is blocked by `MoveNext` in the first place.
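To see why the output stops right after `Value: 4`, here is a hypothetical single-threaded model of the interleaving in plain C# (no Rx). A queue of `Action`s stands in for `CurrentThreadScheduler`, `Producer` for the inner `ToObservable` loop(s) feeding `ToEnumerable`'s buffer, and `Consumer` for the outer `ToObservable` loop calling `MoveNext`. Two things are assumptions of the model rather than facts from Rx's source: that the producer's first step is queued ahead of the consumer's, and that where a real `MoveNext` would block forever, the model simply logs `deadlock` and stops.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model, not Rx code: one trampoline queue, one producer, one consumer.
List<string> Simulate(bool withConcat)
{
    var trampoline = new Queue<Action>(); // pending work, run in FIFO order
    var buffer = new Queue<int>();        // plays ToEnumerable's internal queue
    var log = new List<string>();
    var sourcesLeft = withConcat ? 2 : 1;
    var completed = false;
    IEnumerator<int> source = Enumerable.Range(0, 5).GetEnumerator();

    // Inner ToObservable loop: push one item, then schedule the next step.
    void Producer()
    {
        if (source.MoveNext())
        {
            buffer.Enqueue(source.Current); // OnNext into ToEnumerable
            trampoline.Enqueue(Producer);
        }
        else if (sourcesLeft > 1)
        {
            // OnCompleted of the first source: Concat subscribes to the second,
            // whose first step is queued because a work item is still running.
            sourcesLeft--;
            source = Enumerable.Range(0, 5).GetEnumerator();
            trampoline.Enqueue(Producer);
        }
        else
        {
            completed = true; // final OnCompleted
        }
    }

    // Outer ToObservable loop: pull one item via "MoveNext", then schedule the next step.
    void Consumer()
    {
        if (buffer.Count > 0)
        {
            log.Add($"Value: {buffer.Dequeue()}");
            trampoline.Enqueue(Consumer);
        }
        else if (completed)
        {
            log.Add("Completed");
        }
        else
        {
            // Nothing buffered and the producer's next step is queued behind
            // this one: a real MoveNext would wait here forever.
            log.Add("deadlock");
            trampoline.Clear();
        }
    }

    trampoline.Enqueue(Producer);
    trampoline.Enqueue(Consumer);
    while (trampoline.Count > 0)
        trampoline.Dequeue()();
    return log;
}

// Value: 0, Value: 1, Value: 2, Value: 3, Value: 4, Completed
Console.WriteLine(string.Join(", ", Simulate(withConcat: false)));
// Value: 0, Value: 1, Value: 2, Value: 3, Value: 4, deadlock
Console.WriteLine(string.Join(", ", Simulate(withConcat: true)));
```

Without `Concat`, the producer's last step sets `completed` before the consumer runs again, so the consumer sees completion rather than an empty buffer. With `Concat`, that same step only queues the second source's first step, which lands behind the consumer.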

Addendum

This is approximately what `ToEnumerable` (from `GetEnumerator.cs`) does (illustrative only, not a valid implementation):

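    // Illustrative sketch (System.Reactive package); the class name is hypothetical.
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;

    public static class ToEnumerableExtensions
    {
        public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
        {
            var gate = new SemaphoreSlim(0);
            var queue = new ConcurrentQueue<T>();

            // Each OnNext enqueues a value and releases the gate once;
            // OnCompleted releases it once without enqueuing (errors are ignored).
            using (observable.Subscribe(
                value => { queue.Enqueue(value); gate.Release(); },
                () => gate.Release()))
            {
                while (true)
                {
                    gate.Wait(); // this is where it blocks

                    // A dequeued value is yielded; an empty queue means completion.
                    if (queue.TryDequeue(out var current))
                        yield return current;
                    else
                        break;
                }
            }
        }
    }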

An enumerable is expected to block until the next item is yielded, which is why there is a gate in the implementation. It's not `Enumerable.Range` that blocks, but `ToEnumerable`.
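The same push-to-pull gating can be tried without Rx. As a stand-in for `ToEnumerable`'s semaphore-plus-queue (not its actual implementation), this sketch swaps in the BCL's `BlockingCollection<T>`, whose `GetConsumingEnumerable()` blocks in `MoveNext` until an item is added or `CompleteAdding()` is called. Here the producer runs on another thread, so the consumer's blocking wait can always be satisfied.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

var buffer = new BlockingCollection<int>();

// Push side on a separate thread, playing the role of OnNext / OnCompleted.
var producer = Task.Run(() =>
{
    for (var i = 0; i < 5; i++)
        buffer.Add(i);       // like OnNext: enqueue and wake the consumer
    buffer.CompleteAdding(); // like OnCompleted: lets the enumeration end
});

// Pull side: each MoveNext blocks until an item arrives or adding completes.
var received = new List<int>();
foreach (var item in buffer.GetConsumingEnumerable())
    received.Add(item);

producer.Wait();
Console.WriteLine(string.Join(", ", received)); // 0, 1, 2, 3, 4
```

Had the producer instead been queued on the same thread behind the `foreach`, as happens with the trampoline, the first blocking `MoveNext` would never return.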

Asti
  • But I implemented a custom IEnumerable returning a custom IEnumerator, and what I see is that when the iteration over the first enumerator completes, GetEnumerator() is invoked again (and I return a new enumerator), but MoveNext() is never called. – Oguz Ozgul Apr 06 '20 at 11:41
  • I should clarify - it's not your own `IEnumerable` that matters - it's the one returned by `Observable.ToEnumerable()`. That's where it blocks. – Asti Apr 06 '20 at 11:48
  • Ok, that's it then. (Note: +1 already, before my comment.) I was looking into the same constructor using !! ildasm !!, went blind and gave up. Should have downloaded Reflector :) – Oguz Ozgul Apr 06 '20 at 11:51
  • Oh, and the source is of course on GitHub... Poor me. – Oguz Ozgul Apr 06 '20 at 11:52
  • Haha. I updated my answer. I did write up `ToEnumerable`, but it was mostly non-functional (just illustrative), so I decided not to post it. – Asti Apr 06 '20 at 12:00
  • This is a superb answer! It gives a lot of insight into how observables work in a synchronous (single-threaded) environment. – Theodor Zoulias Apr 06 '20 at 12:14
  • @TheodorZoulias Thank you! The problem became so much clearer once I implemented `ToObservable` - Rx stack traces are fairly unreadable. Writing simplified Rx operators to understand their behavior is a good learning experience. :) – Asti Apr 06 '20 at 12:26
  • Incredible! Thank you for such a detailed answer! – Martin Zikmund Apr 06 '20 at 19:42
  • Fabulous answer! – Enigmativity Apr 07 '20 at 02:19
  • I must admit I'm slightly perplexed that the default scheduler involved in these operations isn't safe. Is this something that needs fixing in the library, or is there some recommendation for avoiding deadlocks like this? – NickL Apr 10 '20 at 13:39
  • @NickL With the current defaults, the library is fairly performant. These are Rx's concurrency abstractions leaking, which is why we had to go back to the source. In almost all usage cases you will never encounter these edge cases in idiomatic Rx. – Asti Apr 16 '20 at 08:24