0

How do I block the current thread until the OnComplete handler of my observer has finished, without the use of threading primitives?

Here is my code. I want that the Console.WriteLine("Press... statement should be executed only after the OnComplete handler, namely ResetCount has finished executing.

class Program
{
    private static long totalItemCount = 0;
    private static long listCount = 0;

    static void Main()
    {
        Console.WriteLine($"Starting Main on Thread {Thread.CurrentThread.ManagedThreadId}\n");

        var o = Observable.Timer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(1))
            .Take(20)
            .Concat(Observable.Interval(TimeSpan.FromSeconds(0.01)).Take(200))
            .Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));

        o.Subscribe(Print, onCompleted: ResetCount);


        // How I make sure this line appears only after the OnComplete has fired?
        // Do I have to use traditional threading primitives such as wait handles?
        // Or just cause the main thread to sleep long enough? That doesn't seem right.
        Console.WriteLine("\nPress any key to exit...");
        Console.ReadKey();
    }

    private static void ResetCount()
    {
        if (listCount > 0)
        {
            Console.WriteLine($"{totalItemCount} items processed in {listCount} lists.");
        }
        else
        {
            Console.WriteLine($"{totalItemCount} items processed.");
        }

        Interlocked.Exchange(ref totalItemCount, 0);
        Interlocked.Exchange(ref listCount, 0);
    }

    static void Print<T>(T value)
    {
        var threadType = Thread.CurrentThread.IsBackground ? "Background" : "Foreground";

        if (value is IList)
        {
            var list = value as IList;
            Console.WriteLine($"{list.Count} items in list #{Interlocked.Increment(ref listCount)}:");

            foreach (var item in list)
            {
                Console.WriteLine($"{item.ToString()}, ({threadType} #{Thread.CurrentThread.ManagedThreadId}), Item #{Interlocked.Increment(ref totalItemCount)}");
            }
            Console.WriteLine();
        }
        else
        {
            Console.WriteLine($"{value.ToString()}, ({threadType} #{Thread.CurrentThread.ManagedThreadId}), Item #{Interlocked.Increment(ref totalItemCount)}");
        }
    }
}
Water Cooler v2
  • 32,724
  • 54
  • 166
  • 336
  • Why don't you move this statement to the end of `ResetCount()` method? – pmbanka Jun 14 '16 at 11:08
  • Yes, I could remove *this* statement in *this particular case* to the end of the `ResetCount` method, but that would be a cop-out and more than this particular example, I would like to know how to solve the class of problems I am trying to highlight with this example. – Water Cooler v2 Jun 14 '16 at 23:27

1 Answers1

1

On Rx we have specific schedulers to handle threading, synchronization and related.

You can read more about that here: http://www.introtorx.com/content/v1.0.10621.0/15_SchedulingAndThreading.html

But basically what you're looking for is changing this line:

 .Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), Scheduler.CurrentThread);

They're several ways to test/validate a Rx query. Keep in mind that wouldn't be the answer for all the problems.

J. Lennon
  • 3,311
  • 4
  • 33
  • 64
  • Yes, thank you. I was aware of schedulers but I must have been awake too long to realize that I used the default scheduler, I am now confused. The default scheduler in the case of a console app would be the `CurrentThreadScheduler`, right? Or would it be the `ThreadPoolScheduler`? From the results this program produced without specifying a scheduler, it looked like it was the `ThreadPoolScheduler` but if I changed the program to produce something less complicated such as `Observable.Range(...)`, it used the `CurrentThreadScheduler` as the default. Why is that? – Water Cooler v2 Jun 14 '16 at 23:43
  • I think I understand. If we do not specify a scheduler, it selects the *best* scheduler for the platform *and the operation* as the default scheduler. So the call to `IObservable.Timer` would have used a thread pool scheduler to run the `System.Threading.Timer` on. `Interval` might have done the same. – Water Cooler v2 Jun 14 '16 at 23:51
  • AFAIK each operator has a standard/default scheduler in case that the IScheduler is not being provided. So it really dependents of the operator that you're using -http://stackoverflow.com/questions/15341864/what-are-the-default-schedulers-for-each-observable-operator Another code that you would be useful for testing purposes is `.Wait()`. More info: http://stackoverflow.com/questions/37801699/what-does-the-wait-operator-in-rx-net-do – J. Lennon Jun 14 '16 at 23:59
  • Interesting how one could use `Wait` to wait for completion of an observable. Nice. Thank you. – Water Cooler v2 Jun 15 '16 at 00:02