7

I need to implement a version of CombineLatest (I'll call it WithLatest here) that calls the selector for every item on the left and the latest item on the right. It shouldn't push for items on the right changing only.

I think whether this is built Observable.Create or a combination of existing extensions is not particularly important; I'll be making this a "boxed" extension method either way.

Example

var left = new Subject<int>();
var right = new Subject<int>();

left.WithLatest(right, (l,r) => l + " " + r).Dump();

left.OnNext(1);   // <1>
left.OnNext(2);   // <2>
right.OnNext(1);  // <3>
right.OnNext(2);  // <4>
left.OnNext(3);   // <5>

should yield

2 1
3 2

Edit: The logic of my example goes:

  1. Left becomes populated with 1. Right is empty, no values pushed.
  2. Left becomes updated with 2 (it forgets the previous value). Right is still empty, so nothing is pushed.
  3. Right becomes populated with 1, so Left = 2 (the latest value), Right = 1 is pushed. Up to this point, there is no difference between WithLatest and CombineLatest
  4. Right is updated -- nothing is pushed. This is what's different
  5. Left is updated with 3, so Left = 3, Right = 2 (the latest value) is pushed.

It's been suggested that I try:

var lr = right.ObserveOn(Scheduler.TaskPool).Latest();
left.Select(l => l + " " + lr.First()).Dump();

but this blocks on the current thread for my test.

Flexo
  • 87,323
  • 22
  • 191
  • 272
Daniel Moore
  • 1,116
  • 1
  • 9
  • 16
  • You want to do this w/ existing combinators only, or is an implementation using `Create()` an option? – Scott Weinstein Jul 07 '11 at 22:05
  • 1
    Your example doesn't match what I'd expect from your description. The first item returned in your example seems to be triggered by a change on the right observable. Should `WithLatest` trigger on the first right if there are already items from left? – Gideon Engelberth Jul 08 '11 at 04:35
  • 1
    @Gideon Engelberth: Agree. That left me puzzled too. According to the description, the algorithm should yield "3 2" only. – 3dGrabber Aug 12 '13 at 09:01
  • Can you writs your answer up as an answer rather than an edit to the question? – Flexo Jul 28 '15 at 06:16
  • See "CombineVeryLatest" in [my answer here](http://stackoverflow.com/questions/3211134/how-to-throttle-event-stream-using-rx/3224723#3224723) - very similar pattern. – Sergey Aldoukhov Jul 12 '11 at 20:59

6 Answers6

5

You can do this using existing operators.

Func<int, int, string> selector = (l, r) => l + " " + r;

var query = right.Publish(rs => left.Zip(rs.MostRecent(0), selector).SkipUntil(rs));
  • Publish ensures we only ever subscribe to right once and share the subscription among all subscribers to rs.

  • MostRecent turns an IObservable<T> into an IEnumerable<T> that always yields the most recently emitted value from the source observable.

  • Zip between IObservable<T> and IEnumerable<U> emits a value each time the observable emits a value.

  • SkipUntil skips the pairs (l, r) which occur before right ever emits a value.

Timothy Shields
  • 75,459
  • 18
  • 120
  • 173
4

I also had the same need for a CombineLatest which "pushes only for the left".

I made the solution an "overload" of Observable.Sample, because that's what the method does:
It samples a source (right) with a sampler (left), with the additional capability of providing a resultSelector (like in CombineLatest).

public static IObservable<TResult> Sample<TSource, TSample, TResult>(
    this IObservable<TSource> source,
    IObservable<TSample> sampler,
    Func<TSource, TSample, TResult> resultSelector)
{
    var multiSampler = sampler.Publish().RefCount();
    return source.CombineLatest(multiSampler, resultSelector).Sample(multiSampler);
}
3dGrabber
  • 4,710
  • 1
  • 34
  • 42
  • Usage for the example above: `right.Sample(left, (r,l) => l + " " + r).Dump();` It produces "3 2" which is IMO correct – 3dGrabber Aug 12 '13 at 09:31
1

Based on the solution picked by the post author I think there's an even simpler solution utilizing DistinctUntilChanged:

public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) {
        return leftSource
            .Select<TLeft, Tuple<TLeft, int>>(Tuple.Create<TLeft, int>)
            .CombineLatest(rightSource,
                (l, r) => new { Index = l.Item2, Left = l.Item1, Right = r })
            .DistinctUntilChanged(x => x.Index)
            .Select(x => selector(x.Left, x.Right));
    }

or even

public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) {
        return leftSource
            .CombineLatest(rightSource,
                (l, r) => new { Left = l, Right = r })
            .DistinctUntilChanged(x => x.Left)
            .Select(x => selector(x.Left, x.Right));
    }

if you only care about distinct values of leftSource

fachammer
  • 181
  • 4
1

On latest System.Reactive, we can use WithLatestFrom extension method.

left.WithLatestFrom(right, (l, r) => l + " " + r).Dump();

The result would be below correctly.

3 2 
idubnori
  • 997
  • 9
  • 21
0

I made a RX operator for project today that does this.

Here's my solutions:

    public static IObservable<Tuple<TSource, TTarget>> JoinLeftSoft<TSource, TTarget>(
        this IObservable<TSource> source, IObservable<TTarget> right)
    {
        return source
            .Select(x => new Tuple<object, TSource>(new object(), x))
            .CombineLatest(right, (l, r) => new Tuple<object, TSource, TTarget>(l.Item1, l.Item2, r))
            .DistinctUntilChanged(t => t.Item1)
            .Select(t => new Tuple<TSource, TTarget>(t.Item2, t.Item3));
    }
goodfriend0
  • 193
  • 1
  • 8
0

Here's the hacky way using Create - didn't actually build it, mea culpa if it doesn't actually work :)

public static IObservable<TRet> WithLatest<TLeft, TRight, TRet>(
        this IObservable<TLeft> lhs, 
        IObservable<TRight> rhs, 
        Func<TLeft, TRight, TRet> sel)
{
    return Observable.Create<TRet>(subj => {
        bool rhsSet = false;
        bool deaded = false;
        var latestRhs = default(TRight);

        Action onDeaded = null;

        var rhsDisp = rhs.Subscribe(
            x => { latestRhs = x; rhsSet = true; }, 
            ex => { subj.OnError(ex); onDeaded(); });

        var lhsDisp = lhs
            .Where(_ => deaded == false && rhsSet == true)
            .Subscribe(
                x => subj.OnNext(sel(x, latestRhs)),
                ex => { subj.OnError(ex); onDeaded(); },
                () => { subj.OnCompleted(); onDeaded(); });

        onDeaded = () => {
            deaded = true;
            if (lhsDisp != null) {
                lhsDisp.Dispose();
                lhsDisp = null;
            }
            if (rhsDisp != null) {
                rhsDisp.Dispose();
                rhsDisp = null;
            }
        };

        return onDeaded;
    });
}
Ana Betts
  • 73,868
  • 16
  • 141
  • 209