I'm having trouble testing a class that uses Observable.FromAsync<T>() and Observable.Switch<T>(). It waits for a trigger observable to produce a value, then starts an async operation, and finally collects the results of all operations into a single output sequence. The gist of it is something like:

var outputStream = triggerStream
  .Select(_ => Observable
    .FromAsync(token => taskProducer.DoSomethingAsync(token)))
  .Switch();

I put together some sanity-check tests with the bare minimum parts to understand what's going on. Here's the test, with results in comments:

class test_with_rx : nspec
{
  void Given_async_task_and_switch()
  {
    Subject<Unit> triggerStream = null;
    TaskCompletionSource<long> taskDriver = null;
    ITestableObserver<long> testObserver = null;
    IDisposable subscription = null;

    before = () =>
    {
      TestScheduler scheduler = new TestScheduler();
      testObserver = scheduler.CreateObserver<long>();
      triggerStream = new Subject<Unit>();
      taskDriver = new TaskCompletionSource<long>();

      // build stream under test
      IObservable<long> streamUnderTest = triggerStream
        .Select(_ => Observable
          .FromAsync(token => taskDriver.Task))
        .Switch();

      /* Also tried with this Switch() overload
      IObservable<long> streamUnderTest = triggerStream
          .Select(_ => taskDriver.Task)
          .Switch(); */

      subscription = streamUnderTest.Subscribe(testObserver);
    };

    context["Before trigger"] = () =>
    {
      it["Should not notify"] = () => testObserver.Messages.Count.Should().Be(0);
      // PASSED
    };

    context["After trigger"] = () =>
    {
      before = () => triggerStream.OnNext(Unit.Default);

      context["When task completes"] = () =>
      {
        long result = -1;

        before = () =>
        {
          taskDriver.SetResult(result);
          //taskDriver.Task.Wait();  // tried with this too
        };

        it["Should notify once"] = () => testObserver.Messages.Count.Should().Be(1);
        // FAILED: expected 1, actual 0

        it["Should notify task result"] = () => testObserver.Messages[0].Value.Value.Should().Be(result);
        // FAILED: of course, index out of bounds
      };
    };

    after = () =>
    {
      taskDriver.TrySetCanceled();
      taskDriver.Task.Dispose();
      subscription.Dispose();
    };
  }
}

In other tests using mocks, I can see that the Func passed to FromAsync is actually invoked (e.g. taskProducer.DoSomethingAsync(token)), but nothing seems to follow, and the output stream doesn't produce the value.

I also tried inserting some Task.Delay(x).Wait(), or some taskDriver.Task.Wait() before hitting expectations, but with no luck.

I read this SO thread and I'm aware of schedulers, but at first glance I thought I didn't need them, since no ObserveOn() is being used. Was I wrong? What am I missing? TA

Just for completeness, the testing framework is NSpec and the assertion library is FluentAssertions.

superjos
  • Reading Paul Betts' answers (e.g. [here](http://stackoverflow.com/a/3210009/540776)), maybe during the test `FromAsync` or `Switch()` makes use, under the hood, of that same idle scheduler/dispatcher provided by MSTest.exe ... – superjos Aug 13 '15 at 09:12

1 Answer

What you're hitting is a case of testing Rx and TPL together. An exhaustive explanation can be found here but I'll try to give advice for your particular code.

Basically your code is working fine, but your test is not. Observable.FromAsync turns into a ContinueWith on the provided task, and that continuation runs on the thread pool, hence asynchronously: it may not have run yet when your assertions execute.
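The timing issue can be seen with plain TPL, without Rx at all. The following is only a minimal sketch: the `ContinueWith` call is a stand-in for whatever continuation `FromAsync` attaches (an assumption based on the description above, not a copy of the Rx source), and `ContinuationTimingDemo` and `continuationRan` are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationTimingDemo
{
    public static void Main()
    {
        var taskDriver = new TaskCompletionSource<long>();
        bool continuationRan = false;

        // Stand-in for the continuation attached by FromAsync (assumption).
        // No scheduler is given, so it is queued to the default scheduler (thread pool).
        Task continuation = taskDriver.Task.ContinueWith(t => { continuationRan = true; });

        taskDriver.SetResult(-1);

        // Waiting on the antecedent only proves the antecedent is done.
        // At this point continuationRan may be either true or false.
        taskDriver.Task.Wait();

        // Waiting on the continuation task itself is what guarantees it has run.
        continuation.Wait();
        Console.WriteLine(continuationRan); // True
    }
}
```

In the test from the question there is no handle on the continuation task, which is why waiting on `taskDriver.Task` does not help there.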

There are several ways to fix your test (from ugly to complex):

  1. Sleep after setting the result (note that Task.Wait() doesn't help, because Wait doesn't wait for continuations)

    taskDriver.SetResult(result);
    Thread.Sleep(50);
    
  2. Set the result before FromAsync executes (because FromAsync returns an immediately available IObservable if the task has already finished, i.e. it skips ContinueWith)

    taskDriver.SetResult(result);
    triggerStream.OnNext(Unit.Default);
    
  3. Replace FromAsync with a testable alternative, e.g.

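    public static IObservable<T> ToObservable<T>(Task<T> task, TaskScheduler scheduler)
    {
        if (task.IsCompleted)
        {
            // Already finished: no continuation is needed, the result is available immediately.
            return task.ToObservable();
        }
        else
        {
            // Not finished yet: relay the result through a subject, running the
            // continuation on the supplied scheduler so that a test can control it.
            AsyncSubject<T> asyncSubject = new AsyncSubject<T>();
            task.ContinueWith(t => task.ToObservable().Subscribe(asyncSubject), scheduler);
            return asyncSubject.AsObservable<T>();
        }
    }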
    

(using either a synchronous TaskScheduler, or a testable one)
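For the synchronous TaskScheduler mentioned above, a minimal sketch could look like the following. `SynchronousTaskScheduler` is a hypothetical helper, not a type shipped with Rx or the BCL; it only overrides the three abstract members of `TaskScheduler` and runs every queued task inline on the calling thread.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: executes each task inline, on the thread that queues it.
public sealed class SynchronousTaskScheduler : TaskScheduler
{
    public override int MaximumConcurrencyLevel
    {
        get { return 1; }
    }

    // Instead of queuing to the thread pool, run the task right away.
    protected override void QueueTask(Task task)
    {
        TryExecuteTask(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return TryExecuteTask(task);
    }

    // Nothing is ever left pending, so there is nothing to report.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return Enumerable.Empty<Task>();
    }
}

public static class SynchronousSchedulerDemo
{
    public static void Main()
    {
        var taskDriver = new TaskCompletionSource<long>();
        bool continuationRan = false;

        // The continuation is bound to the synchronous scheduler.
        taskDriver.Task.ContinueWith(t => { continuationRan = true; }, new SynchronousTaskScheduler());

        // Completing the task queues the continuation, which this scheduler runs inline.
        taskDriver.SetResult(-1);
        Console.WriteLine(continuationRan); // True
    }
}
```

Passing such a scheduler to the `ToObservable` helper above should let the test assert right after `SetResult`, without sleeping. This is intended for tests only: running continuations inline changes reentrancy and ordering compared with the thread pool.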

Gluck
  • Thanks for answering. A first try with option #2 works. I had used that by chance in other test cases, but didn't think it was so *crucial*. In other cases where things were not working in the past, I went with *Thread.Sleep()*, but that felt a little bad. Isn't there an overload already available in Rx for *FromAsync()* or *ToObservable()* that takes an IScheduler argument? – superjos Aug 13 '15 at 15:08
  • E.g. something like [this from Rx GitHub](https://github.com/Reactive-Extensions/Rx.NET/blob/a13e3ff05bdded5cef2bf40bface22f8fa4ae316/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs#L858) – superjos Aug 13 '15 at 15:13
  • Nice find, didn't know it was added [recently](https://github.com/Reactive-Extensions/Rx.NET/commit/627539549cbf916c0d5b65bea40792affe62143d)! Should be in the 2.3.0 betas I guess. – Gluck Aug 13 '15 at 15:40
  • Yes, there's a relatively old [issue](https://github.com/Reactive-Extensions/Rx.NET/issues/21) about that, mentioned in [this answer](http://stackoverflow.com/a/28236216/540776) by James World – superjos Aug 13 '15 at 16:31