I have the following snippet that enumerates the elements of an XML document (read from the output of an svn log --xml ... process) and then runs a long-running method for each element.

var proc = Process.Start(svnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);

var xElements = xml.Descendants("path")
                   .ToObservable()
                   //.SubscribeOn(ThreadPoolScheduler.Instance) 
                   .Select(descendant => LongRunning(descendant));
xElements
    //.SubscribeOn(NewThreadScheduler.Default)
    .Subscribe(result => Console.WriteLine(result));

Console.ReadKey();

The LongRunning method isn't important, but inside it I log the thread it runs on. Let's assume it runs for one whole second.

My problem is that un-commenting either SubscribeOn() line has no effect whatsoever. The calls to LongRunning are sequential, one second apart, and all on the same thread (although a different thread from the main (initial) thread).

This is a console application.

I'm new to Rx. What am I missing?

EDIT:

After trying Lee Campbell's answer, I noticed another problem.

Console.Error.WriteLine("Main thread " + Thread.CurrentThread.ManagedThreadId);

var xElements = xml.Descendants("path").ToObservable()
    //.ObserveOn(Scheduler.CurrentThread)
    .SelectMany(descendant =>     
          Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default))
    .Subscribe(result => Console.WriteLine(
         "Result on: " + Thread.CurrentThread.ManagedThreadId));

[...]

string LongRunning(XElement el)
{
    Console.WriteLine("Execute on: Thread " + Thread.CurrentThread.ManagedThreadId);
    DoWork();
    Console.WriteLine("Finished on Thread " + Thread.CurrentThread.ManagedThreadId);
    return "something";
}

This gives the following output:

Main thread 1
Execute on: Thread 3
Execute on: Thread 4
Execute on: Thread 5
Execute on: Thread 6
Execute on: Thread 7
Finished on Thread 5
Finished on Thread 6
Result on: 5
Result on: 6
Finished on Thread 7
Result on: 7
Finished on Thread 3
Result on: 3
Finished on Thread 4
Result on: 4
Done! Press any key...

What I need is a way to "queue" the results to the same thread. I thought that's what ObserveOn() is for, but un-commenting the ObserveOn() line above doesn't change the results.

Cristian Diaconescu
  • When I see people use `SubscribeOn`, they usually want `ObserveOn` instead. SubscribeOn only affects where the actual act of subscribing runs. To change where OnNext runs (which is where the selector for Select runs), you need ObserveOn. On its own, that may not be a complete or good solution. – Gideon Engelberth Sep 26 '13 at 03:56
  • Try taking a look at this answer http://stackoverflow.com/questions/17772373/newthreadscheduler-default-schedules-all-work-on-same-thread/17775727#17775727 It should show you the problem you're having with logging the `ThreadId`. – Bryan Anderson Sep 26 '13 at 19:14

1 Answer

Firstly, Rx is a library (or paradigm) for controlling asynchrony, specifically observable sequences. What you have here is an enumerable sequence (the XML descendants) and a blocking/synchronous LongRunning method call.

By calling ToObservable() on your enumerable sequence, you are really only complying with the interface, but as your sequence is realized (eager not lazy), there is nothing really Observable/Async about it.

By calling SubscribeOn, you had the right idea, but the conversion has already been done in the ToObservable() operator. You probably meant to call ToObservable(ThreadPoolScheduler.Instance) so that any slow iteration of the IEnumerable can be done on another thread. However, I don't think this is a slow iterator, so this probably doesn't solve anything.
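
As a rough illustration of that overload, here is a minimal sketch. It assumes the System.Reactive package and uses Enumerable.Range as a stand-in for the XML descendants (the stand-in and the ManualResetEventSlim used to keep the console app alive are illustrative assumptions, not part of the original code). It only moves the enumeration itself onto a pool thread; it does not make the per-item work concurrent.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        Console.WriteLine("Main thread " + Thread.CurrentThread.ManagedThreadId);

        // Signals when the sequence has completed, so Main does not exit early.
        var done = new ManualResetEventSlim();

        // Stand-in source (assumption): three integers instead of XML elements.
        Enumerable.Range(1, 3)
            // The scheduler passed here decides where the enumerable is iterated.
            .ToObservable(ThreadPoolScheduler.Instance)
            .Subscribe(
                n => Console.WriteLine(
                    "Item " + n + " on thread " + Thread.CurrentThread.ManagedThreadId),
                () => done.Set());

        done.Wait();
    }
}
```

The items should be reported from a thread other than the main one, but they are still delivered one after another.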

What you most likely want to do (although it is doubtful whether Rx is the best tool for this type of problem) is to schedule the call to the LongRunning method. This means you will need to add asynchrony to your Select. A good way to do this is with one of the Rx factory methods such as Observable.FromAsync or Observable.Start. This will, however, turn your sequence into an IObservable<IObservable<T>>. You can flatten it by using SelectMany or Merge.
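
To make the nesting and flattening concrete, here is a minimal sketch of the Select plus Merge variant. It assumes the System.Reactive package; Work is a hypothetical stand-in for LongRunning, and Enumerable.Range stands in for the XML descendants.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    // Hypothetical stand-in for LongRunning: blocks for about one second.
    static string Work(int n)
    {
        Thread.Sleep(1000);
        return "Item " + n + " done on thread " + Thread.CurrentThread.ManagedThreadId;
    }

    static void Main()
    {
        // Each source item becomes its own single-value observable,
        // whose work is scheduled on the thread pool.
        IObservable<IObservable<string>> nested = Enumerable.Range(1, 3)
            .ToObservable()
            .Select(n => Observable.Start(() => Work(n), ThreadPoolScheduler.Instance));

        // Merge flattens the nested sequence; results arrive as each one finishes.
        IObservable<string> flat = nested.Merge();

        // Keeps the console app alive until the merged sequence completes.
        var done = new ManualResetEventSlim();
        flat.Subscribe(result => Console.WriteLine(result), () => done.Set());
        done.Wait();
    }
}
```

SelectMany does the projection and the flattening in one step, which is why it is usually the shorter choice.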

Having said all this, what I think you want to do is:

var proc = Process.Start(svnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);

//EDIT: Added ELS to serialise results onto a single thread.
var els = new EventLoopScheduler(threadStart=>new Thread(threadStart)
    {
        IsBackground=true, 
        Name="MyEventLoopSchedulerThread"
    });

var xElements = xml.Descendants("path").ToObservable()
                .SelectMany(descendant => Observable.Start(()=>LongRunning(descendant),ThreadPoolScheduler.Instance))
                .ObserveOn(els)
                .Subscribe(result => Console.WriteLine(result));

Console.ReadKey();
Cristian Diaconescu
Lee Campbell
  • I think it might be a little easier/clearer to use Observable.ToAsync rather than Observable.Start. Other than that this is the perfect answer. – Bryan Anderson Sep 26 '13 at 00:07
  • @Lee The enumeration of the, well, `IEnumerable` is indeed fast - faster than it takes to process each element, anyway. No need to hike it to a different thread (at least for this console app). – Cristian Diaconescu Sep 26 '13 at 08:34
  • Also, I'm wondering myself if Rx *is* the tool for the job. It looks kind of forced. Maybe TPL/Async/await is a better fit. I'm struggling with both Rx and TPL, so this will be a good exercise. – Cristian Diaconescu Sep 26 '13 at 08:35
  • @Lee One more thing - I don't really agree that the enumerable sequence is realized. The input is read from an ongoing stream (that's network-bound, so pauses are plausible), and afaik `XDocument.Load()` doesn't do caching - it's a forward xml parser. Btw +1 for your answer :) – Cristian Diaconescu Sep 26 '13 at 08:43
  • @Lee Could you please take a look at the updated question? PS. I (finally) figured out why your name sounded so familiar. +oo for writing 'Intro To Rx'! – Cristian Diaconescu Sep 26 '13 at 10:01
  • The `Scheduler.CurrentThread` is a bit misleading. It won't magically return the execution to the thread that originally made the subscription. If you stop to think about it, how could it? What you will want is the `EventLoopScheduler`. Create one of these and it will dedicate a thread to the scheduler. Now you can serialize all your processed values to your subscription. Will update the answer... – Lee Campbell Sep 26 '13 at 19:40
  • @Bryan What's the benefit of `Observable.ToAsync` over `Observable.Start`? Cause readability isn't one (or I'm doing it wrong): `Observable.Start(() => LongRunning(descendant, revId, svnRoot), NewThreadScheduler.Default)` vs `Observable.ToAsync(LongRunning, NewThreadScheduler.Default)(descendant, revId, svnRoot)`. This is the actual signature for 'LongRunning(...)'. – Cristian Diaconescu Sep 27 '13 at 11:50
  • @CristiDiaconescu Mostly based on using Rx around people who haven't used it much. They tend to understand that ToAsync causes a synchronous function to run asynchronously, while Start tends to confuse them: they think it's running the function normally and returning an Observable of the result. – Bryan Anderson Sep 28 '13 at 01:27