I have the following snippet that enumerates elements of some xml (read from the output of a svn log --xml ...
process) then runs a long-running method for each xml element.
var proc = Process.Start(svnProcInfo);
var xml = XDocument.Load(proc.StandardOutput);
var xElements = xml.Descendants("path")
.ToObservable()
//.SubscribeOn(ThreadPoolScheduler.Instance)
.Select(descendant => return LongRunning(descendant));
xElements
//.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(result => Console.WriteLine(result);
Console.ReadKey();
The LongRunning
method isn't important, but inside it I log the thread it runs on. Let's assume it runs for one whole second.
My problem is, un-commenting either SubscribeOn()
line has no effect whatsoever. The calls to LongRunning
are sequential and happening every one second, on the same thread (although a different thread than the main (initial) thread).
This is a console application.
I'm new at Rx. What am I missing?
EDIT:
After trying Lee Campbell's answer, I noticed another problem.
Console.Error.WriteLine("Main thread " + Thread.CurrentThread.ManagedThreadId);
var xElements = xml.Descendants("path").ToObservable()
//.ObserveOn(Scheduler.CurrentThread)
.SelectMany(descendant =>
Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default))
.Subscribe(result => Console.WriteLine(
"Result on: " + Thread.CurrentThread.ManagedThreadId));
[...]
string LongRunning(XElement el)
{
Console.WriteLine("Execute on: Thread " + Thread.CurrentThread.ManagedThreadId);
DoWork();
Console.WriteLine("Finished on Thread " + Thread.CurrentThread.ManagedThreadId);
return "something";
}
This gives the following output:
Main thread 1
Execute on: Thread 3
Execute on: Thread 4
Execute on: Thread 5
Execute on: Thread 6
Execute on: Thread 7
Finished on Thread 5
Finished on Thread 6
Result on: 5
Result on: 6
Finished on Thread 7
Result on: 7
Finished on Thread 3
Result on: 3
Finished on Thread 4
Result on: 4
Done! Press any key...
What I need is a way to "queue" the results to the same thread. I thought that's what ObserveOn()
is for, but un-commenting the ObserveOn()
line above doesn't change the results.