1

I have been investigating the possibility of 'protecting' an observable source from a slow observer, by throwing away any new items that come in while the observer is working.

After looking at ObserveLatestOn (and finding it a bit hard to understand...), I chanced on this answer, which suggested doing it in Subscribe, rather than inside the Observable monad.

Which gives something like this (versions with delegate overloads not shown):

public static IDisposable NonBlockingSubscribe<T>(this IObservable<T> observable, 
                                                       IObserver<T> observer)
{
    Task task = null;
    return observable.Subscribe(value =>
    {
        if(task == null || task.IsCompleted)
            task = Task.Run(() => observer.OnNext(value));
    }, observer.OnError, observer.OnCompleted);
}

The challenge now is working out how to write a test for this using ReactiveTest and virtual time.

I've seen the answer to this question, which involves using TestScheduler to generate a 'ticker' which can be used in turn to create an AsyncSelector, but I'm a bit stuck working out how to go from there to something which can 'wait for a given number of ticks'.

Community
  • 1
  • 1
Benjol
  • 63,995
  • 54
  • 186
  • 268
  • What is the question? Also, you could call `Task.Run(...)` and then call `observer.OnCompleted()` and then have `observer.OnNext(value)` happen because of the `Task.Run(...)`, in that order. That's a problem. – Timothy Shields Oct 26 '15 at 17:15
  • @TimothyShields, the question is how I can test this. Good point about the OnCompleted/OnNext race condition, in my particular use case I'm not expecting to ever get an OnCompleted, but it's worth thinking about anyway. I guess I would need to lock `task` and then maybe use ContinueWith. – Benjol Oct 27 '15 at 06:05
  • Some marble diagrams might be helpful to illustrate the various cases that you want to test for. It's not clear what you're attempting to do. – FMM Oct 29 '15 at 13:58

0 Answers0