I have the following case, with the result I want to achieve:

L --1---3---4---5---6---7---8-
R ---A--------B--CDE-FG--H---I

O ---A--------B--C--E---G---H-
  ---1--------4--5--6---7---8-

So basically I have two sources whose relative speed changes over time. The result should sometimes ignore L elements and sometimes R elements. How can I build that kind of Rx query?

I tried Join, Sample, WithLatestFrom, Distinct, etc., but I can only ever ignore some elements from the L source, not from R. I am trying to achieve this in C#, but an answer in any language will be helpful.

Joe

3 Answers

It seems like you're looking for a ZipLatest implementation. See the related question "Does my 'zipLatest' operator already exist?"

EDIT:

If you want an implementation, without downloading a library, I think this would work:

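// Extension method: declare it inside a static class.
public static IObservable<Tuple<T1, T2>> ZipLatest<T1, T2>(this IObservable<T1> left, IObservable<T2> right)
{
    // Defer so that each subscription gets its own pair of flag subjects.
    return Observable.Defer(() =>
    {
        // Pushing false into a subject marks that side's latest value as already consumed.
        var leftSubject = new BehaviorSubject<bool>(false);
        var rightSubject = new BehaviorSubject<bool>(false);

        // Publish shares a single subscription to each source.
        return left.Publish(_left =>
            right.Publish(_right =>
            {
                return Observable.CombineLatest(
                    _left,
                    _right,
                    _left.Select(_ => true).Merge(leftSubject),   // true while left holds an unpaired value
                    _right.Select(_ => true).Merge(rightSubject), // true while right holds an unpaired value
                    (l, r, l_bool, r_bool) => Tuple.Create(l_bool, r_bool, l, r)
                )
                    .Where(t => t.Item1 && t.Item2) // emit only when both sides hold an unpaired value
                    .Select(t => Tuple.Create(t.Item3, t.Item4))
                    .Do(_ =>
                    {
                        // Reset both flags so that neither value is paired again.
                        leftSubject.OnNext(false);
                        rightSubject.OnNext(false);
                    });
            })
        );
    });
}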

Running it against @Enigmativity's test code (modified)...

void Main()
{
    var L = new Subject<int>();
    var R = new Subject<char>();

    var O = L.ZipLatest(R)
        .Select(t => new { l = t.Item1, r = t.Item2});

    O.Subscribe(o => Console.WriteLine($"{o.l}{o.r}"));

    L.OnNext(1);
    R.OnNext('A');
    L.OnNext(3);
    L.OnNext(4);
    R.OnNext('B');
    L.OnNext(5);
    R.OnNext('C');
    R.OnNext('D');
    R.OnNext('E');
    L.OnNext(6);
    R.OnNext('F');
    R.OnNext('G');
    L.OnNext(7);
    R.OnNext('H');
    L.OnNext(8);
    R.OnNext('I');
}

...you get the correct results:

1A
4B
5C
6E
7G
8H
Shlomo

This is my attempt for now; I will try this in a compiler later!

Keep track of the last pair passed on from CombineLatest, and only let a new pair through when both of its values differ from that pair. Define this operation as an extension method.

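// Requires System.Collections.Generic for EqualityComparer<T>.
public static IObservable<C> ZipLatest<A, B, C>(
    this IObservable<A> sourceA,
    IObservable<B> sourceB,
    Func<A, B, C> op)
{
    IObservable<Tuple<A, B>> combined = sourceA.CombineLatest(
            sourceB, (a, b) => Tuple.Create(a, b));
    // Last pair that was let through. Note that this state is shared by all subscribers.
    Tuple<A, B> last = null;
    return combined.Where(curr =>
    {
        // Unconstrained generic types cannot be compared with !=, so use EqualityComparer.
        // Values are compared by equality, so a repeated value in a source counts as unchanged.
        if (last == null
            || (!EqualityComparer<A>.Default.Equals(last.Item1, curr.Item1)
                && !EqualityComparer<B>.Default.Equals(last.Item2, curr.Item2)))
        {
            last = curr;
            return true;
        }
        return false;
    }).Select(curr => op(curr.Item1, curr.Item2));
}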
flakes

There doesn't seem to be a rule in what you're asking for. You need to be able to consistently describe what should happen with values from L and R.

This is the closest that I could get:

var L = new Subject<int>();
var R = new Subject<char>();

var O = L.Select(l => R.Select(r => new { r, l })).Switch();

O.Subscribe(o => Console.WriteLine($"{o.l}{o.r}"));

L.OnNext(1);
R.OnNext('A');
L.OnNext(3);
L.OnNext(4);
R.OnNext('B');
L.OnNext(5);
R.OnNext('C');
R.OnNext('D');
R.OnNext('E');
L.OnNext(6);
R.OnNext('F');
R.OnNext('G');
L.OnNext(7);
R.OnNext('H');
L.OnNext(8);
R.OnNext('I');

That gave me:

1A
4B
5C
5D
5E
6F
6G
7H
8I

Also, from your marble diagram it's not clear if 'F' or 6 comes first. You need to clarify the value order and the rules to combine the streams.

Enigmativity
  • Thanks for the answer; I corrected the diagram. I achieved results similar to yours using Join or CombineLatest. The key problem here is that both sources have to follow the same rule: "I can notify only once per interval between elements of the other source". In addition, no element from either source may appear in the result twice. – Joe Oct 21 '16 at 13:32
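
The rule stated in this comment can be illustrated without Rx as a small state machine over an interleaved event list. This is only a sketch under assumptions: the class `ZipLatestSketch`, the method `Pair`, and the `(FromLeft, Value)` event shape are hypothetical names made up for illustration, and the event order is taken from the test code in the answers. Each side remembers its latest value and whether that value is still unpaired; a pair is produced only when both sides hold an unpaired value, and both flags are then cleared.

```csharp
using System;
using System.Collections.Generic;

public static class ZipLatestSketch
{
    // Hypothetical helper: pairs the latest unpaired L value with the latest unpaired R value.
    public static List<string> Pair(IEnumerable<(bool FromLeft, string Value)> events)
    {
        var output = new List<string>();
        string left = "", right = "";
        bool leftFresh = false, rightFresh = false;

        foreach (var (fromLeft, value) in events)
        {
            // A newer value on the same side overwrites the older, still unpaired one.
            if (fromLeft) { left = value; leftFresh = true; }
            else { right = value; rightFresh = true; }

            if (leftFresh && rightFresh)
            {
                output.Add(left + right);
                // Clear both flags so that no element appears in the result twice.
                leftFresh = false;
                rightFresh = false;
            }
        }
        return output;
    }

    public static void Main()
    {
        // Same interleaving as the OnNext calls in the answers' test code.
        var events = new (bool, string)[]
        {
            (true, "1"), (false, "A"), (true, "3"), (true, "4"), (false, "B"),
            (true, "5"), (false, "C"), (false, "D"), (false, "E"), (true, "6"),
            (false, "F"), (false, "G"), (true, "7"), (false, "H"), (true, "8"),
            (false, "I")
        };
        Console.WriteLine(string.Join(" ", Pair(events))); // prints "1A 4B 5C 6E 7G 8H"
    }
}
```

Unlike comparing values for equality, the flags track emissions, so this version of the rule would also hold if a source repeated the same value.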