0

I am using the following code from here - looks like an issue to me in clearing the "Replay cache"

https://gist.github.com/leeoades/4115023

If I change the following call and code like this I see that there is bug in Replay i.e. it is never cleared. Can someone please help to rectify this ?

private Cache<string> GetCalculator()
    {
        var calculation = Observable.Create<string>(o =>
        {
            _calculationStartedCount++;

            return Observable.Timer(_calculationDuration, _testScheduler)
                             .Select(_ => "Hello World!" + _calculationStartedCount) // suffixed the string with count to test the behaviour of Replay clearing
                             .Subscribe(o);
        });

        return new Cache<string>(calculation);
    }

[Test]
    public void After_Calling_GetResult_Calling_ClearResult_and_GetResult_should_perform_calculation_again()
    {
        // ARRANGE
        var calculator = GetCalculator();

        calculator.GetValue().Subscribe();
        _testScheduler.Start();

        // ACT
        calculator.Clear();

        string result = null;
        calculator.GetValue().Subscribe(r => result = r);
        _testScheduler.Start();

        // ASSERT
        Assert.That(_calculationStartedCount, Is.EqualTo(2));
        Assert.That(result, Is.EqualTo("Hello World!2")); // always returns Hello World!1 and not Hello World!2
        Assert.IsNotNull(result);
    }
Angshuman Agarwal
  • 4,796
  • 7
  • 41
  • 89

1 Answers1

2

The problem is a subtle one. The source sequence Timer completes after it emits an event, which in turn calls OnCompleted on the internal ReplaySubject created by Replay. When a Subject completes it no longer accepts any new values even if a new Observable shows up.

When you resubscribe to the underlying Observable it executes again, but isn't able to restart the Subject, so your new Observer can only receive the most recent value before the ReplaySubject completed.

The simplest solution would probably just be to never let the source stream complete (untested):

    public Cache(IObservable<T> source)
    {
        //Not sure why you are wrapping this in an Observable.create
        _source = source.Concat(Observable.Never())
                            .Replay(1, Scheduler.Immediate);
    }
paulpdaniels
  • 18,395
  • 2
  • 51
  • 55