
I'm very new to Rx and trying to wrap my head around it. I haven't read much yet; I'm working through a hands-on lab first.

using System;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        // one source, produces values with delays
        IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i, i => TimeSpan.FromMilliseconds(100));
        IObserver<int> handler = null;

        IDisposable subscription = source.Subscribe(
            i =>
            {
                Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
                Thread.Sleep(500);
            },
            exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 1 Completed observation"));

        IDisposable subscription2 = source.Subscribe(
            i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
            exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 2 Completed observation"));

        Console.WriteLine("press to cancel");
        Console.ReadLine();
        subscription.Dispose();
        subscription2.Dispose();

    }
}

This produces async interleaved execution as expected.

On the other hand, if I change the source to be synchronous, the observers also become blocking and synchronous: both report the same thread id, and sub 2 receives nothing until sub 1 has fully consumed the sequence. Can someone help me understand this? Here's the sync version:

using System;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        // one source, produces values
        IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i);
        IObserver<int> handler = null;

        // two observers that consume - first with a delay and the second immediately.
        // in this case, the behavior of the observers becomes synchronous? 
        IDisposable subscription = source.Subscribe(
            i =>
            {
                Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
                Thread.Sleep(500);
            },
            exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 1 Completed observation"));

        IDisposable subscription2 = source.Subscribe(
            i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
            exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
            () => Console.WriteLine("Sub 2 Completed observation"));

        Console.WriteLine("press to cancel");
        Console.ReadLine();
        subscription.Dispose();
        subscription2.Dispose();

    }
}
Raghu

1 Answer


I believe the reason is the default IScheduler that the operator picks when you don't pass one. Take a look at the accepted answer here.

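For Generate, it depends on the overload. Based on that answer, these are the default schedulers used; you can verify them in the Rx source code if you like:

  • The temporal overloads (the ones taking a time selector) default to DefaultScheduler.Instance
  • The non-temporal overloads default to CurrentThreadScheduler.Instance

CurrentThreadScheduler runs the work on the thread that calls Subscribe, so in your sync version the first Subscribe call does not return until its sequence has completed; only then does the second subscription start.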

You can confirm this by explicitly passing a "non-blocking" scheduler in your sync version:

IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i * i, DefaultScheduler.Instance);
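
To check the scheduler's effect in isolation, here is a minimal sketch that runs the same Generate sequence on both schedulers and reports whether the values were delivered on the subscribing thread. The class name `SchedulerDemo` and the helper `RunsOnCallerThread` are made up for illustration and are not part of Rx; the sketch assumes a console application with the System.Reactive package referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class SchedulerDemo
{
    // Hypothetical helper: subscribes to a Generate sequence on the given
    // scheduler and reports whether every OnNext ran on the calling thread.
    static bool RunsOnCallerThread(IScheduler scheduler)
    {
        int callerThread = Thread.CurrentThread.ManagedThreadId;
        var seen = new List<int>();
        var done = new ManualResetEventSlim();

        IObservable<int> source =
            Observable.Generate(0, i => i < 2, i => i + 1, i => i * i, scheduler);

        using (source.Subscribe(
            _ => { lock (seen) seen.Add(Thread.CurrentThread.ManagedThreadId); },
            () => done.Set()))
        {
            // Block until OnCompleted so both schedulers are observed to the end.
            done.Wait();
        }

        return seen.TrueForAll(id => id == callerThread);
    }

    static void Main()
    {
        Console.WriteLine("CurrentThreadScheduler delivers on caller thread: {0}",
            RunsOnCallerThread(CurrentThreadScheduler.Instance));
        Console.WriteLine("DefaultScheduler delivers on caller thread: {0}",
            RunsOnCallerThread(DefaultScheduler.Instance));
    }
}
```

If the explanation above holds, the first line should report that delivery happened on the caller thread and the second that it did not, which matches the thread ids you see in your two versions.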

supertopi
  • It looks like the overloads taking a TimeSpan pick the `Scheduler.Default` scheduler, whereas the ones without a TimeSpan use `Scheduler.Immediate`. – Raghu Nov 17 '16 at 12:13