It seems like you're looking for a ZipLatest
implementation. See the question "Does my 'zipLatest' operator already exist?".
EDIT:
If you want an implementation without downloading a library, I think this would work:
/// <summary>
/// Pairs the most recent value of <paramref name="left"/> with the most recent value of
/// <paramref name="right"/>, emitting a pair only once BOTH sources have produced a new
/// value since the previous pair was emitted. Intermediate values on either side are
/// dropped in favour of the latest one.
/// </summary>
/// <typeparam name="T1">Element type of the left source.</typeparam>
/// <typeparam name="T2">Element type of the right source.</typeparam>
/// <param name="left">The first source sequence.</param>
/// <param name="right">The second source sequence.</param>
/// <returns>A sequence of (latest left, latest right) pairs.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="left"/> or <paramref name="right"/> is <c>null</c>.
/// </exception>
/// <remarks>
/// NOTE(review): the two internal flag subjects are never completed, so completion of the
/// sources is presumably not propagated to the result - confirm before relying on OnCompleted.
/// </remarks>
public static IObservable<Tuple<T1, T2>> ZipLatest<T1, T2>(this IObservable<T1> left, IObservable<T2> right)
{
    if (left == null)
    {
        throw new ArgumentNullException(nameof(left));
    }

    if (right == null)
    {
        throw new ArgumentNullException(nameof(right));
    }

    // Defer so that every subscription gets its own pair of flag subjects.
    return Observable.Defer(() =>
    {
        // One "has a fresh value" flag per side; both start out false.
        var leftSubject = new BehaviorSubject<bool>(false);
        var rightSubject = new BehaviorSubject<bool>(false);

        // Publish each source so its value stream and its flag stream share one subscription.
        return left.Publish(_left =>
            right.Publish(_right =>
            {
                // NOTE(review): appears to rely on the value sources being listed (and thus
                // subscribed) ahead of the flag sources, so the new value is already stored
                // when its flag flips to true - confirm.
                return Observable.CombineLatest(
                        _left,
                        _right,
                        _left.Select(_ => true).Merge(leftSubject),
                        _right.Select(_ => true).Merge(rightSubject),
                        (l, r, l_bool, r_bool) => Tuple.Create(l_bool, r_bool, l, r)
                    )
                    // Only let a combination through when both sides are fresh.
                    .Where(t => t.Item1 && t.Item2)
                    .Select(t => Tuple.Create(t.Item3, t.Item4))
                    // Reset both flags so the next pair again needs a new value from each side.
                    .Do(_ =>
                    {
                        leftSubject.OnNext(false);
                        rightSubject.OnNext(false);
                    });
            })
        );
    });
}
Running it against @Enigmativity's test code (modified)...
// Demo driver: pushes values by hand into two subjects and prints every pair
// that ZipLatest lets through.
void Main()
{
    var numbers = new Subject<int>();
    var letters = new Subject<char>();

    numbers
        .ZipLatest(letters)
        .Select(t => new { l = t.Item1, r = t.Item2 })
        .Subscribe(o => Console.WriteLine($"{o.l}{o.r}"));

    numbers.OnNext(1);
    letters.OnNext('A');   // prints 1A

    numbers.OnNext(3);
    numbers.OnNext(4);
    letters.OnNext('B');   // prints 4B (3 was superseded)

    numbers.OnNext(5);
    letters.OnNext('C');   // prints 5C

    letters.OnNext('D');
    letters.OnNext('E');
    numbers.OnNext(6);     // prints 6E (D was superseded)

    letters.OnNext('F');
    letters.OnNext('G');
    numbers.OnNext(7);     // prints 7G (F was superseded)

    letters.OnNext('H');
    numbers.OnNext(8);     // prints 8H

    letters.OnNext('I');   // no output: the number side has nothing new since 8H
}
...you get the correct results:
1A
4B
5C
6E
7G
8H