I have an IObservable<int> sequence that emits a single item the first 9 times it is subscribed to, and on further subscriptions it emits nothing and completes immediately:
int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
    if (++counter < 10)
        return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
    else
        return Observable.Empty<int>();
});
Now I want to repeat this sequence until it is completed. So I used the Repeat operator:
source
    .Repeat()
    .Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
    .Wait();
The problem is that this query never completes. The Repeat operator keeps subscribing to the source sequence again and again, forever. Even worse, once the source has stopped producing elements, the query enters a merciless tight loop of death that hijacks one core of the CPU (my quad-core machine reports a continuous CPU utilization of 25%). Here is the output of the above code:
1
2
3
4
5
6
7
8
9
What I want is a variant of the Repeat operator that stops repeating the source when the source has stopped producing elements. Searching through the built-in Rx operators I can see a RepeatWhen operator, but apparently this can be used only for starting the next repetition sooner, not for stopping the repetition altogether:
// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
    this IObservable<TSource> source,
    Func<IObservable<object>, IObservable<TSignal>> handler);
I am not 100% sure though, because the description of the handler parameter is quite obscure, so I might be missing something:
The function that is called for each observer and takes an observable sequence objects. It should return an observable of arbitrary items that should signal that arbitrary item in response to receiving the completion signal from the source observable. If this observable signals a terminal event, the sequence is terminated with that signal instead.
My question is: how can I implement a RepeatUntilEmpty operator that repeats the source sequence until it's empty? Is it possible to implement it based on the aforementioned RepeatWhen operator? If not, should I go low level (Observable.Create) and reimplement the basic Repeat functionality from scratch? Or can I use the Materialize operator to my advantage, combining it somehow with the existing Repeat? I am out of ideas at the moment. I am willing to accept any kind of solution, either high or low level.
public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    // What to do?
}
Replacing the Repeat with the RepeatUntilEmpty in my original code should have the effect of making the query complete immediately after emitting the element 9.
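For illustration, here is a minimal sketch of the behavior I am after. It does not use RepeatWhen or Materialize; it is just one possible approach built from Defer, Do and Concat. The class names RepeatUntilEmptyExtensions and Program are placeholders of my own, and the flag-plus-recursion technique is an assumption about how this could be done, not an established Rx pattern that I can vouch for:

```csharp
using System;
using System.Reactive.Linq;

public static class RepeatUntilEmptyExtensions
{
    // Hypothetical operator (not part of Rx): resubscribes to the source
    // after each completion, as long as the last round emitted something.
    public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
    {
        return Observable.Defer(() =>
        {
            bool emitted = false; // fresh flag for every round
            return source
                .Do(_ => emitted = true)
                // Evaluated only after the current round has completed.
                .Concat(Observable.Defer(() => emitted
                    ? source.RepeatUntilEmpty()
                    : Observable.Empty<T>()));
        });
    }
}

public static class Program
{
    public static void Main()
    {
        int counter = 0;
        IObservable<int> source = Observable.Defer(() =>
        {
            if (++counter < 10)
                return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
            else
                return Observable.Empty<int>();
        });

        source
            .RepeatUntilEmpty()
            .Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
            .Wait();
    }
}
```

With this sketch I would expect the numbers 1 to 9 followed by "Completed". One caveat: each round nests one more Concat, so a source that completes synchronously over a very large number of rounds could build a deep subscription chain. I would still prefer a solution without that drawback.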