
I have a class encapsulating Observable.Sample() such as:

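using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

class IntervalRequestScheduler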
{
    private Subject<Action> _requests = new Subject<Action>();
    private IDisposable _observable;

    public IntervalRequestScheduler(TimeSpan requestLimit)
    {
        _observable = _requests.Sample(requestLimit)
                               .Subscribe(action => action());
    }

    public Task<T> ScheduleRequest<T>(Func<Task<T>> request)
    {
        var tcs = new TaskCompletionSource<T>();
        _requests.OnNext(async () =>
        {
            try
            {
                T result = await request();
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        });
        return tcs.Task;
    }
}

How can I test it properly? All my attempts either exit prematurely or cause deadlocks.

abatishchev
  • Inject an `IScheduler` implemented in your unit test as `TestScheduler`. – Stephen Cleary Oct 04 '14 at 07:53
  • @Stephen: yeah, that was my initial plan, but then I thought maybe it's possible to test Sample() rather than IScheduler. But your comment makes me think that's actually the wrong approach. Testing is easy, testing the right things is hard :) – abatishchev Oct 05 '14 at 04:50



Controlling Time in Rx

The key to unit testing Rx is understanding how to control time with the TestScheduler. All time-based operators in the Rx libraries have overloads that take an IScheduler parameter precisely so that you can do this. Your own classes that use time-based operators should allow a scheduler to be passed in as well.
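To see what controlling time means in isolation, here is a minimal sketch that drives `Sample` directly on a `TestScheduler`, without the wrapper class. It assumes the `System.Reactive` and `Microsoft.Reactive.Testing` NuGet packages are referenced; `SampleOnVirtualTime` is a hypothetical name used only for illustration. Three values pushed before any virtual time passes produce no output; advancing the clock by one interval fires the sampler once, and it emits only the most recent value.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

public static class SampleOnVirtualTime
{
    // Returns how many values Sample emitted before, and which values after, one sampling period.
    public static (int EmittedBeforeAdvance, List<int> EmittedAfterAdvance) Run()
    {
        var scheduler = new TestScheduler();   // virtual clock starts at 0 ticks
        var source = new Subject<int>();
        var emitted = new List<int>();
        int before;

        // Passing the scheduler puts Sample's internal timer on virtual time.
        using (source.Sample(TimeSpan.FromSeconds(1), scheduler).Subscribe(emitted.Add))
        {
            source.OnNext(1);
            source.OnNext(2);
            source.OnNext(3);

            // No virtual time has passed yet, so the sampler has not fired.
            before = emitted.Count;

            // Advance exactly one sampling period: the sampler fires once
            // and forwards only the latest value seen in that window.
            scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);
        }

        return (before, emitted);
    }
}
```

Nothing here waits on a real clock, so a test built this way runs instantly and deterministically.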

So the first thing we need to do is modify your IntervalRequestScheduler constructor to facilitate this:

public IntervalRequestScheduler(TimeSpan requestLimit,
                                // The scheduler is optional
                                IScheduler scheduler = null)
{
    // assign a default if necessary
    scheduler = scheduler ?? Scheduler.Default;

    // make sure to pass the scheduler in to `Sample`
    _observable = _requests.Sample(requestLimit, scheduler)
                            .Subscribe(action => action());
}

With this change in place, we can now control time!

Here's an example unit test that calls an IntervalRequestScheduler instance's ScheduleRequest method ten times, then advances time by the sample duration of one second and checks that only one task has completed:

[Test]
public void ASingleTaskIsCompletedWhenTenAreScheduledWithinInterval()
{
    var scheduler = new TestScheduler();
    var sampleDuration = TimeSpan.FromSeconds(1);

    var intervalRequestScheduler = new IntervalRequestScheduler(sampleDuration,
                                                                scheduler);

    // use a helper method to create "requests"
    var taskFactories = Enumerable.Range(0, 10).Select(CreateRequest);

    // schedule the requests and collect the tasks into an array
    var tasks =
        (from tf in taskFactories
            select intervalRequestScheduler.ScheduleRequest(tf)).ToArray();

    // prove no tasks have completed
    var completedTasksCount = tasks.Count(t => t.IsCompleted);
    Assert.AreEqual(0, completedTasksCount);

    // this is the key - we advance time simulating a sampling period.
    scheduler.AdvanceBy(sampleDuration.Ticks);

    // now we see exactly one task has completed
    completedTasksCount = tasks.Count(t => t.IsCompleted);
    Assert.AreEqual(1, completedTasksCount);
}

// helper to create requests
public Func<Task<int>> CreateRequest(int result)
{
    return () => Task.Run(() => result);
}

Aside

I have until now just focussed on the question at hand - but I did want to add that the actual motivation for IntervalRequestScheduler is a little unclear and the code looks a bit messy. There are possibly better ways to achieve this without mixing wrapped Tasks and IObservables. Staying in the Rx world also makes it easier to make tests predictable by controlling the schedulers involved. In the above code, there is some nastiness I've glossed over because the task invocation is asynchronous and it's possible that the one started task may not actually have completed by the time you test it - so to be absolutely correct you need to get into the messy business of monitoring tasks and giving time for them to start and finish. But hopefully you can see that the TestScheduler avoids all this mess on the Rx side.
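One way to remove that race, assuming your real requests can be stubbed in tests, is to have the test's request factory return an already-completed task via `Task.FromResult` instead of `Task.Run`. Awaiting a completed task continues synchronously, so the `TaskCompletionSource` is completed inside `AdvanceBy` itself. The sketch below repeats the question's class with the optional scheduler parameter so that it compiles on its own; `DeterministicRequests`, `CreateCompletedRequest` and `ScheduleTen` are hypothetical helper names, and the `System.Reactive` and `Microsoft.Reactive.Testing` packages are assumed.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;

// The question's class, with the optional IScheduler parameter applied.
public class IntervalRequestScheduler
{
    private readonly Subject<Action> _requests = new Subject<Action>();
    private readonly IDisposable _subscription;

    public IntervalRequestScheduler(TimeSpan requestLimit, IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        _subscription = _requests.Sample(requestLimit, scheduler)
                                 .Subscribe(action => action());
    }

    public Task<T> ScheduleRequest<T>(Func<Task<T>> request)
    {
        var tcs = new TaskCompletionSource<T>();
        _requests.OnNext(async () =>
        {
            try
            {
                T result = await request();
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        });
        return tcs.Task;
    }
}

public static class DeterministicRequests
{
    // Hypothetical helper: Task.FromResult returns an already-completed task, so
    // `await request()` continues synchronously and the TaskCompletionSource is
    // completed while the TestScheduler is still inside AdvanceBy.
    public static Func<Task<int>> CreateCompletedRequest(int result) =>
        () => Task.FromResult(result);

    // Hypothetical helper: schedules ten requests whose results are 0..9.
    public static Task<int>[] ScheduleTen(IntervalRequestScheduler sut) =>
        Enumerable.Range(0, 10)
                  .Select(i => sut.ScheduleRequest(CreateCompletedRequest(i)))
                  .ToArray();
}
```

In a test, create a `TestScheduler`, call `ScheduleTen`, assert that no task is complete, call `AdvanceBy(TimeSpan.FromSeconds(1).Ticks)`, and assert that exactly one is complete. Because `Sample` forwards the latest action in the window, that is the last task, with result 9; the other nine actions are never invoked.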

If you want to constrain the number of jobs run to a certain rate, why not just sample the input and project the output?

For example, say you have a request function of type Func<int,int> called runRequest and an IObservable<int> called requests that provides the input for each request (it could be a Subject<int>, for example). Then you could just have:

requests.Sample(TimeSpan.FromSeconds(1), scheduler)
        .Select(input => runRequest(input))
        .Subscribe(result => { /* do something with result */ });

No idea if this works for your scenario of course, but it may provoke some ideas!
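Here is a minimal sketch of that sample-then-project pipeline running on a `TestScheduler`, assuming the `System.Reactive` and `Microsoft.Reactive.Testing` packages. `RunRequest` is a hypothetical stand-in for `runRequest` (it just multiplies by ten), a `Subject<int>` plays the role of `requests`, and results are collected into a list in place of "do something with result". Within each one-second window only the latest input is projected, and a window with no input produces nothing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

public static class SampleThenProject
{
    // Hypothetical stand-in for the real request function.
    private static int RunRequest(int input) => input * 10;

    public static List<int> Run()
    {
        var scheduler = new TestScheduler();
        var requests = new Subject<int>();   // one input value per request
        var results = new List<int>();
        var window = TimeSpan.FromSeconds(1);

        using (requests.Sample(window, scheduler)
                       .Select(input => RunRequest(input))
                       .Subscribe(results.Add))
        {
            // First window: three inputs, only the latest (3) is sampled.
            requests.OnNext(1);
            requests.OnNext(2);
            requests.OnNext(3);
            scheduler.AdvanceBy(window.Ticks);

            // Second window: a single input (4) is sampled.
            requests.OnNext(4);
            scheduler.AdvanceBy(window.Ticks);

            // Third window: no input, so nothing is emitted.
            scheduler.AdvanceBy(window.Ticks);
        }

        return results;
    }
}
```

Note that inputs 1 and 2 are simply dropped; that is inherent to `Sample`, not to the test.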

James World
  • Thank you for your code, it works very well for me! I don't know much about Rx, and even less about testing it. – abatishchev Oct 05 '14 at 04:52
  • Here's some background on what this code is intended to do: an API client, with an HTTP client deep inside its dependency hierarchy, calls a remote API that throttles incoming requests, e.g. to no more than 1 per second. Initially the code just made a request and got error 500 if it was made too early. I wanted to add the timing transparently to the rest of the code, so the HTTP client makes its call unaware of Rx and so forth. – abatishchev Oct 05 '14 at 04:56
  • 1
    Hmmm, I think I follow. But only the task factories that pass through `Sample` are invoked, but you're returning every TCS task from `ScheduleRequest` - the ones that aren't sampled will never complete or fail - is their a leak there? – James World Oct 05 '14 at 21:15
  • Why would any be skipped rather than postponed and processed later (after the given period of time)? If that's possible, that's what I wanted to test. – abatishchev Oct 06 '14 at 03:47
  • 1
    `Sample` will drop events by design when more than one appears in it's interval. Take a look at my answer here for an approach to rate-limit without dropping items: http://stackoverflow.com/questions/21588625/process-rx-events-at-fixed-or-minimum-intervals/21589238#21589238 – James World Oct 06 '14 at 07:37
  • What is the difference from Throttle() then? I tested my code from a console app sending 2 requests every 0.5 s to a server that processes 1 request per second, and nothing was dropped. Maybe I need to revisit that test. – abatishchev Oct 06 '14 at 16:13
  • Throttle suppresses events until no subsequent event arrives within the stated interval, i.e. there has to be a gap in the stream at least as long as the interval before an event is emitted, and that event will have been delayed by the interval. Sample emits the most recent event from each "window" of the interval's duration. – James World Oct 06 '14 at 16:17
  • In the approach you linked, will it start processing a request immediately if it is the first in a row, or will it always wait 1 second? – abatishchev Oct 06 '14 at 16:18
  • There is no delay, i.e. an event is only held up if there hasn't been a gap of at least the interval since the preceding event. – James World Oct 06 '14 at 16:18
  • James, can you please take a look at this sample of Sample() http://pastebin.com/2jEjpaeV? It doesn't seem to drop requests but processes them in a delayed fashion. What am I missing? – abatishchev Oct 06 '14 at 17:03
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/62539/discussion-between-james-world-and-abatishchev). – James World Oct 06 '14 at 17:25
  • You know, I just realized that my code based on your answer doesn't work. https://gist.github.com/abatishchev/7f5b34e56973dccfe604 The problem is that only 1 task is ever run, never more. – abatishchev Sep 03 '15 at 05:35
  • When you call `Delay` in your `DelayTaskScheduler` (scrolled off to the right somewhat in your gist), you are forgetting to pass it the `scheduler`. Do that and it will work: `.Delay(settings.RequestDelay, scheduler)`. All time-based operators will need to run on the test scheduler. – James World Sep 03 '15 at 08:13
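The difference between `Sample` and `Throttle` discussed in these comments can be checked on virtual time. This is a sketch assuming the `System.Reactive` and `Microsoft.Reactive.Testing` packages; `SampleVersusThrottle` is a hypothetical name. Both operators get the same bursty input (1 at 0 ms, 2 at 500 ms, 3 at 1200 ms). `Sample` emits the latest value at each one-second tick, so value 1 is dropped; `Throttle` only emits after a full second of silence, so it produces just value 3, at 2200 ms. Each operator is handed the test scheduler explicitly, in the same spirit as passing `scheduler` to `Delay`.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

public static class SampleVersusThrottle
{
    private const long Ms = TimeSpan.TicksPerMillisecond;

    // Feeds one bursty input to both operators and records (virtual ms, value) pairs.
    public static (List<(long AtMs, int Value)> Sampled, List<(long AtMs, int Value)> Throttled) Run()
    {
        var scheduler = new TestScheduler();
        var source = new Subject<int>();
        var interval = TimeSpan.FromSeconds(1);
        var sampled = new List<(long AtMs, int Value)>();
        var throttled = new List<(long AtMs, int Value)>();

        // Each time-based operator gets the test scheduler explicitly;
        // an operator without it would fall back to real time.
        using (source.Sample(interval, scheduler)
                     .Subscribe(v => sampled.Add((scheduler.Clock / Ms, v))))
        using (source.Throttle(interval, scheduler)
                     .Subscribe(v => throttled.Add((scheduler.Clock / Ms, v))))
        {
            source.OnNext(1);                 // t = 0 ms
            scheduler.AdvanceBy(500 * Ms);
            source.OnNext(2);                 // t = 500 ms
            scheduler.AdvanceBy(700 * Ms);
            source.OnNext(3);                 // t = 1200 ms, then silence
            scheduler.AdvanceBy(3000 * Ms);   // run on to t = 4200 ms
        }

        return (sampled, throttled);
    }
}
```

Here `Sample` yields (1000 ms, 2) and (2000 ms, 3), while `Throttle` yields only (2200 ms, 3).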