I read the following snipped from Ollie Riches' blog post Trying to be more functional with Rx and became to wonder the same too as the author: why is not OnCompleted passed? Could someone tell what is happening here? Perhaps something embarrassingly simple?
The code is a bit bit modified and reproduced here for convenience (and my apologies to Ollie if it wasn't acceptable to rip his code here):
public static class RxExtensions
{
public static IObservable<T> Suspendable<T>(this IObservable<T> stream, IObservable<bool> suspend, bool initialState = false)
{
return Observable.Create<T>(o =>
{
var disposable = suspend.StartWith(initialState)
.DistinctUntilChanged()
.Select(s => s ? Observable.Empty<T>() : stream)
.Switch()
.Subscribe(o);
return disposable;
});
}
}
var testScheduler = new TestScheduler();
var generatorCount = 10;
//If the limit will be hardcoded to something less than generatorCount, an exception will be
//thrown and the exception object will be set. Why it doesn't happen to completed in the following?
var generator = Observable.Generate(1,
x => x <= generatorCount,
x => x + 1,
x => { if(x != 11) { Console.WriteLine(x); return x; } else { throw new ArgumentException(); } },
x => TimeSpan.FromSeconds(1),
testScheduler);
Exception exception = null;
var completed = false;
generator.Suspendable(new Subject<bool>()).Subscribe(_ => { }, e => exception = e, () => completed = true);
testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1001000).Ticks);
Console.WriteLine(exception);
Console.WriteLine(completed);
For the record, I was thinking to try to produce a stream that can be paused and stopped with the distinction that paused stream accumulates events, suspeneded just skips them. It started to look a bit more involved than I expected to, especially if one thinks to put a limit or "save strategy" to the paused bit. Oh well...
<edit: Interestingly, I just noticed a RxJS implementation of Pausable.