var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
var zip = a.Zip(b, (x, y) => x + "-" + y);
zip.Subscribe(Console.WriteLine);
This prints:
0-5
1-6
2-7
...
Instead, I would like to join only identical values:
5-5
6-6
7-7
8-8
...
This is a simplified example of a larger problem: merging hundreds of ordered asynchronous sequences. It is very easy to join two IEnumerables this way, but I could not find a way to do the same thing in Rx. Any ideas?
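For reference, the pull-based version of the join I have in mind can be sketched roughly as follows. This is only a minimal sketch: `SortedJoin` and this `MergeJoin` overload are hypothetical names, and it assumes both inputs are strictly ascending with no duplicate values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SortedJoin
{
    // Hypothetical helper: inner-joins two strictly ascending sequences on equal values.
    public static IEnumerable<TResult> MergeJoin<T, TResult>(
        IEnumerable<T> left, IEnumerable<T> right, Func<T, T, TResult> selector)
        where T : IComparable<T>
    {
        using (var l = left.GetEnumerator())
        using (var r = right.GetEnumerator())
        {
            bool hasL = l.MoveNext(), hasR = r.MoveNext();
            while (hasL && hasR)
            {
                int cmp = l.Current.CompareTo(r.Current);
                if (cmp == 0)
                {
                    // Equal keys: emit the pair and advance both sides.
                    yield return selector(l.Current, r.Current);
                    hasL = l.MoveNext();
                    hasR = r.MoveNext();
                }
                else if (cmp < 0)
                {
                    hasL = l.MoveNext(); // left is behind, skip it
                }
                else
                {
                    hasR = r.MoveNext(); // right is behind, skip it
                }
            }
        }
    }
}
```

Calling `SortedJoin.MergeJoin(Enumerable.Range(0, 10), Enumerable.Range(5, 10), (x, y) => x + "-" + y)` yields 5-5 through 9-9. The difficulty with Rx is that neither side can be pulled on demand, so values have to be buffered until the other side catches up.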
More about the inputs and what I am trying to achieve: the whole system is a real-time pipeline with multiple state machines (aggregators, buffers, smoothing filters, etc.) connected in a fork-join pattern. Is Rx a good fit for implementing such things? Each input can be represented as:
public struct DataPoint
{
    public double Value;
    public DateTimeOffset Timestamp;
}
Each incoming data point is timestamped on arrival, so all events are naturally ordered by their join key (the timestamp). As the events travel through the pipeline they get forked and joined. Joins need to be correlated by timestamp and applied in a predefined order. For example, join(a,b,c,d) => join(join(join(a,b),c),d).
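To make the join order concrete, here is a synchronous sketch of that left fold over timestamp-ordered inputs. It uses IEnumerable rather than Rx purely to illustrate the nesting; `Joined`, `JoinFold`, and `JoinAll` are hypothetical names, and each input is assumed to be strictly ascending by timestamp.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public struct DataPoint
{
    public double Value;
    public DateTimeOffset Timestamp;
}

// Hypothetical intermediate result: all values that share one timestamp.
public sealed class Joined
{
    public DateTimeOffset Timestamp;
    public List<double> Values;
}

public static class JoinFold
{
    // Wraps each point of the first input so it can seed the fold.
    static IEnumerable<Joined> Lift(IEnumerable<DataPoint> source)
    {
        foreach (var p in source)
            yield return new Joined { Timestamp = p.Timestamp, Values = new List<double> { p.Value } };
    }

    // One join step: keeps only timestamps present on both sides.
    static IEnumerable<Joined> Join(IEnumerable<Joined> left, IEnumerable<DataPoint> right)
    {
        using (var l = left.GetEnumerator())
        using (var r = right.GetEnumerator())
        {
            bool hasL = l.MoveNext(), hasR = r.MoveNext();
            while (hasL && hasR)
            {
                int cmp = l.Current.Timestamp.CompareTo(r.Current.Timestamp);
                if (cmp == 0)
                {
                    var values = new List<double>(l.Current.Values) { r.Current.Value };
                    yield return new Joined { Timestamp = l.Current.Timestamp, Values = values };
                    hasL = l.MoveNext();
                    hasR = r.MoveNext();
                }
                else if (cmp < 0) hasL = l.MoveNext();
                else hasR = r.MoveNext();
            }
        }
    }

    // join(a,b,c,d) evaluated as join(join(join(a,b),c),d).
    public static IEnumerable<Joined> JoinAll(params IEnumerable<DataPoint>[] inputs)
    {
        return inputs.Skip(1).Aggregate(Lift(inputs[0]), (acc, next) => Join(acc, next));
    }
}
```

Each result carries the values in input order, so `Values[0]` comes from a, `Values[1]` from b, and so on. What I am looking for is the push-based equivalent of this.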
Edit: Below is what I could come up with in a hurry. Hopefully there is a simpler solution based on the existing Rx operators.
static void Test()
{
    var a = Observable.Range(0, 10);
    var b = Observable.Range(5, 10);
    //var zip = a.Zip(b, (x, y) => x + "-" + y);
    //zip.Subscribe(Console.WriteLine);
    var joined = MergeJoin(a, b, (x, y) => x + "-" + y);
    joined.Subscribe(Console.WriteLine);
}
static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
    // Observable.Create is the current name; early Rx builds called it CreateWithDisposable.
    return Observable.Create<string>(o =>
    {
        var a = new Queue<int>();
        var b = new Queue<int>();
        var gate = new object();

        // Emits matching heads and discards the smaller head until one queue is empty.
        // Must be called while holding the gate.
        Action drain = () =>
        {
            while (a.Count != 0 && b.Count != 0)
            {
                if (a.Peek() == b.Peek())
                    o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                else if (a.Peek() < b.Peek())
                    a.Dequeue();
                else
                    b.Dequeue();
            }
        };

        // Note: OnError and OnCompleted are not forwarded to o yet.
        var leftSubscription = left.Subscribe(x =>
        {
            lock (gate)
            {
                if (a.Count == 0 || a.Peek() < x)
                    a.Enqueue(x);
                drain();
            }
        });
        var rightSubscription = right.Subscribe(x =>
        {
            lock (gate)
            {
                if (b.Count == 0 || b.Peek() < x)
                    b.Enqueue(x);
                drain();
            }
        });

        // Dispose both inner subscriptions when the subscriber unsubscribes
        // (the first version returned Disposable.Empty and leaked them).
        return new CompositeDisposable(leftSubscription, rightSubscription);
    });