11

I'm trying to reorder events arriving unordered on different threads.

Is it possible to create a reactive extension query that matches these marble diagrams:

s1          1   2       3   4

s2          1   3   2       4

result      1       2   3   4

and...

s1          1   2   3   4

s2          4   3   2   1

result                  1234

That is: Only publish results in version number order.

The closest I have got is using a Join to open a window each time s1 ticks and only close it when s2 arrives with the same number.

Like this:

var publishedEvents = events.Publish().RefCount();
publishedEvents.Join(
        publishedEvents.Scan(0, (i, o) => i + 1),
        expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion),
        _ => Observable.Never<Unit>(),
        (@event, expectedVersion) => new {@event,expectedVersion})
    .Where(x => x.expectedVersion == x.@event.Version)
    .Select(x => x.@event)
    .Subscribe(Persist);

But that won't work with diagram no 2. Group 2 will be completed once s2 ticks with the number 2, and thus before 1.

Does it make sense? Can it be done with Rx? Should it?

EDIT: I guess it's like overlapping windows, where later windows can't close before all preceding windows have closed. And the preceding windows won't close before the window number matches the event version number.

EDIT 2:

I have something like this now, but it's not really the reactive, functional, thread-safe LINQ-revelation, I hoped for (please ignore that my events are JObjects for now):

var orderedEvents = Observable.Create<JObject>(observer =>
{
    var nextVersionExpected = 1;
    var previousEvents = new List<JObject>();
    return events
        .ObserveOn(Scheduler.CurrentThread)
        .Subscribe(@event =>
        {
            previousEvents.Add(@event);

            var version = (long) @event["Version"];
            if (version != nextVersionExpected) return;

            foreach (var previousEvent in previousEvents.OrderBy(x => (long) x["Version"]).ToList())
            {
                if ((long) previousEvent["Version"] != nextVersionExpected)
                    break;

                observer.OnNext(previousEvent);
                previousEvents.Remove(previousEvent);
                nextVersionExpected++;
            }
        });
});
asgerhallas
  • 16,890
  • 6
  • 50
  • 68

1 Answers1

12

Introduction

The key to this problem is the sort. Anyway you look at it, some form of buffering is required. Whilst no doubt some elaborate combination of operators might achieve this, I think this is a good example where Observable.Create is a good choice.

Generalizing the solution

I've made some effort to generalize my approach to accept any type of ordering key. To do this, I expect to be given:

  • A key selector function used to obtain the key of an event, of type Func<TSource,TKey>
  • The initial key of type TKey
  • A function to get the next key in sequence, of type Func<TKey,TKey>
  • A result selector to generate the result from the paired up events in the source streams, of type Func<TSource,TSource,TSource>

Since I'm just using a 1-based integer sequence for my tests these are satisfied by:

  • keySelector: i => i
  • firstKey: 1
  • nextKeyFunc: k => k+1
  • resultSelector: (left,right) => left

Sort

Here is my Sort attempt. It buffers events into a Dictionary and flushes them as soon as possible to the subscriber:

public static IObservable<TSource> Sort<TSource, TKey>
    (this IObservable<TSource> source,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc)
{
    return Observable.Create<TSource>(o =>
    {
        var nextKey = firstKey;
        var buffer = new Dictionary<TKey, TSource>();
        return source.Subscribe(i =>
        {
            if (keySelector(i).Equals(nextKey))
            {
                nextKey = nextKeyFunc(nextKey);
                o.OnNext(i);
                TSource nextValue;
                while (buffer.TryGetValue(nextKey, out nextValue))
                {
                    buffer.Remove(nextKey);
                    o.OnNext(nextValue);
                    nextKey = nextKeyFunc(nextKey);
                }
            }
            else buffer.Add(keySelector(i), i);
        });
    });
}

I have to say this is a pretty naïve implementation. In production code in the past I have elaborated on this with specific error handling, a fixed-size buffer and time-outs to prevent resource leakage. However, it will do for this example. :)

With this sorted (sorry!), we can now look at handling multiple streams.

Combining Results

First Attempt

My first attempt at this is to produce an unordered stream of events that have been seen the required number of times. This could then be sorted. I do this by grouping elements by key, using GroupByUntil to hold each group until two elements had been captured. Each group is then a stream of results of the same key. For the simple example of integer events, I can just take the last element of each group. However, I don't like this because it's awkward for more real-world scenarios where each result stream may be contributing something useful. I include the code for the sake of interest. Note, so that the tests can be shared between this and my second attempt, I accept an unused resultSelector parameter:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
     IObservable<TSource> right,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc
     Func<TSource,TSource,TSource> resultSelector)
{
    return left.Merge(right)
               .GroupByUntil(keySelector, x => x.Take(2).LastAsync())
               .SelectMany(x => x.LastAsync())
               .Sort(keySelector, firstKey, nextKeyFunc);
}

Aside: You can hack on the SelectMany clause to decide how to pick results. One advantage this solution has over the second attempt, is that in scenarios with many result streams it is easier to see how to extend it to pick say, the first two out of three result tuples to arrive.

Second Attempt

For this approach I sort each stream independently, and then Zip the results together. Not only is this a far simpler looking operation, it's also far easier to combine results from each stream in interesting ways. To keep the tests compatible with my first approach, I pick the resultSelector function to use the first stream's events as the results, but obviously you have flexibility to do something useful in your scenario:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
     IObservable<TSource> right,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc,
     Func<TSource, TSource, TSource> resultSelector)
{
    return Observable.Zip(
        left.Sort(keySelector, firstKey, nextKeyFunc),
        right.Sort(keySelector, firstKey, nextKeyFunc),
        resultSelector);
}

Aside: It isn't too hard to see how this code be extended to a more general case accepting any number of input streams, but as alluded to earlier, using Zip makes it is quite inflexible about blocking at a given key until results from all streams are in.

Test Cases

Finally, here are my tests echoing your example scenarios. To run these, import nuget packages rx-testing and nunit and put the implementations above into a static class:

public class ReorderingEventsTests : ReactiveTest
{
    [Test]
    public void ReorderingTest1()
    {
        var scheduler = new TestScheduler();

        var s1 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(400, 3),
            OnNext(500, 4));

        var s2 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 3),
            OnNext(300, 2),
            OnNext(500, 4));

        var results = scheduler.CreateObserver<int>();

        s1.OrderedCollect(
            right: s2,
            keySelector: i => i,
            firstKey: 1,
            nextKeyFunc: i => i + 1,
            resultSelector: (left,right) => left).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(300, 2),
            OnNext(400, 3),
            OnNext(500, 4));
    }

    [Test]
    public void ReorderingTest2()
    {
        var scheduler = new TestScheduler();

        var s1 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(400, 4));

        var s2 = scheduler.CreateColdObservable(
            OnNext(100, 4),
            OnNext(200, 3),
            OnNext(300, 2),
            OnNext(400, 1));

        var results = scheduler.CreateObserver<int>();

        s1.OrderedCollect(
            right: s2,
            keySelector: i => i,
            firstKey: 1,
            nextKeyFunc: i => i + 1,
            resultSelector: (left, right) => left).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(400, 1),
            OnNext(400, 2),
            OnNext(400, 3),
            OnNext(400, 4));
    }
}

Currying to avoid repetition

Final comment, because I hate repeating myself in code, here's a tweak that avoids the repetitious way I call Sort in the second approach. I've not included it in the main body to avoid confusing readers unfamiliar with currying:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
        IObservable<TSource> right,
        Func<TSource, TKey> keySelector,
        TKey firstKey,
        Func<TKey, TKey> nextKeyFunc,
        Func<TSource, TSource, TSource> resultSelector)
{
    Func<IObservable<TSource>, IObservable<TSource>> curriedSort =
        events => events.Sort(keySelector, firstKey, nextKeyFunc);

    return Observable.Zip(
        curriedSort(left),
        curriedSort(right),
        resultSelector);
}
James World
  • 29,019
  • 9
  • 86
  • 120
  • Edited - Total rewrite for better clarity, and I've included a new and improved approach. – James World Nov 13 '13 at 07:38
  • You could also extend this to take TLeft and TRight instead of TSource, and have corresponding `Func` and `Func` key selectors... but I'm all done here now. – James World Nov 13 '13 at 08:12
  • 1
    This is a _really_ great answer! Thanks a lot for your efforts! I learned a good deal from this. – asgerhallas Nov 13 '13 at 11:35
  • As for the events coming in on different threads, would you mean a safe approach is to just use .ObserveOn() before OrderedCollect()? – asgerhallas Nov 13 '13 at 11:38
  • It shouldn't make any difference what thread events come in on, this code is thread-safe. (Rx does the heavy lifting.) If you need to observe the results on a particular thread, just before `Subscribe` is the best place for `ObserveOn`. – James World Nov 13 '13 at 11:41
  • That was what I hoped, but when I run this `events.Sort(x => (int) x["Version"], 1, l => l + 1).Subscribe(Persist);` with events coming in from a test with `Parallel.For(0, 100, i => subject.OnNext(CreateEventWithVersion(i+1).ToJObject()));` I get errors. I guess with concurrent access to the buffer. – asgerhallas Nov 13 '13 at 11:54
  • If I lock access to the buffer, it works ok. But I was under the impression that this was handled by Rx. Is it my use of subject? – asgerhallas Nov 13 '13 at 11:55
  • OK, that looks like `Subject` abuse. :) Your not allowed to make concurrent calls to `OnNext`. Try chaining a `.Synchronize()` after .events. It's a helper method for fixing naughty observables! – James World Nov 13 '13 at 11:58
  • let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/41101/discussion-between-james-world-and-asgerhallas) – James World Nov 13 '13 at 11:59