Here we have an observable sequence in .NET using Rx.

var aSource = new Subject<int>();

var bSource = new Subject<int>();

var paired = Observable
    .Merge(aSource, bSource)
    .GroupBy(i => i)
    .SelectMany(g => g.Buffer(2).Take(1));

paired.Subscribe(g => Console.WriteLine("{0}:{1}", g.ElementAt(0), g.ElementAt(1)));

aSource.OnNext(4);
bSource.OnNext(1);
aSource.OnNext(2);
bSource.OnNext(5);
aSource.OnNext(3);
bSource.OnNext(3);
aSource.OnNext(5);
bSource.OnNext(2);
aSource.OnNext(1);
bSource.OnNext(4);

Output: 3:3 5:5 2:2 1:1 4:4

We get an event every time a pair of numbers arrives with the same id.

Perfect! Just what I want.

Groups of two, paired by value.

Next question:

How do I get a SelectMany/Buffer for sequences of values?

So 1,2,3,4,5 arrives at both aSource and bSource via OnNext(), and Console.WriteLine() fires for 1-5. Then, when 2,3,4,5,6 arrives at both, Console.WriteLine() fires again. Any clues, anyone?

Immediately, the Rx forum suggests looking at .Window()

http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

On the surface this looks perfect. In my case, I need a window of value 4.

Where in the query sequence does it belong to get this effect?

var paired = Observable.Merge(aSource, bSource).GroupBy(i => i).SelectMany(g => g.Buffer(2).Take(1));

Desired output: 1,2,3,4,5 : 1,2,3,4,5 2,3,4,5,6 : 2,3,4,5,6

Regards,

Daniel

WebSight

2 Answers

Assuming events arrive randomly at the sources, use my answer to "Reordering events with Reactive Extensions" to get the events in order.

Then use Observable.Buffer to create a sliding buffer:

// Obtain this using the OrderedCollect/Sort approach from the referenced question.
IObservable<int> orderedSource;

// Then subscribe to this: each buffer holds 5 items and a new buffer starts on every item.
orderedSource.Buffer(5, 1);
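
To illustrate the sliding buffer suggested above, here is a minimal sketch. It assumes the System.Reactive package and uses a plain Subject<int> as a stand-in for the ordered stream; the class name SlidingBufferSketch and the Run method are hypothetical names chosen for this example. The Where filter is an added guard, since Buffer flushes partially filled buffers if the source completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SlidingBufferSketch
{
    // Pushes 1..6 through Buffer(5, 1) and returns each full buffer
    // as a comma-separated string.
    public static List<string> Run()
    {
        var results = new List<string>();

        // Assumption: stands in for the already ordered stream.
        var orderedSource = new Subject<int>();

        using (orderedSource
            .Buffer(5, 1)                  // 5 items per buffer, new buffer every 1 item
            .Where(b => b.Count == 5)      // ignore partial buffers flushed on completion
            .Subscribe(b => results.Add(string.Join(",", b))))
        {
            for (var i = 1; i <= 6; i++)
            {
                orderedSource.OnNext(i);
            }
        }

        return results;
    }
}
```

Calling SlidingBufferSketch.Run() should return two entries, "1,2,3,4,5" and "2,3,4,5,6", which matches the grouping asked for in the question for a single ordered stream.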
James World
  • Thank you, I will try this when I get home tonight. I'm assuming Window(), as per the edit above, is for ordered sequences. – WebSight Nov 20 '13 at 20:41
  • Correct, `Window` gives you a stream of streams, `Buffer` a stream of lists. Notably, you only get buffers when they have closed, but window streams start sending immediately. – James World Nov 20 '13 at 20:43
  • Right, watch this space.... I'll post the results on here with any further issues. Thank you again James. – WebSight Nov 20 '13 at 20:46
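
The difference between Window and Buffer described in the comments can be sketched as follows. This is an illustration only, assuming the System.Reactive package; the class name WindowVersusBufferSketch and the Run method are hypothetical. It records the order in which each operator reports two values pushed into the same Subject<int>.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class WindowVersusBufferSketch
{
    // Pushes two values and records when each operator reports them.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        // Window: a stream of streams; each inner stream forwards values as they arrive.
        var windowSub = source.Window(2)
            .Subscribe(w => w.Subscribe(v => log.Add("window item " + v)));

        // Buffer: a stream of lists; a list is delivered only once it has closed.
        var bufferSub = source.Buffer(2)
            .Subscribe(b => log.Add("buffer " + string.Join(",", b)));

        source.OnNext(1);   // only the window reports this value at this point
        source.OnNext(2);   // the buffer now closes and reports both values

        windowSub.Dispose();
        bufferSub.Dispose();
        return log;
    }
}
```

The returned log should read "window item 1", "window item 2", "buffer 1,2": the window has already passed on the first value before the buffer reports anything.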

Here is an extension method that fires when it has received n inputs with the same id.

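using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RxExtension
{
    // Collects items per key and emits mergeFunction(items) once
    // bufferCount items with the same key have arrived.
    public static IObservable<TSource> MergeBuffer<TSource>(
        this IObservable<TSource> source,
        Func<TSource, int> keySelector,
        Func<IList<TSource>, TSource> mergeFunction,
        int bufferCount)
    {
        return Observable.Create<TSource>(o =>
        {
            var buffer = new Dictionary<int, IList<TSource>>();
            return source.Subscribe(i =>
            {
                var index = keySelector(i);
                if (buffer.ContainsKey(index))
                {
                    buffer[index].Add(i);
                }
                else
                {
                    buffer.Add(index, new List<TSource> { i });
                }
                // Compare the size of this key's list, not the number of keys.
                if (buffer[index].Count == bufferCount)
                {
                    o.OnNext(mergeFunction(buffer[index]));
                    buffer.Remove(index);
                }
            },
            o.OnError,       // forward errors to the subscriber
            o.OnCompleted);  // forward completion to the subscriber
        });
    }
}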

Calling the extension:

mainInput = Observable.Merge(inputNodes.ToArray()).MergeBuffer<NodeData>(x => x.id, x => MergeData(x), 1);
WebSight