I noticed something strange about the behavior of the Repeat operator when the source observable's notifications are synchronous. The resulting observable cannot be stopped with a subsequent TakeWhile operator, and apparently continues running forever. For demonstration I created a source observable that produces a single value, which is incremented on every subscription. The first subscriber gets the value 1, the second gets the value 2, etc.:
int incrementalValue = 0;
var incremental = Observable.Create<int>(async o =>
{
    await Task.CompletedTask;
    //await Task.Yield();
    Thread.Sleep(100);
    var value = Interlocked.Increment(ref incrementalValue);
    o.OnNext(value);
    o.OnCompleted();
});
Then I attached the operators Repeat, TakeWhile and LastAsync to this observable, so that the program will wait until the composed observable produces its last value:
incremental.Repeat()
    .Do(new CustomObserver("Checkpoint A"))
    .TakeWhile(item => item <= 5)
    .Do(new CustomObserver("Checkpoint B"))
    .LastAsync()
    .Do(new CustomObserver("Checkpoint C"))
    .Wait();
Console.WriteLine("Done");
class CustomObserver : IObserver<int>
{
    private readonly string _name;
    public CustomObserver(string name) => _name = name;
    public void OnNext(int value) => Console.WriteLine($"{_name}: {value}");
    public void OnError(Exception ex) => Console.WriteLine($"{_name}: {ex.Message}");
    public void OnCompleted() => Console.WriteLine($"{_name}: Completed");
}
Here is the output of this program:
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Checkpoint A: 7
Checkpoint A: 8
Checkpoint A: 9
Checkpoint A: 10
Checkpoint A: 11
Checkpoint A: 12
Checkpoint A: 13
Checkpoint A: 14
Checkpoint A: 15
Checkpoint A: 16
Checkpoint A: 17
...
It never ends! Although the LastAsync operator has produced its value and has completed, the Repeat operator keeps spinning!
This happens only if the source observable notifies its subscribers synchronously. For example, after uncommenting the line //await Task.Yield(); the program behaves as expected:
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Done
The Repeat operator stops spinning, although it does not report completion (my guess is that it has been unsubscribed).
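To be precise about what I mean by synchronous versus asynchronous notifications, here is a minimal sketch that does not use Rx at all. It only shows that code after await Task.CompletedTask keeps running on the caller's thread before control returns, while code after await Task.Yield() runs only after the caller has regained control. The names SyncVsAsyncDemo, Body and CompletesBeforeReturning are hypothetical, made up for this illustration, and I am assuming a console application with no SynchronizationContext. It says nothing about how Repeat is implemented internally.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only (not part of the repro above).
static class SyncVsAsyncDemo
{
    // Same shape as the subscribe delegate in the question:
    // either awaits an already completed task, or yields.
    private static async Task Body(bool yield, ManualResetEventSlim callerResumed)
    {
        if (yield)
            await Task.Yield();          // always suspends; the rest runs later
        else
            await Task.CompletedTask;    // already completed; no suspension

        // In the yielding case, hold the continuation until the caller has
        // inspected the task, so that the check below is deterministic.
        if (yield)
            callerResumed.Wait();
    }

    // Returns true if the whole body ran before control came back to the caller.
    public static bool CompletesBeforeReturning(bool yield)
    {
        using var callerResumed = new ManualResetEventSlim(false);
        Task task = Body(yield, callerResumed);
        bool completedSynchronously = task.IsCompleted;
        callerResumed.Set();   // let the continuation (if any) finish
        task.Wait();           // assumes no SynchronizationContext (console app)
        return completedSynchronously;
    }
}
```

CompletesBeforeReturning(false) returns true and CompletesBeforeReturning(true) returns false. Applied to my source observable, this means that with await Task.CompletedTask the OnNext and OnCompleted calls are made before the subscribe delegate returns to its caller, and with await Task.Yield() they are made afterwards.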
Is there any way to achieve consistent behavior from the Repeat operator, irrespective of the type of notifications it receives (sync or async)?
.NET Core 3.0, C# 8, System.Reactive 4.3.2, Console Application