        var a = Observable.Range(0, 10);
        var b = Observable.Range(5, 10);
        var zip = a.Zip(b, (x, y) => x + "-" + y);
        zip.Subscribe(Console.WriteLine);

Prints:
0-5
1-6
2-7
...

Instead, I would like to join identical values:
5-5
6-6
7-7
8-8
...

This is a simplified example of the actual problem: merging hundreds of ordered asynchronous sequences. Joining two IEnumerables is easy, but I could not find a way to do something like this in Rx. Any ideas?
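For reference, the "easy" IEnumerable case is a classic merge join: because both inputs are sorted, you advance whichever side holds the smaller current value and emit a result when the two are equal, so each input is read only once. A minimal sketch, assuming both sequences are sorted ascending with no duplicates; `SortedJoin` and `MergeJoin` are illustrative names, not part of any library:

```csharp
using System;
using System.Collections.Generic;

public static class SortedJoin
{
    // Merge-joins two ascending, duplicate-free int sequences and yields
    // selector(x, y) for every value that appears in both.
    // Assumption: inputs are sorted; unsorted input silently drops matches.
    public static IEnumerable<TResult> MergeJoin<TResult>(
        IEnumerable<int> left, IEnumerable<int> right, Func<int, int, TResult> selector)
    {
        using (IEnumerator<int> l = left.GetEnumerator())
        using (IEnumerator<int> r = right.GetEnumerator())
        {
            bool hasLeft = l.MoveNext();
            bool hasRight = r.MoveNext();

            // Stop as soon as either side is exhausted: no further match is possible.
            while (hasLeft && hasRight)
            {
                if (l.Current == r.Current)
                {
                    yield return selector(l.Current, r.Current);
                    hasLeft = l.MoveNext();
                    hasRight = r.MoveNext();
                }
                else if (l.Current < r.Current)
                {
                    hasLeft = l.MoveNext();   // left is behind: skip its value
                }
                else
                {
                    hasRight = r.MoveNext();  // right is behind: skip its value
                }
            }
        }
    }
}
```

With the question's inputs (`0..9` and `5..14`) this yields `5-5` through `9-9`. The hard part in Rx is that neither side can be "pulled" forward; values arrive whenever they arrive, so the pending values have to be buffered instead.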

More about the inputs and what I am trying to achieve: the whole system is a real-time pipeline of state machines (aggregators, buffers, smoothing filters, etc.) connected in a fork-join pattern. Is Rx a good fit for implementing such things? Each input can be represented as:

public struct DataPoint
{
    public double Value;
    public DateTimeOffset Timestamp;
}

Each piece of input data is timestamped on arrival, so all events are naturally ordered by their join key (the timestamp). As events travel through the pipeline, they are forked and joined. Joins must be correlated by timestamp and applied in a predefined order; for example, join(a,b,c,d) => join(join(join(a,b),c),d).
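Outside Rx, the left-nested join order described above maps directly onto a left fold over the inputs. A minimal LINQ-to-objects sketch, assuming in-memory inputs and a hypothetical caller-supplied `combine` function that keeps the matched timestamp on its result; `OrderedJoin.JoinAll` is an illustrative name:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public struct DataPoint
{
    public double Value;
    public DateTimeOffset Timestamp;
}

public static class OrderedJoin
{
    // Joins any number of inputs on Timestamp, applying the pairwise join
    // left to right: join(a, b, c, d) == join(join(join(a, b), c), d).
    // `combine` is hypothetical: it must return a point carrying the shared
    // timestamp so the next join in the chain can match on it.
    public static IEnumerable<DataPoint> JoinAll(
        IEnumerable<IEnumerable<DataPoint>> inputs,
        Func<DataPoint, DataPoint, DataPoint> combine)
    {
        // Aggregate without a seed throws on an empty list of inputs.
        return inputs.Aggregate((acc, next) =>
            acc.Join(next,
                     p => p.Timestamp,   // key from the accumulated (left) side
                     q => q.Timestamp,   // key from the next (right) side
                     combine));
    }
}
```

Note that `Enumerable.Join` is a hash join: it does not rely on the inputs being sorted, and it only works because the whole of each input is available up front, which is exactly what an asynchronous pipeline lacks.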

Edit: below is what I could come up with in a hurry. Hopefully there is a simpler solution based on the existing Rx operators.

static void Test()
    {
        var a = Observable.Range(0, 10);
        var b = Observable.Range(5, 10);
        //var zip = a.Zip(b, (x, y) => x + "-" + y);
        //zip.Subscribe(Console.WriteLine);

        var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
        joined.Subscribe(Console.WriteLine);
    }

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
    {
        return Observable.CreateWithDisposable<string>(o =>
            {
                Queue<int> a = new Queue<int>();
                Queue<int> b = new Queue<int>();
                object gate = new object();

                left.Subscribe(x =>
                    {
                        lock (gate)
                        {
                            if (a.Count == 0 || a.Peek() < x)
                                a.Enqueue(x);

                            while (a.Count != 0 && b.Count != 0)
                            {
                                if (a.Peek() == b.Peek())
                                {
                                    o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                                }
                                else if (a.Peek() < b.Peek())
                                {
                                    a.Dequeue();
                                }
                                else
                                {
                                    b.Dequeue();
                                }
                            }
                        }
                    });

                right.Subscribe(x =>
                {
                    lock (gate)
                    {
                        if (b.Count == 0 || b.Peek() < x)
                            b.Enqueue(x);

                        while (a.Count != 0 && b.Count != 0)
                        {
                            if (a.Peek() == b.Peek())
                            {
                                o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                            }
                            else if (a.Peek() < b.Peek())
                            {
                                a.Dequeue();
                            }
                            else
                            {
                                b.Dequeue();
                            }
                        }
                    }
                });

                return Disposable.Empty;
            });
    }
  • Asked the same question on the [rx forum](http://social.msdn.microsoft.com/Forums/en-US/rx/thread/adbcd963-0c83-4968-a1b2-1317d5e31ae5) – Feb 06 '11 at 17:16

4 Answers


GroupBy may do what you need. It seems that you have no time constraints on when items get "joined"; you just need equal items to end up together in some fashion.

Observable.Merge(Observable.Range(1, 10), Observable.Range(5, 15))
    .GroupBy(k => k)
    .Subscribe(go => go.Count()
                       .Where(cnt => cnt > 1)
                       .Subscribe(cnt =>
                           Console.WriteLine("Key {0} has {1} matches", go.Key, cnt)));

Two things to note about the above. First, Merge has the following overloads, so your requirement to join hundreds of streams won't present an issue:

Merge<TSource>(params IObservable<TSource>[] sources);
Merge<TSource>(this IEnumerable<IObservable<TSource>> sources);
Merge<TSource>(this IObservable<IObservable<TSource>> source);

Second, GroupBy returns IObservable<IGroupedObservable<TKey, TSource>>, which means you can react to each group, and to each new member of each group, as they arrive; there is no need to wait until all sources complete.
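Building on those two points, here is a sketch that reacts to each new group member instead of using `Count()`, which only produces a value once a group completes (that is, once the merged sources finish). It assumes the modern System.Reactive NuGet package rather than the 2011 builds used in this thread, and that each source yields a given key at most once; `KeyMatcher.MatchedKeys` and `requiredCount` are illustrative names:

```csharp
// Requires the System.Reactive NuGet package (assumption: modern Rx API).
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class KeyMatcher
{
    // Emits a key as soon as it has been seen `requiredCount` times across
    // all merged sources, without waiting for any source to complete.
    // Assumes each source yields a given key at most once.
    public static IObservable<int> MatchedKeys(
        IEnumerable<IObservable<int>> sources, int requiredCount)
    {
        return sources
            .Merge()                        // IEnumerable<IObservable<T>> overload
            .GroupBy(k => k)                // one inner observable per distinct key
            .SelectMany(g => g
                .Skip(requiredCount - 1)    // ignore the first requiredCount - 1 arrivals
                .Take(1)                    // react to the requiredCount-th arrival only
                .Select(_ => g.Key));
    }
}
```

For example, `MatchedKeys(new[] { Observable.Range(1, 10), Observable.Range(5, 15) }, 2)` emits 5 through 10, each as soon as its second copy arrives; the order of the output depends on how the merged sources interleave. GroupBy keeps one group per distinct key alive until the sources complete, so for unbounded timestamp keys this grows without limit.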

Scott Weinstein
  • The only problem is that I need to be able to join values in order. However, it can be solved if I pass index-value tuples instead of ints. – Feb 06 '11 at 16:34
  • What do you mean by "in order"? – Scott Weinstein Feb 07 '11 at 15:51
  • Keep in mind that by using `Merge` + `Count`, you won't get any matches until both source sequences finish. This is fine for the example `Range`, but if your sources are hot/unending the output may not be what you expect. – Richard Szalay Feb 07 '11 at 20:30
  • The order in which entities with equal keys are joined together: join(a,b,c) === join(join(a,b),c) – Feb 07 '11 at 23:55

This answer is copied from the Rx forums, just so that it will be archived in here as well:

var xs = Observable.Range(1, 10);
var ys = Observable.Range(5, 10);

var joined = from x in xs
    from y in ys
    where x == y
    select x + "-" + y;

Or without using query expressions:

var joined = 
    xs.SelectMany(x => ys, (x, y) => new {x, y})
    .Where(t => t.x == t.y)
    .Select(t => t.x + "-" + t.y);
Omer Mor
  • The only problem with this solution is that it requires `ys` to be hot (or `Multicast`) and doesn't support the scenario where the `ys` value turns up before the `xs` value. – Richard Szalay Feb 07 '11 at 20:27

I honestly can't think of a solution based on existing operators that works for hot sources of unknown order (that is, xs before ys vs ys before xs). Your solution seems fine (hey, if it works), but I'd make a few changes if it were my code:

  • Support cancellation properly using MutableDisposable and CompositeDisposable
  • Call OnError for exceptions thrown from the selector (making it more consistent with other operators)
  • Consider supporting completion if it's possible for one source to complete before the other

The code below has been tested with your dual-Range input, the same inputs flipped, as well as with Empty<int> + Never<int>:

public static IObservable<string> MergeJoin(
    IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
    return Observable.CreateWithDisposable<string>(o =>
    {
        Queue<int> a = new Queue<int>();
        Queue<int> b = new Queue<int>();
        object gate = new object();

        bool leftComplete = false;
        bool rightComplete = false;

        MutableDisposable leftSubscription = new MutableDisposable();
        MutableDisposable rightSubscription = new MutableDisposable();

        Action tryDequeue = () =>
        {
            lock (gate)
            {
                while (a.Count != 0 && b.Count != 0)
                {
                    if (a.Peek() == b.Peek())
                    {
                        string value = null;

                        try
                        {
                            value = selector(a.Dequeue(), b.Dequeue());
                        }
                        catch (Exception ex)
                        {
                            o.OnError(ex);
                            return;
                        }

                        o.OnNext(value);
                    }
                    else if (a.Peek() < b.Peek())
                    {
                        a.Dequeue();
                    }
                    else
                    {
                        b.Dequeue();
                    }
                }
            }
        };

        leftSubscription.Disposable = left.Subscribe(x =>
        {
            lock (gate)
            {
                if (a.Count == 0 || a.Peek() < x)
                    a.Enqueue(x);

                tryDequeue();

                if (rightComplete && b.Count == 0)
                {
                    o.OnCompleted();
                }
            }
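        }, () =>
        {
            // Take the gate so completion cannot race with the other side's OnNext.
            lock (gate)
            {
                leftComplete = true;

                // No further match is possible once the left queue is drained,
                // or once both sides have completed.
                if (a.Count == 0 || rightComplete)
                {
                    o.OnCompleted();
                }
            }
        });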

        rightSubscription.Disposable = right.Subscribe(x =>
        {
            lock (gate)
            {
                if (b.Count == 0 || b.Peek() < x)
                    b.Enqueue(x);

                tryDequeue();

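                // Mirror of the left handler: once the left side has completed
                // and its queue is drained, nothing else can match.
                if (leftComplete && a.Count == 0)
                {
                    o.OnCompleted();
                }
            }
        }, () =>
        {
            lock (gate)
            {
                rightComplete = true;

                if (b.Count == 0 || leftComplete)
                {
                    o.OnCompleted();
                }
            }
        });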

        return new CompositeDisposable(leftSubscription, rightSubscription);
    });
}
Richard Szalay

How about using the new Join operator in v.2838?

var a = Observable.Range(1, 10);
var b = Observable.Range(5, 10);

var joinedStream = a.Join(b, _ => Observable.Never<Unit>(), _ => Observable.Never<Unit>(),
    (aOutput, bOutput) => new Tuple<int, int>(aOutput, bOutput))
    .Where(tuple => tuple.Item1 == tuple.Item2);

joinedStream.Subscribe(output => Trace.WriteLine(output));

This is my first look at Join, and I'm not sure it's wise to use the Never operator like this when dealing with large volumes of input: because the windows never close, every new input is paired with every earlier input from the other stream, so the amount of work grows with the number of inputs received. I think work could be done to close the windows as matches are made, which would make the solution more efficient. That said, the example above works as per your question.

For the record I think Scott's answer is probably the way to go in this instance. I'm just throwing this in as a potential alternative.

James Hay
  • +1 for the solution with Join. I spent an hour yesterday and could not get it to work. I share your concerns about performance. Also, the resulting code is much more cryptic and harder to follow compared to a simple LINQ join. I am starting to think that Rx is just not a good solution for this type of problem. – Feb 06 '11 at 18:46
  • @Serger - I'm sure this could be made more efficient by emitting duration values as matches are made (i.e. replacing the Observable.Never with something a little more intelligent). It'd all depend on what your rules are for when it's safe to complete the duration. – James Hay Feb 06 '11 at 18:55