I'm having trouble testing reactive code that consumes a Task-based service. In my class under test I consume the task and use ToObservable() to do reactive-y things with it.

public void Method()
{
  _svc.MyTaskServiceMethod().ToObservable().Select(....) //pipe it elsewhere and do interesting things.
}

Now in a unit test I'm testing some timing (using Moq for the service):

svcMock.Setup(x => x.MyTaskServiceMethod()).Returns(() =>
  Observable.Return("VALUE", testScheduler)
    .Delay(TimeSpan.FromMilliseconds(100), testScheduler)
    .ToTask()
  );

The problem is that despite using the test scheduler in the Return/Delay calls, the task itself is still completing on a separate thread. I'm seeing this by adding a couple of Console writes of the current managed thread id to the code.

svcMock.Setup(x => x.MyTaskServiceMethod()).Returns(() =>
{
  var task = Observable.Return("VALUE", testScheduler)
    .Delay(TimeSpan.FromMilliseconds(1000), testScheduler)
    .Do(x => { Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " Obs"); })
    .ToTask();

  task.ContinueWith((_) =>
  {
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " Task");
  });
  return task;
});

The Do(..) executes on the primary testing thread, and happens exactly when I expect, after a testScheduler.AdvanceBy(..) call.

The task continuation, however, still runs on a separate thread and basically doesn't execute until after the body of the unit test has finished. So in the body of my target, nothing ever really gets pushed through my task.ToObservable() observable.

Clyde

2 Answers


Task continuations will use a thread pool thread by default, so your continuation escapes the control of the test scheduler. If you specify the option TaskContinuationOptions.ExecuteSynchronously, the continuation runs on the same thread that completes the task, and the result will be posted to the observable as desired:

task.ContinueWith((_) =>
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId.ToString() + " Task");
}, TaskContinuationOptions.ExecuteSynchronously);
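
To see the effect in isolation, here is a minimal sketch without Rx or Moq. It assumes a plain TaskCompletionSource stands in for the mocked service task; the class and method names are made up for illustration. The continuation is attached before the task completes, so with ExecuteSynchronously it should normally be invoked inline by the thread that calls SetResult:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the question's code.
public static class SyncContinuationDemo
{
    // Returns true when the continuation ran on the thread that completed the task.
    public static bool ContinuationRunsOnCompletingThread()
    {
        var tcs = new TaskCompletionSource<string>();
        int continuationThreadId = -1;

        // Attached while the task is still pending, so it fires when SetResult is called.
        Task continuation = tcs.Task.ContinueWith(
            t => { continuationThreadId = Thread.CurrentThread.ManagedThreadId; },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);

        int completingThreadId = Thread.CurrentThread.ManagedThreadId;
        tcs.SetResult("VALUE"); // the continuation is expected to run inline here

        continuation.Wait();
        return continuationThreadId == completingThreadId;
    }
}
```

Dropping the ExecuteSynchronously option from the sketch hands the continuation to the thread pool instead, which is the behaviour described in the question. Note that ExecuteSynchronously is a request rather than a guarantee; the runtime can still fall back to queuing the continuation in some situations.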

Addendum

You may find this related discussion on the Rx site quite illuminating on the subject of concurrency in TPL -> Rx transitions, and ToObservable() in particular.

James World
  • That would be fine for my demo code of the thread ID, but the continuation I actually care about is embedded in the ToObservable() extension method. I'm pretty hesitant to roll my own version of that library method just for this one testing scenario – Clyde Jan 23 '15 at 13:26
  • Just to be clear, I've tested this idea out now and it does give the results I'm looking for. I copied the ToObservable extensions out of the reactive extensions source code and altered them to use this continuation option. – Clyde Jan 23 '15 at 14:27
  • I'm wondering if there's a way to force a task to run its continuations synchronously -- basically the opposite of this question: http://stackoverflow.com/questions/22579206 – Clyde Jan 23 '15 at 14:28
  • I'm not aware of a general clean/easy way to do this. Custom TaskSchedulers or specifying task options are obviously "noise" you don't want and often aren't convenient. Moving between TPL and Rx often has ugly details. Not always an option I know, but on some projects I've worked on, the decision was taken to use Rx everywhere by preference - even for single result operations - precisely to avoid these kinds of warts and enhance testability.... – James World Jan 23 '15 at 15:05
  • ...I'm not sure I necessarily agree with this approach all that often given the prevalence of TPL code in the BCL and 3rd party libraries, and the fact that TPL is more widely understood by coders - but it can be an option. – James World Jan 23 '15 at 15:09
  • Also see the addendum above! – James World Jan 23 '15 at 15:17
  • thanks for the link to that discussion -- means I don't have to finish writing out the forum post I was working on to raise just that issue – Clyde Jan 23 '15 at 15:36
  • See the copy at https://github.com/Reactive-Extensions/Rx.NET/issues/21 - @davesexton has done some work on this in a fork too. – James World Jan 23 '15 at 16:03

Some time ago I co-authored a unit test library based on NUnit to help precisely with Rx and TPL testing. For that we built a test TPL scheduler to force all TPL tasks to run without concurrency. You can see the relevant code here: https://github.com/Testeroids/Testeroids/blob/master/solution/src/app/Testeroids/TplTestPlatformHelper.cs#L87
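
As a rough illustration of the general idea (this is not the Testeroids code, and InlineTaskScheduler and InlineSchedulerDemo are hypothetical names), a TaskScheduler that executes every task on the thread that queues it could look something like this:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs each task immediately on the thread that queues it.
public sealed class InlineTaskScheduler : TaskScheduler
{
    protected override void QueueTask(Task task)
    {
        // Execute right away instead of handing the task to the thread pool.
        TryExecuteTask(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        // Nothing is ever left waiting in a queue.
        return Enumerable.Empty<Task>();
    }

    public override int MaximumConcurrencyLevel { get { return 1; } }
}

public static class InlineSchedulerDemo
{
    // Returns true when the continuation ran on the thread that completed the task.
    public static bool ContinuationRunsOnCallingThread()
    {
        var scheduler = new InlineTaskScheduler();
        var tcs = new TaskCompletionSource<string>();
        int continuationThreadId = -1;

        // No ExecuteSynchronously option here; the scheduler alone forces inline execution.
        Task continuation = tcs.Task.ContinueWith(
            t => { continuationThreadId = Thread.CurrentThread.ManagedThreadId; },
            scheduler);

        int callerThreadId = Thread.CurrentThread.ManagedThreadId;
        tcs.SetResult("VALUE"); // queues the continuation, which the scheduler runs at once
        continuation.Wait();
        return continuationThreadId == callerThreadId;
    }
}
```

The catch is that a scheduler like this only affects continuations that are explicitly given it, or that are created while it is TaskScheduler.Current. Continuations registered inside library code are not automatically redirected, which is why a test helper has to go further than this sketch.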

Pedro Pombeiro