0

Consider the following example I got from https://msdn.microsoft.com/en-us/library/hh242977(v=vs.103).aspx

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

namespace SimpleSequence {

    class Program {

        static void RunCold() {
            IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1));

            IDisposable subscription1 = source.Subscribe(
                x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 1: OnCompleted"));

            Thread.Sleep(TimeSpan.FromSeconds(5));

            IDisposable subscription2 = source.Subscribe(
                x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 2: OnCompleted"));

            Console.WriteLine("Press any key to unsubscribe");
            Console.ReadLine();
            subscription1.Dispose();
            subscription2.Dispose();
        }

        static void RunHot() {
            Console.WriteLine("Current Time: " + DateTime.Now);
            var source = Observable.Interval(TimeSpan.FromSeconds(1));            //creates a sequence

            IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence

            IDisposable subscription1 = hot.Subscribe(                            // no value is pushed to 1st subscription at this point
                x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 1: OnCompleted"));
            Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);

            Thread.Sleep(5000);                                                   // idle for 5 seconds
            hot.Connect();                                                        // hot is connected to source and starts pushing value to subscribers 

            Console.WriteLine("Current Time after Connect: " + DateTime.Now);

            Thread.Sleep(3000);                                                   //idle for 3 seconds

            Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);

            IDisposable subscription2 = hot.Subscribe(                            // value will immediately be pushed to 2nd subscription
                x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 2: OnCompleted"));
            Console.ReadKey();
        }

        static void Main(string[] args) {
            RunCold();
            RunHot();
        }
    }
}

For cold, what thread actually generates the numbers? Is a new thread made for each subscription? The numbers seem independent of each other, so I think each has its own source?

For hot, the same source is shared for each subscriber, so is there one thread generating the numbers, while the main thread waits for input from console?

Christopher Pisz
  • 3,757
  • 4
  • 29
  • 65

0 Answers0