I'm having trouble with the way ReactiveCommand deals with ObserveOn and SubscribeOn.

I have an API that returns an observable sequence of strings, and it looks like this:

 public IObservable<string> GetDocumentObservable(int numParagraphs, int latency)
 {
     return Observable.Create<string>(obs =>
     {
         for (int i = 0; i < numParagraphs; i++)
         {
             Console.WriteLine("Service On thread {0}", Thread.CurrentThread.ManagedThreadId);
             Thread.Sleep(1000);
             obs.OnNext("Some String");
         }
         obs.OnCompleted();
         return Disposable.Empty;
     });
 }

I'm using ReactiveCommand.CreateAsyncObservable to invoke this, using SubscribeOn(RxApp.TaskpoolScheduler) (to ensure the Thread.Sleep doesn't happen on the UI thread), and ObserveOn(RxApp.MainThreadScheduler) to draw strings on my UI thread.

Unfortunately, this all executes synchronously (on the same thread), and I'm not sure why. This is what the VM code looks like:

DownloadDocument = ReactiveCommand
.CreateAsyncObservable(_ =>
{
    Console.WriteLine("ViewModel Invoking On thread {0}", Thread.CurrentThread.ManagedThreadId);
    return _documentService.GetDocumentObservable(NumParagraphs, 0);
});

DownloadDocument
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .ObserveOn(RxApp.MainThreadScheduler)
    .Subscribe(p =>
    {
        Console.WriteLine("ViewModel OnNext thread {0}", Thread.CurrentThread.ManagedThreadId);
        Document.Add(p);
    },
    x => { },
    () => { Console.WriteLine("ViewModel OnComplete thread {0}", Thread.CurrentThread.ManagedThreadId); });

Everything executes on the same thread and blocks the UI thread. If I invoke it "the old fashioned way", everything works as expected (as below):

Something = ReactiveCommand.Create();
Something.Subscribe(x =>
{
    _documentService.GetDocumentObservable(NumParagraphs, 0)
    .SubscribeOn(RxApp.TaskpoolScheduler)
    .ObserveOn(RxApp.MainThreadScheduler)
    .Subscribe(p =>
    {
        Console.WriteLine("ViewModel OnNext thread {0}", Thread.CurrentThread.ManagedThreadId);
        Document.Add(p);
    },
    ex => { },
    () => { Console.WriteLine("ViewModel OnComplete thread {0}", Thread.CurrentThread.ManagedThreadId); });
});

No blocked threads here.

Is there something I'm missing when it comes to using ReactiveCommands with observable APIs?

enomam
  • I notice that this is somewhat similar to this post: http://stackoverflow.com/questions/25223340/reactiveui-6-async-command-not-running-on-background-thread-in-wpf-app . A workaround was put in place there (by explicitly calling the code on a different thread), rather than having a bug raised in ReactiveUI. – enomam Mar 06 '15 at 06:16

2 Answers

ReactiveCommand itself subscribes to your source when the command is invoked, which doesn't pick up your SubscribeOn. The easiest way to fix this is to just wrap your code in Task.Run:

 return Observable.Create<string>(obs =>
 {
     // Must be initialized: a captured local cannot be read before it is assigned.
     bool stopEarly = false;

     Task.Run(() => {
         for (int i = 0; i < numParagraphs; i++)
         {
             Console.WriteLine("Service On thread {0}", Thread.CurrentThread.ManagedThreadId);
             Thread.Sleep(1000);
             obs.OnNext("Some String");
             if (stopEarly) return;
         }
         obs.OnCompleted();
     });

     return Disposable.Create(() => stopEarly = true);
 });
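
If the service itself should stay scheduler-agnostic, one possible arrangement is to apply SubscribeOn to the observable returned from inside the command's factory lambda, since that is the subscription the command makes when it is invoked. The sketch below is plain Rx, not ReactiveUI: the `invoke` subject combined with SelectMany is only a hypothetical stand-in for a command that subscribes to its source on each invocation, and `TaskPoolScheduler.Default` stands in for RxApp.TaskpoolScheduler. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

static class Program
{
    // Same blocking shape as the service in the question (shorter sleep).
    static IObservable<string> GetDocumentObservable(int numParagraphs) =>
        Observable.Create<string>(obs =>
        {
            for (int i = 0; i < numParagraphs; i++)
            {
                Thread.Sleep(100);
                obs.OnNext("Some String");
            }
            obs.OnCompleted();
            return Disposable.Empty;
        });

    static void Main()
    {
        // Hypothetical stand-in for the command: each trigger subscribes to the inner source.
        var invoke = new Subject<Unit>();
        var done = new ManualResetEventSlim();

        invoke
            .SelectMany(_ => GetDocumentObservable(3)
                .SubscribeOn(TaskPoolScheduler.Default)) // the consumer picks the thread here
            .Subscribe(
                p => Console.WriteLine("OnNext on thread {0}", Environment.CurrentManagedThreadId),
                () => done.Set());

        invoke.OnNext(Unit.Default); // returns without waiting for the loop to finish
        Console.WriteLine("Invoke returned on thread {0}", Environment.CurrentManagedThreadId);

        invoke.OnCompleted();
        done.Wait(); // keep the process alive until the inner sequence completes
    }
}
```

With ReactiveUI the equivalent change would presumably be to return `_documentService.GetDocumentObservable(NumParagraphs, 0).SubscribeOn(RxApp.TaskpoolScheduler)` from the CreateAsyncObservable lambda, leaving ObserveOn on the command's output as before.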
Ana Betts
  • Adding the Task.Run is something that I wanted to avoid, given that SubscribeOn (in a regular Rx scenario) would have given me that anyway. Also, I should have been clearer - the "API" I provided was just for experimentation. The goal was to look at ways of consuming a synchronous API in an observable manner. – enomam Mar 07 '15 at 08:22
  • Basically, I don't want the API to decide what thread it runs on, I want the consumer to decide that. – enomam Mar 07 '15 at 08:29
  • @enomam Then you need to use either `Observable.Start` or `Task.Run` – Ana Betts Mar 08 '15 at 03:48
  • That works, but isn't really sitting right with me. Observable.Start doesn't give me the IObserver interface (meaning I can't trickle data to the consumer), and Task.Run will still be wrapped with an Observable. Maybe I'll need to split my API consumption into two separate domains - those that deal with single request - multiple results over time, and the more basic request/response type stuff (which can be wrapped in a ReactiveCommand nicely). – enomam Mar 10 '15 at 01:16
  • ... also (sorry to keep bugging you!), the Disposable.Create lambda doesn't get invoked until the task completes (after obs.OnCompleted). DownloadDocument.Subscribe(...) gives me a disposable, but disposing of that one doesn't immediately dispose. – enomam Mar 10 '15 at 06:31
  • @enomam Correct. You could make an argument that if nobody's listening to the command we should kill all in-flight tasks, but this would also be Surprising to people listening to the ExecuteAsync result – Ana Betts Mar 10 '15 at 21:06

Your GetDocumentObservable method gives Rx no opportunity to run the code on another thread, so it produces all the values and signals completion during subscription, before the call to .Subscribe(...) has returned.
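
A minimal way to observe this, assuming only the System.Reactive package and a source shaped like the one in the question (with a shorter sleep), is to time the Subscribe call and check whether completion has already been signalled when it returns:

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

static class Program
{
    static void Main()
    {
        // The whole loop runs inside the subscribe function, on the subscriber's thread.
        var source = Observable.Create<string>(obs =>
        {
            for (int i = 0; i < 3; i++)
            {
                Thread.Sleep(100);
                obs.OnNext("Some String");
            }
            obs.OnCompleted();
            return Disposable.Empty;
        });

        bool completed = false;
        var timer = Stopwatch.StartNew();

        // No SubscribeOn: this call does not return until the loop above has finished.
        source.Subscribe(_ => { }, () => completed = true);

        Console.WriteLine(
            "Subscribe returned after {0} ms, completed = {1}",
            timer.ElapsedMilliseconds,
            completed);
    }
}
```

By the time Subscribe returns, the sequence has already completed, so the subscriber has no chance to dispose the subscription early.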

Two key things to be wary of when writing code like this are the use of return Disposable.Empty; and Thread.Sleep(...);. They should be red flags for you.

Instead, you should always try to implement your methods using the built-in operators first, and only move on to the Create "roll-your-own" approach when you have to.

Luckily there's a very powerful built-in operator that meets your needs for this perfectly - Generate. This method is very useful for generating sequences that incorporate a "sleep".

Here's what it would look like:

public IObservable<string> GetDocumentObservable(int numParagraphs, int latency)
{
    return
        Observable
            .Generate(
                0,
                i => i < numParagraphs,
                i => i + 1,
                i => "Some String",
                i => i == 0
                    ? TimeSpan.Zero
                    : TimeSpan.FromSeconds(1.0))
            .Do(x => Console.WriteLine(
                "Service On thread {0}",
                Thread.CurrentThread.ManagedThreadId));
}

This does everything that your method does and should behave as you wanted it to.

Enigmativity
  • Thanks for that. I should be clear - the Thread.Sleep and the Disposable stuff, that's for experimentation to simulate something that takes a long time, which may or may not trickle results to the consumer. The end goal is to consume a synchronous API in a reactive manner - so it's not really a generator (though in some cases it could be) – enomam Mar 07 '15 at 08:26