16

In an IObservable sequence (in Reactive Extensions for .NET), I'd like to get the value of the previous and current elements so that I can compare them. I found an example online similar to below which accomplishes the task:

sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur })

It works fine except that it evaluates the sequence twice, which I would like to avoid. You can see that it is being evaluated twice with this code:

var debugSequence = sequence.Do(item => Debug.WriteLine("Retrieved an element from sequence"));
debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe();

The output shows twice as many of the debug lines as there are elements in the sequence.

I understand why this happens, but so far I haven't found an alternative that doesn't evaluate the sequence twice. How can I combine the previous and current with only one sequence evaluation?

dcstraw
  • 3,243
  • 3
  • 29
  • 38

5 Answers5

32

There's a better solution to this I think, that uses Observable.Scan and avoids the double subscription:

public static IObservable<Tuple<TSource, TSource>>
    PairWithPrevious<TSource>(this IObservable<TSource> source)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (acc, current) => Tuple.Create(acc.Item2, current));
}

I've written this up on my blog here: http://www.zerobugbuild.com/?p=213

Addendum

A further modification allows you to work with arbitrary types more cleanly by using a result selector:

public static IObservable<TResult> CombineWithPrevious<TSource,TResult>(
    this IObservable<TSource> source,
    Func<TSource, TSource, TResult> resultSelector)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (previous, current) => Tuple.Create(previous.Item2, current))
        .Select(t => resultSelector(t.Item1, t.Item2));
}
Community
  • 1
  • 1
James World
  • 29,019
  • 9
  • 86
  • 120
  • 1
    Link to blog provides a use case for the first not the second solution. Here's one: `myObservable.CombineWithPrevious((older, newer) => (older > 0) && (newer == 0))` will detect when some value just got down to zero. Could be filtered further with `.Where(v => v == true);` – Stéphane Gourichon Jul 04 '15 at 19:03
5

@James World addendum looks great to me, if not for Tuple<>, which I almost always dislike: "Was .Item1 the previous? Or was it the current one? I can't remember. And what's the first argument to the selector, was it the previous item?".

For that part I liked @dcstraw definition of a dedicated ItemWithPrevious<T>. So there you go, putting the two together (hopefully I did not mix up previous with current) with some renaming and facilities:

public static class ObservableExtensions
{
    public static IObservable<SortedPair<TSource>> CombineWithPrevious<TSource>(
        this IObservable<TSource> source, 
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);

        return source.Scan(seed,
            (acc, current) => SortedPair.Create(current, acc.Current));
    }

    public static IObservable<TResult> CombineWithPrevious<TSource, TResult>(
        this IObservable<TSource> source,
        Func<SortedPair<TSource>, TResult> resultSelector,
        TSource initialValue = default(TSource))
    {
        var seed = SortedPair.Create(initialValue, initialValue);

        return source
            .Scan(seed,
                (acc, current) => SortedPair.Create(current, acc.Current))
            .Select(p => resultSelector(p));
    }
}

public class SortedPair<T>
{
    public SortedPair(T current, T previous)
    {
        Current = current;
        Previous = previous;
    }

    public SortedPair(T current) : this(current, default(T)) { }

    public SortedPair() : this(default(T), default(T)) { }

    public T Current;
    public T Previous;
}

public class SortedPair
{
    public static SortedPair<T> Create<T>(T current, T previous)
    {
        return new SortedPair<T>(current, previous);
    }

    public static SortedPair<T> Create<T>(T current)
    {
        return new SortedPair<T>(current);
    }

    public static SortedPair<T> Create<T>()
    {
        return new SortedPair<T>();
    }
}
superjos
  • 12,189
  • 6
  • 89
  • 134
3

Evaluating twice is an indicator of a Cold observable. You can turn it to a Hot one by using .Publish():

var pub = sequence.Publish();
pub.Zip(pub.Skip(1), (...
pub.Connect();
Sergey Aldoukhov
  • 22,316
  • 18
  • 72
  • 99
  • Found an easy to digest article about the difference between Hot and Cold Observables: http://blogs.microsoft.co.il/blogs/bnaya/archive/2010/03/13/rx-for-beginners-part-9-hot-vs-cold-observable.aspx – gn22 May 13 '10 at 18:10
  • Using Publish and Connect I still see the side effects of the Zip(Skip(1), ...) occur twice. I suppose it's not really the original source I needed to evaluate once, but the IObservable query itself. Still, thanks for the info. – dcstraw May 17 '10 at 16:15
  • @dcstraw Strange, I published the debugSequence from your example above (I used Observable.Range for the sequence) and it evaluated once for me. Using .Publish in the described situation is pretty much a no-brainer in the RX land... – Sergey Aldoukhov May 18 '10 at 02:49
0

If you only need to access the previous element during subscription, this is probably the simplest thing that will work. (I'm sure there's a better way, maybe a buffer operator on IObservable? The documentation is pretty sparse at the moment, so I can't really tell you.)

    EventArgs prev = null;

    sequence.Subscribe(curr => 
    {
        if (prev != null)
        {
            // Previous and current element available here
        }

        prev = curr;                              

    });

EventArgs is just a stand-in for the type of your event's argument.

gn22
  • 2,076
  • 12
  • 15
  • Thanks for the suggestion. I do however need the previous element before subscription because I would like to filter the sequence based on logic using the previous and current. – dcstraw May 12 '10 at 17:17
  • Got it; i'll do a bit more research – gn22 May 12 '10 at 17:29
-1

It turns out you can use a variable to hold the previous value and refer to it and reassign it within the chain of IObservable extensions. This even works within a helper method. With the code below I can now call CombineWithPrevious() on my IObservable to get a reference to the previous value, without re-evaluating the sequence.

public class ItemWithPrevious<T>
{
    public T Previous;
    public T Current;
}

public static class MyExtensions
{
    public static IObservable<ItemWithPrevious<T>> CombineWithPrevious<T>(this IObservable<T> source)
    {
        var previous = default(T);

        return source
            .Select(t => new ItemWithPrevious<T> { Previous = previous, Current = t })
            .Do(items => previous = items.Current);
    }
}
dcstraw
  • 3,243
  • 3
  • 29
  • 38
  • This has (an unfortunately common) bug! Avoid using "Do" because it introduces a side-effect; it's executed for every subscriber. This code breaks for every subscription to a given CombineWithPrevious invocation after the first because the second and subsequent subscriptions will cause Do to overwrite the previous value with the current one. So this works: `source.CombineWithPrevious().Subscribe(first); source.CombineWithPrevious().Subscribe(second);` but this doesn't: `var x = source.CombineWithPrevious(); x.Subscribe(first); x.Subscribe(second);` See my answer below for a correct approach. – James World Oct 02 '13 at 09:20