12

I have a service that allows a caller to send commands and receive responses asynchronously. In a real application, these actions are fairly disconnected (some action will send a command, and the responses will be processed independently).

However, in my tests, I need to be able to send a command and then wait for the (first) response before continuing the test.

The responses are published using RX, and my first attempt at the code was something like this:

service.SendCommand("BLAH");
await service.Responses.FirstAsync();

The problem with this is that FirstAsync will only work if the response arrives after this await has already been hit. If the service responds very quickly, the response is published before anything is listening, and the test hangs on the await.

My next attempt to fix this was to call FirstAsync() prior to sending the command, so that it would have the result even if it arrived before awaiting:

var firstResponse = service.Responses.FirstAsync();
service.SendCommand("BLAH");
await firstResponse;

However, this still fails in the same way. It seems like it's only when the await is hit (GetAwaiter is called) that it starts listening; so the exact same race condition exists.

If I change my Subject to a ReplaySubject with a buffer (or time window) then I can work around this; however, it doesn't make sense to do that in my production classes; it would only be for testing.

What's the "correct" way to be able to do this in RX? How can I set up something that will receive the first event on a stream in a way that won't introduce a race condition?

Here's a small test that illustrates the issue in a "single-threaded" way. This test will hang indefinitely:

[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    // Subscribe to the first bool (but don't await it yet)
    var firstBool = x.FirstAsync();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool; // <-- hangs here; presumably because firstBool didn't start monitoring until GetAwaiter was called?


    Assert.Equal(true, b);
}

I even tried calling Replay() in my test thinking it would buffer the results; but that doesn't change anything:

[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    var firstBool = x.Replay();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool.FirstAsync(); // <-- Still hangs here


    Assert.Equal(true, b);
}
Danny Tuppeny

3 Answers

19

You can do this with an AsyncSubject:

[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    var firstBool = x.FirstAsync().PublishLast(); // PublishLast wraps an AsyncSubject
    firstBool.Connect();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool;


    Assert.Equal(true, b);
}

AsyncSubject basically caches the last value received before OnCompleted is called, and then replays it to anyone who subscribes (or awaits) afterwards.
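
To make that caching visible, here is a minimal sketch that uses an AsyncSubject directly instead of going through PublishLast. The class and test names are made up for illustration, and it assumes the System.Reactive and xUnit packages are referenced:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using Xunit;

// Illustrative names only; not part of the original answer.
public class AsyncSubjectSketch
{
    [Fact]
    public async Task FirstValueIsCachedBeforeAwait()
    {
        var x = new Subject<bool>();
        var first = new AsyncSubject<bool>();

        // Subscribe right away, so nothing published from here on is missed.
        using (x.FirstAsync().Subscribe(first))
        {
            // FirstAsync forwards this value and then completes,
            // which is what makes the AsyncSubject hold on to it.
            x.OnNext(true);

            // The value is already cached, so this await does not hang.
            var b = await first;

            Assert.True(b);
        }
    }
}
```

The explicit Subscribe call here plays the same role as Connect() in the PublishLast version above.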

Flagbug
  • Great! I'm slightly confused by the naming of this; but it does indeed seem to do what I want; returning the value whether or not it's published before the `await`. I'm not sure why `.Replay(1)` didn't work though; since my understanding of that is the same? – Danny Tuppeny Jun 27 '14 at 18:29
  • Maybe you didn't call `Connect` on the `IConnectableObservable` that `Replay()` returns? – Flagbug Jun 27 '14 at 18:31
  • Yes; you're right! So are `Replay(1)` and `PublishLast()` basically the same? Is there nothing that does the same and automatically connects? Seems strange to have to perform this extra step; my tests are becoming more full of boilerplate cruft :( – Danny Tuppeny Jun 27 '14 at 18:33
  • They're similar but not exactly the same. `PublishLast` will only return a value at the `await` once the sequence completes (here we use `FirstAsync` before the `PublishLast`, so it does that immediately). `Replay(1)` returns as soon as it has the first value. – Flagbug Jun 27 '14 at 18:45
  • Rx semantics are a bit hard to describe in a comment, I hope you get a rough idea what I mean. – Flagbug Jun 27 '14 at 18:45
  • I think so! So `FirstAsync().PublishLast()` has the same effect as `Replay(1)` because FirstAsync completes the stream as soon as it gets an item; but normally `PublishLast()` wouldn't yield a value until `OnCompleted()` was called; whereas `Replay(x)` would just immediately give back the last x values when awaited? – Danny Tuppeny Jun 27 '14 at 18:47
  • Also, `Replay(1)` will stay subscribed until either the whole sequence completes, or you manually unsubscribe. `FirstAsync().PublishLast()` will do that automatically. – Flagbug Jun 27 '14 at 18:52
  • Right, makes sense :) The only remaining quirk is that when I want to do this twice, I have to create two of these things; there's no way to "reset" the first one? – Danny Tuppeny Jun 27 '14 at 19:04
  • How can I await for a limited time only? – John Demetriou Sep 27 '16 at 12:21
  • @JohnDemetriou Add `.Timeout(TimeSpan.FromSeconds(30));` – Flagbug Oct 27 '16 at 10:58
9

Great question Danny. This troubles lots of people new to Rx.

Flagbug has an acceptable answer above, but it would have been even easier to add a single line to your second test:

var firstBool = x.Replay();
firstBool.Connect();   //Add this line, else your IConnectableObservable will never connect!
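
Put together as a complete test, that might look like the sketch below. The class and method names are illustrative, and it assumes the System.Reactive and xUnit packages. Disposing the connection at the end is optional in a test this small, but it unsubscribes the replay buffer from the source:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using Xunit;

// Illustrative names only.
public class ReplayConnectSketch
{
    [Fact]
    public async Task ReplayBuffersOnceConnected()
    {
        var x = new Subject<bool>();

        // Replay() only starts buffering after Connect() is called.
        var replayed = x.Replay();
        using (replayed.Connect())
        {
            // Published before anything awaits, but captured by the buffer.
            x.OnNext(true);

            // The buffered value is replayed to this late subscription.
            var b = await replayed.FirstAsync();

            Assert.True(b);
        }
    }
}
```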

This style of testing is OK, but there is another way which, in my experience, is what people move to once they have used Rx for a bit longer. I suggest you go straight to that version! But let's get there slowly...

(please excuse the switch back to NUnit as I don't have an xUnit runner on this PC)

Here we simply add values to a List<T> as they are produced. We can then check the contents of the list in our asserts:

[Test]
public void MyTest_with_List()
{
    var messages = new List<bool>();
    var x = new Subject<bool>();

    x.Subscribe(messages.Add);

    // Send the first bool
    x.OnNext(true);

    Assert.AreEqual(true, messages.Single());
}

For these super simple tests, that is OK, but we miss some fidelity around sequence termination, i.e. did it complete or error?

We can further extend this style of testing by using the testing tools for Rx (the Rx-Testing NuGet package). In this test we use the MockObserver/ITestableObserver<T> that we (annoyingly) get from a TestScheduler instance. Note that I have made the test fixture/class extend ReactiveTest, which is where the OnNext(time, value) helper used in the assertion comes from.

[TestCase(true)]
[TestCase(false)]
public void MyTest_with_TestObservers(bool expected)
{
    var observer = new TestScheduler().CreateObserver<bool>();
    var x = new Subject<bool>();

    x.Subscribe(observer);

    x.OnNext(expected);

    observer.Messages.AssertEqual(
        OnNext(0, expected));
}

This may seem like a small improvement, or arguably even a step backwards, given the need to create a test scheduler and to specify the times at which we expect to see messages. However, as soon as you start writing more complex Rx tests, this becomes very valuable.

You could extend the test further by generating the source sequence up front and specifying when the values will be played in virtual time. Here we drop the subject and specify that a value (expected) will be published at 1000 ticks. In the assertion, we again check the value, and also the time at which it was received. As we are now introducing virtual time, we also need to say when we want time to advance. We do that here by calling testScheduler.Start();

[TestCase(true)]
[TestCase(false)]
public void MyTest_with_TestObservables(bool expected)
{
    var testScheduler = new TestScheduler();
    var observer = testScheduler.CreateObserver<bool>();
    var source = testScheduler.CreateColdObservable(
        OnNext(1000, expected));

    source.Subscribe(observer);
    testScheduler.Start();

    observer.Messages.AssertEqual(
        OnNext(1000, expected));
}

I have written more about testing Rx here.

Lee Campbell
  • Thanks for the additional info! The test I posted was actually a hacked up sample showing the problem; but in reality my tests aren't actually calling the OnNext methods; so having something to `await` is needed. – Danny Tuppeny Jun 27 '14 at 19:40
  • Though I'd love to have your opinion on my real tests to see if they can be cleaned up (I don't think they're particularly readable!) https://github.com/DanTup/DartAnalysis.NET/blob/master/DanTup.DartAnalysis.Tests/AnalysisTests.cs – Danny Tuppeny Jun 27 '14 at 19:41
  • Specifically, TestAnalysisUpdateContent has some wonkiness because it needs to wait for analysis to complete twice throughout the test :( – Danny Tuppeny Jun 27 '14 at 19:41
  • Hi Dan, had a quick look, but as you don't seem to use IoC and have hard-coded paths, e.g. M:\Apps\Dart\sdk, I can't run your tests. It seems there is plenty to clean up. Use of subjects, lack of OnError strategy, confusion on the IDisposable pattern, the old Dictionary+Tasks to Observable pattern, mixing callbacks and observable sequences... plenty of opportunity for success :-) – Lee Campbell Jun 28 '14 at 08:44
  • http://stackoverflow.com/questions/23593225/ is a question that might help with your design of your AnalysysServiceWrapper type. – Lee Campbell Jun 28 '14 at 08:45
  • Sorry; didn't expect you to run the tests; there is indeed a lot of setup to making them work currently; I don't fully understand the functionality of the service I'm wrapping so these tests are partly to help explore! I'll have a look through the link when next working on this; thanks! – Danny Tuppeny Jun 28 '14 at 09:55
0

We had the same issue as you, and we solved it by transforming the Observable into a Task. I think this is the most sensible way: ToTask() subscribes immediately, so you won't miss the result if it arrives before you await it, and your code will still wait for the result if the task hasn't completed yet.

var boolSubject = new Subject<bool>();

// Create a Task that subscribes immediately to catch the first element
// (ToTask() is an extension method in System.Reactive.Threading.Tasks)
var myTask = boolSubject.FirstAsync().ToTask();

// Send the first bool
boolSubject.OnNext(true);

//wait for the task to complete, or retrieve the result if it did complete already
var b = await myTask; 

Assert.Equal(true, b);
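
If this pattern is needed in many tests, it could be wrapped in a small helper so that the subscribe-then-trigger ordering can't be gotten wrong. This is only a sketch: FirstAfter and ObservableTestExtensions are hypothetical names, not part of Rx, and the timeout is there so that a missing response fails the test instead of hanging it:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical test helper; not part of Rx itself.
public static class ObservableTestExtensions
{
    // Subscribes for the first element, then runs the trigger, then awaits.
    // Faults with a TimeoutException if nothing arrives within the timeout.
    public static async Task<T> FirstAfter<T>(
        this IObservable<T> source, Action trigger, TimeSpan timeout)
    {
        // ToTask() subscribes here, before the trigger has run.
        var first = source.FirstAsync().Timeout(timeout).ToTask();

        // Whatever causes the value to be published (e.g. sending a command).
        trigger();

        // Completes immediately if the value has already arrived.
        return await first;
    }
}
```

With the service from the question, a call might then look like `var response = await service.Responses.FirstAfter(() => service.SendCommand("BLAH"), TimeSpan.FromSeconds(30));`. Waiting for a second response is just a second call to the helper.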
Gregfr