I have an IObservable<int> sequence that emits a single item on each of the first 9 subscriptions, and on further subscriptions emits nothing and completes immediately:

int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
    if (++counter < 10)
        return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
    else
        return Observable.Empty<int>();
});

Now I want to repeat this sequence until it is completed. So I used the Repeat operator:

source
    .Repeat()
    .Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
    .Wait();

The problem is that this query never completes. Repeat keeps resubscribing to the source sequence forever. Even worse, once the source has stopped producing elements, the query enters a tight loop that monopolizes one CPU core (my quad-core machine reports a continuous CPU utilization of 25%). Here is the output of the above code:

1
2
3
4
5
6
7
8
9

What I want is a variant of the Repeat operator that stops repeating the source once the source has stopped producing elements. Searching through the built-in Rx operators I can see a RepeatWhen operator, but apparently it can be used only for starting the next repetition sooner, not for stopping the repetition altogether:

// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
    this IObservable<TSource> source,
    Func<IObservable<object>, IObservable<TSignal>> handler);

I am not 100% sure though, because the description of the handler parameter is quite obscure, so I might be missing something:

The function that is called for each observer and takes an observable sequence objects. It should return an observable of arbitrary items that should signal that arbitrary item in response to receiving the completion signal from the source observable. If this observable signals a terminal event, the sequence is terminated with that signal instead.

My question is: how can I implement a RepeatUntilEmpty operator that repeats the source sequence until it's empty? Is it possible to implement it based on the aforementioned RepeatWhen operator? If not, should I go low level (Observable.Create) and reimplement the basic Repeat functionality from scratch? Or can I use the Materialize operator to my advantage, combining it somehow with the existing Repeat? I am out of ideas at the moment. I am willing to accept any kind of solution, either high or low level.

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    // What to do?
}

Replacing Repeat with RepeatUntilEmpty in my original code should make the query complete immediately after emitting the 9th element.

Theodor Zoulias
  • I assume you have a real-world situation and the code you've presented is just for the purposes of asking the question. Rx has the `Generate` operator that does what your code is trying to do: `IObservable<int> source = Observable.Generate(1, x => x < 10, x => x + 1, x => x, x => TimeSpan.FromMilliseconds(100.0));`. – Enigmativity Mar 03 '22 at 04:18
  • @Enigmativity honestly what inspired this question is this code: `.Publish(x => x.FirstAsync().SelectMany(c => c).Repeat())` that I saw in [this](https://stackoverflow.com/questions/71258726/what-are-publish-and-selectmany-doing-in-this-query) question. Repeatedly taking the first element of a published sequence will eventually result in the source draining out. From that point onwards the published sequence will instantly emit `OnCompleted` notifications on future subscriptions. – Theodor Zoulias Mar 03 '22 at 06:40
  • I found that line with the `Publish` operator weird. It doesn't seem like `Publish` is actually doing anything. – Enigmativity Mar 03 '22 at 09:42
  • @Enigmativity when I first saw this code I was puzzled too. Publishing a sequence and consuming it piece by piece is a quite unusual and idiomatic way to use the Rx. I am not sure that it's plain wrong though. It might have legitimate practical applications. – Theodor Zoulias Mar 03 '22 at 10:05
  • It only makes sense when publishing to use the parameter variable more than once. Using it once just seems weird. – Enigmativity Mar 03 '22 at 10:18
  • @Enigmativity as far as I understand the published proxy of the source sequence is used multiple times because of the `Repeat`. This operator subscribes again and again to the same sequence ad infinitum. – Theodor Zoulias Mar 03 '22 at 10:22
  • Right, yes, I see. It's really pushing the time warping of Rx. – Enigmativity Mar 03 '22 at 10:28

1 Answer

You can indeed use Materialize()/Dematerialize() to build your own sequence of notifications from the notifications received from the Repeat() operator. The notification sequence will look like this, where a number stands for an OnNext notification and C for an OnCompleted notification:

1C 2C 3C 4C 5C 6C 7C 8C 9C C C C ...

So we look for two consecutive OnCompleted notifications. Until we find them, we pass each OnNext notification through and drop the single OnCompleted that follows it; once we find them, we emit the OnCompleted notification. The code can look like this:

public static void Main(string[] args)
{
    int counter = 0;
    IObservable<int> source = Observable.Defer(() =>
    {
        Console.WriteLine($"counter is now: {counter}");
        if (counter > 20) {
            System.Environment.Exit(1);
        }
        if (++counter < 10)
            return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
        else
            return Observable.Empty<int>();
    });

    source
        .RepeatUntilEmpty()
        .Subscribe(x =>
        {
            System.Threading.Thread.Sleep(10);
            Console.WriteLine($"SUBSCRIBE: {x}");
        }, () => Console.WriteLine("SUBSCRIBE:Completed"));

    System.Threading.Thread.Sleep(10000);
    Console.WriteLine("Main thread terminated");
}

With the RepeatUntilEmpty() method as follows:

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    return source
        .Materialize()
        .Repeat()
        .StartWith((Notification<T>)null)
        .Buffer(2, 1)
        .Select(it => {
            Console.WriteLine($"Buffer content: {String.Join(",", it)}");
            if (it[1].Kind != System.Reactive.NotificationKind.OnCompleted) {
                return it[1];
            }
            // it[1] is OnCompleted, check the previous one
            if (it[0] != null && it[0].Kind != System.Reactive.NotificationKind.OnCompleted) {
                // not a consecutive OnCompleted, so we ignore this OnCompleted with a NULL marker
                return null;
            }

            // okay, we have two consecutive OnCompleted, stop this observable.
            return it[1];
        })
        .Where(it => it != null) // remove the NULL marker
        .Dematerialize();
}

This will generate the following output:

counter is now: 0
Buffer content: ,OnNext(1)
SUBSCRIBE: 1
Buffer content: OnNext(1),OnCompleted()
counter is now: 1
Buffer content: OnCompleted(),OnNext(2)
SUBSCRIBE: 2
Buffer content: OnNext(2),OnCompleted()
counter is now: 2
Buffer content: OnCompleted(),OnNext(3)
SUBSCRIBE: 3
Buffer content: OnNext(3),OnCompleted()
counter is now: 3
Buffer content: OnCompleted(),OnNext(4)
SUBSCRIBE: 4
Buffer content: OnNext(4),OnCompleted()
counter is now: 4
Buffer content: OnCompleted(),OnNext(5)
SUBSCRIBE: 5
Buffer content: OnNext(5),OnCompleted()
counter is now: 5
Buffer content: OnCompleted(),OnNext(6)
SUBSCRIBE: 6
Buffer content: OnNext(6),OnCompleted()
counter is now: 6
Buffer content: OnCompleted(),OnNext(7)
SUBSCRIBE: 7
Buffer content: OnNext(7),OnCompleted()
counter is now: 7
Buffer content: OnCompleted(),OnNext(8)
SUBSCRIBE: 8
Buffer content: OnNext(8),OnCompleted()
counter is now: 8
Buffer content: OnCompleted(),OnNext(9)
SUBSCRIBE: 9
Buffer content: OnNext(9),OnCompleted()
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE:Completed
Main thread terminated

I have not tested how this code handles OnError() notifications, so you might want to check that. Also, I noticed that the source.Materialize().Repeat() part can keep subscribing to the original source even after the resulting observable has completed. Especially with the Do().Wait() statement, I sometimes receive additional output like:

counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE: Completed
counter is now: 10
counter is now: 11
counter is now: 12
counter is now: 13
counter is now: 14

This might be an issue for you as well, since the Repeat() part is still trying to subscribe to and concatenate empty observables.

Progman
  • Thanks Progman for the answer. Your solution is very close to what I want, but it has the problem that it defers the propagation of an item until the next item is produced. This is not apparent in my contrived example, because the single `OnNext()` is followed immediately by an `OnCompleted()`. But in my real code the `source` produces multiple values, and two consecutive values might be far apart in time, so introducing this kind of latency is not really an option. – Theodor Zoulias Feb 27 '22 at 16:46
  • Regarding the `.Materialize().Repeat().Dematerialize()` not being stopped when the `OnCompleted` is propagated, you've probably experienced it when you tested the code without the `.Delay(TimeSpan.FromMilliseconds(100))`. In that case you've recreated this issue: [The Observable.Repeat is unstoppable, is it a bug or a feature?](https://stackoverflow.com/questions/61012408/the-observable-repeat-is-unstoppable-is-it-a-bug-or-a-feature), which can be solved by passing `Scheduler.CurrentThread` as the second argument of the `Observable.Return`. – Theodor Zoulias Feb 27 '22 at 16:47
  • @TheodorZoulias I changed it to read the second value from the buffer and start with a `NULL` value for the materialized stream. That way you don't need to wait for the next value from the source observable, but only react to the second value in the buffer. Also I moved it inside the `RepeatUntilEmpty()` method. – Progman Feb 27 '22 at 22:15
  • Now it's almost perfect, thanks! One issue that I've noticed is that in the corner case where the very first subscription returns an empty sequence, the `RepeatUntilEmpty` enters a never-ending tight loop. I solved it by adding `Scheduler.CurrentThread` as the first argument in the `.StartWith((Notification<T>)null)` call. I don't know if this can have any unwanted side-effects, but in my case I am not seeing any. – Theodor Zoulias Feb 27 '22 at 23:45
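
For reference, here is a minimal sketch of how the operator from the answer might look with the `Scheduler.CurrentThread` tweak from the last comment applied. It assumes the System.Reactive package, drops the debug output, and is otherwise the same pipeline. The class names `RepeatUntilEmptyExtensions` and `Demo`, and the helper `CreateSource`, are made up for this illustration. Whether the scheduler change has side effects in other scenarios is not established here, and error handling is still untested.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class RepeatUntilEmptyExtensions
{
    public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
    {
        return source
            .Materialize()
            .Repeat()
            // Assumption taken from the last comment: emit the null seed on
            // Scheduler.CurrentThread instead of the default scheduler.
            .StartWith(Scheduler.CurrentThread, (Notification<T>)null)
            .Buffer(2, 1) // sliding pairs: [previous, current]
            .Select(pair =>
            {
                Notification<T> previous = pair[0];
                Notification<T> current = pair[1];

                // OnNext and OnError notifications pass through unchanged.
                if (current.Kind != NotificationKind.OnCompleted)
                    return current;

                // An OnCompleted that follows a value only ends one repetition,
                // so it is replaced with a null marker and filtered out below.
                bool previousWasValue = previous != null
                    && previous.Kind != NotificationKind.OnCompleted;
                return previousWasValue ? null : current;
            })
            .Where(notification => notification != null)
            .Dematerialize();
    }
}

public static class Demo
{
    // Hypothetical helper: the test source from the question.
    public static IObservable<int> CreateSource()
    {
        int counter = 0;
        return Observable.Defer(() =>
        {
            if (++counter < 10)
                return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
            else
                return Observable.Empty<int>();
        });
    }

    public static void Main()
    {
        CreateSource()
            .RepeatUntilEmpty()
            .Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
            .Wait();
    }
}
```

Note that `Wait()` throws an `InvalidOperationException` when the sequence completes without any element, so to try the corner case where the very first subscription is empty, `Subscribe` is probably the safer way to observe the result.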