0

( Using the Zip operator in Reactive Extensions (Rx) )

Combining Stream Pairs into One without Timeout

        var xyZipped = xStream.Zip(yStream, (x, y) =>
        {
            Debug.WriteLine("Latest Pair Has Arrived");
            return new List<SomeType> { x, y };
        });
  • But how could you introduce a maximum allowed time interval between the two values in each stream, so that if the inter-value interval is exceeded then no value would be output from xyZipped

  • And if too long passes between the two values then pairing should be reset as well i.e. for another pairing to occur after a timeout a new value should be produced in each of the streams (not just one).

  • Or would it be better to use a different operator / implementation to achieve this kind of stream logic?

Community
  • 1
  • 1
Cel
  • 6,467
  • 8
  • 75
  • 110
  • possible duplicate of [Does reactive extensions support rolling buffers?](http://stackoverflow.com/questions/7597773/does-reactive-extensions-support-rolling-buffers) – Chris Shain Jan 25 '12 at 16:09

1 Answers1

1

You can just use Rx combinators. Since, your primary aim is Zip, let's start with Zip, and then apply your expiry conditions.

public static IObservable<TOut> ZipWithExpiry<TLeft, TRight, TOut>(
                    IObservable<TLeft> left, 
                    IObservable<TRight> right, 
                    Func<TLeft, TRight, TOut> selector, 
                    TimeSpan validity)
        {
            return Observable.Zip(left.Timestamp(), right.Timestamp(), (l, r) => Tuple.Create(l, r))
                             .Where(tuple => Math.Abs((tuple.Item1.Timestamp - tuple.Item2.Timestamp).TotalSeconds) < validity.TotalSeconds)
                             .Select(tuple => selector(tuple.Item1.Value, tuple.Item2.Value));
        }

If you want to check the adjacent values in a stream, you can rewrite it using TimeInterval operator instead of Timestamp.

Asti
  • 12,447
  • 29
  • 38
  • 1
    Thanks for providing a solution! Turned out i needed `CombineLatest` instead of `Zip` for my purposes, so below is a slightly modified extension method that tested good for me: `public static IObservable CombineWithExpiry(this IObservable left, IObservable right, Func selector, TimeSpan validity)` – Cel Jan 26 '12 at 12:13
  • `{ return left.Timestamp().CombineLatest(right.Timestamp(), Tuple.Create) .Where(tuple => (tuple.Item1.Timestamp - tuple.Item2.Timestamp).Duration().TotalMilliseconds < validity.TotalMilliseconds) .Select(tuple => selector(tuple.Item1.Value, tuple.Item2.Value)); }` – Cel Jan 26 '12 at 12:14
  • @Cel Great! I didn't know what exactly you meant by interval between values, so I took it for the interval between two successive values. Still, it's good you found your solution. – Asti Jan 26 '12 at 18:39