4

I have a stream with live data, and a stream which basically delimits parts of the live data that belong together. Now when someone subscribes to the live data stream, I would like to replay them the live data. However I don't want to remember all the live data, only the part since the last time the other stream emitted a value.

There is an issue which would solve my problem, since there is a replay operator which does exactly what I want (or at least I think).

What is currently the way to do this easily? Is there a better way than something like the following?

private class ReplayWithLimitObservable<TItem, TDelimiter> : IConnectableObservable<TItem>
{
    private readonly List<TItem> cached = new List<TItem>();
    private readonly IObservable<TDelimiter> delimitersObservable;
    private readonly IObservable<TItem> itemsObservable;
    public ReplayWithLimitObservable(IObservable<TItem> itemsObservable, IObservable<TDelimiter> delimitersObservable)
    {
        this.itemsObservable = itemsObservable;
        this.delimitersObservable = delimitersObservable;
    }

    public IDisposable Subscribe(IObserver<TItem> observer)
    {
        lock (cached)
        {
            cached.ForEach(observer.OnNext);
        }

        return itemsObservable.Subscribe(observer);
    }

    public IDisposable Connect()
    {
        var delimiters = delimitersObservable.Subscribe(
            p =>
                {
                    lock (cached)
                    {
                        cached.Clear();
                    }
                });
        var items = itemsObservable.Subscribe(
            p =>
                {
                    lock (cached)
                    {
                        cached.Add(p);
                    }
                });
        return Disposable.Create(
            () =>
                {
                    items.Dispose();
                    delimiters.Dispose();
                    lock (cached)
                    {
                        cached.Clear();
                    }
            });
}

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters)
{
    return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters);
}
Martin Kolinek
  • 2,010
  • 14
  • 16
  • Just a thought... wouldn't `ConcurrentBag` be a better option than using `lock` on `cached`? I mean, that's what it's designed for... – toadflakz Dec 05 '14 at 09:25
  • 2
    @toadflakz - AFAIK, ConcurrentBag does not guarantee preserving order of addition (if the items observable behaves correctly, I get the items in correct order using a List). ConcurrentQueue could solve that, but clearing a list is easier than clearing a ConcurrentQueue. – Martin Kolinek Dec 05 '14 at 09:29
  • Thanks for the explanation - I'm interested in realtime data development so code design decision insights from people with experience is appreciated. – toadflakz Dec 05 '14 at 09:34
  • I agree that your question represents a core limitation in the Framework and my work item that you mentioned which proposes to add "reactive/reactive" overloads to `Replay` could solve your problem. – Dave Sexton Dec 05 '14 at 17:00
  • Until then, your solution is fine except for a couple of problems: 1) Never implement `IObservable` yourself. In this case, you need an `IConnectableObservable` and it's quicker than implementing a custom subject; however, you should still derive from `ObservableBase` to ensure auto-detach behavior, trampoline exists, etc. 2) Your `Subscribe` implementation has a race condition that could allow you to miss notifications. Make sure that you call `itemsObservable.Subscribe` within the lock. – Dave Sexton Dec 05 '14 at 17:02
  • Brandon's answer made me realize that my first point doesn't actually apply here because technically you're not implementing `IObservable` yourself, you're still relying on an observable passed in as a parameter, thus you don't have to derive from `ObservableBase`. My second point about the race condition bug still applies though. – Dave Sexton Dec 06 '14 at 00:43

1 Answers1

5

Does this do what you want? It has the advantage of leaving all of the locking and race conditions to the Rx pros :)

private class ReplayWithLimitObservable<T, TDelimiter> : IConnectableObservable<T>
{
  private IConnectableObservable<IObservable<T>> _source;

  public ReplayWithLimitObservable(IObservable<T> source, IObservable<TDelimiter> delimiter)
  {
    _source = source
      .Window(delimiter) // new replay window on delimiter
      .Select<IObservable<T>,IObservable<T>>(window =>
      {
        var replayWindow = window.Replay();

        // immediately connect and start memorizing values
        replayWindow.Connect();

        return replayWindow;
      })
      .Replay(1); // remember the latest window
  }

  IDisposable Connect()
  {
    return _source.Connect();
  }

  IDisposable Subscribe(IObserver<T> observer)
  {
    return _source
      .Concat()
      .Subscribe(observer);
  }
}

public static IConnectableObservable<TItem> ReplayWithLimit<TItem, TDelimiter>(IObservable<TItem> items, IObservable<TDelimiter> delimiters)
{
    return new ReplayWithLimitObservable<TItem, TDelimiter>(items, delimiters);
}
Brandon
  • 38,310
  • 8
  • 82
  • 87
  • This is better, but I still don't like the fact that `IConnectableObservable` has to be implemented. That should never be true, don't you think? You're so close to avoiding it here, yet still so far... ;-) – Dave Sexton Dec 06 '14 at 00:44
  • At the very least, [this](https://github.com/Reactive-Extensions/Rx.NET/issues/54), though a "reactive/reactive" version of `ReplaySubject` would be best. It would solve this problem in a line of code: `return xs.Multicast(new ReplaySubject(ys));` – Dave Sexton Dec 06 '14 at 00:47
  • @DaveSexton - Totally agree, my reaction to this question was that a factory for `IConnectableObservable` would indeed be a welcome addition to Rx. – James World Dec 06 '14 at 09:21
  • Nice answer Brandon, particularly the mechanic with the `Replay(1)` and deferred `Concat` - very nice. – James World Dec 06 '14 at 09:31
  • Yeah I tried to do it with window and replay, but the `Replay(1)` didn't occur to me. Thanks, this is what I was looking for. – Martin Kolinek Dec 06 '14 at 11:17