4

I'm creating a game in which there's an observable stream of events X representing products delivered by a factory. There are also external events (let's call them Transformers) that affect the factory's performance in various ways and for various periods of time. I want to represent each Transformer as another observable that emits a function transforming X; that function should be applied to each X until the Transformer's OnCompleted. The number of Transformers is not known up front: they are created as a result of user actions (like an equipment purchase) or generated randomly (like equipment failures).

I guess I need an IObservable<IObservable<Func<X,X>>> that I have to Join (Zip, something else?) with IObservable<X> to do this. Can you help me with that? Observable.CombineLatest is almost what I need but it takes an IEnumerable<IObservable<T>>.

If my description is unclear, here's a marble diagram: [image: marble diagram]

In more abstract terms, what I need is quite analogous to a transposition of a matrix but instead of List<List<T>> I have IObservable<IObservable<T>>.

Pein
  • Which language are you using? – J Atkin Apr 26 '15 at 18:36
  • C# currently, but porting this to Java is very likely in the near future. – Pein Apr 26 '15 at 18:38
  • Looks like from your marble diagram that the transforms do not affect the time of the events. Is that accurate? – James World Apr 27 '15 at 09:01
  • That's right - those are just mapping functions like in ordinary Select(). – Pein Apr 27 '15 at 15:43
  • Is the order of application important? In other words, are your transform functions commutative with each other, i.e. given two transforms f and g and an event x, does f(g(x)) = g(f(x))? If the order is important, how do you want to order the transforms? Specify it explicitly? Last in, last applied; last in, first applied; etc. – James World Apr 27 '15 at 23:22
  • At this stage of the project - whatever order I choose it will be OK. Let's assume last in - last applied. – Pein Apr 27 '15 at 23:36

4 Answers

3

Assuming your transformers work on int and your observables are named like this:

IObservable<IObservable<Func<int, int>>> transformerObservables = null;
IObservable<int> values = null;

I would first convert the Observable of Observable of transformers into an Observable of Array of Transformers, that is

IObservable<IObservable<Func<int, int>>> -> IObservable<Func<int, int>[]>

First of all, we ultimately will want to add and remove functions into and from lists, and to make sure the right transformer is removed, we have to override the usual comparison mechanism on Func<...>. So we...

var transformerArrayObservable = transformerObservables
    // ...attach to each transformer the index of the observable it came from:
    .Select((transformerObservable, index) => transformerObservable
        .Select(transformer => Tuple.Create(index, transformer))
        // Then, materialize the transformer sequence so we are notified when the sequence terminates.
        .Materialize()
        // Now the fun part: Make a scan, resulting in an observable of tuples
        // that have the previous and current transformer
        .Scan(new
        {
            Previous = (Tuple<int, Func<int, int>>)null,
            Current = (Tuple<int, Func<int, int>>)null
        },
        (tuple, currentTransformer) => new
        {
            Previous = tuple.Current,
            Current = currentTransformer.HasValue
                ? currentTransformer.Value
                : (Tuple<int, Func<int, int>>)null
        }))
        // Merge these and do another scan, this time adding and removing
        // the transformers from a list.
        .Merge()
        .Scan(
            new Tuple<int, Func<int, int>>[0],
            (array, tuple) =>
            {
                //Expensive! Consider taking a dependency on immutable collections here!
                var list = array.ToList();

                if (tuple.Previous != null)
                    list.Remove(tuple.Previous);

                if (tuple.Current != null)
                    list.Add(tuple.Current);

                return list.ToArray();
            })
            // Extract only the actual functions
        .Select(x => x.Select(y => y.Item2).ToArray())
        // Finally, to make sure that values are passed through even when no transformer has been
        // observed yet, start this sequence with the neutral transformation (an empty array).
        // IMPORTANT: You should test what happens when the first value is observed very quickly. There might be timing issues.
        .StartWith(Scheduler.Immediate, new[] { new Func<int, int>[0]});

Now, you will need an operator that is not available in Rx, called CombineVeryLatest. Have a look here.

var transformedValues = values
    .CombineVeryLatest(transformerArrayObservable, (value, transformers) =>
    {
        return transformers
            .Aggregate(value, (current, transformer) => transformer(current));
    });

And you should be done. There is some performance to be gained I'm sure, but you'll get the idea.
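
The order in which the transformers sit in the array decides the order of application, because Aggregate folds from the first element to the last. Here is a minimal sketch of just that fold, without Rx; the class name TransformerFold and the method ApplyAll are hypothetical names introduced for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only; not part of Rx.
public static class TransformerFold
{
    // Applies the transformers in sequence order: the first element runs
    // first and the last element runs last ("last in, last applied" when
    // new transformers are appended to the end).
    public static int ApplyAll(int value, IEnumerable<Func<int, int>> transformers)
    {
        return transformers.Aggregate(value, (current, transformer) => transformer(current));
    }
}
```

With the transformers x => x + 1 followed by x => x * 10, ApplyAll(1, ...) computes (1 + 1) * 10 = 20; with the two swapped it computes 1 * 10 + 1 = 11. An empty array leaves the value unchanged, which is why starting the sequence with an empty array acts as the neutral transformation.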

Daniel C. Weber
2

Inspired by this answer I ended up with this:

        Output = Input
            .WithLatestFrom(
                transformations.Transpose(),
                (e, fs) => fs.Aggregate(e, (x, f) => f(x)))
            .SelectMany(x => x)
            .Publish();

where the Transpose and WithLatestFrom operators are defined as:

    public static IObservable<IObservable<T>> Transpose<T>(this IObservable<IObservable<T>> source)
    {
        return Observable.Create<IObservable<T>>(o =>
        {
            var latestValues = new Dictionary<IObservable<T>, T>();
            var result = new BehaviorSubject<IObservable<T>>(Observable.Empty<T>());

            source.Subscribe(observable =>
            {
                observable.Subscribe(t =>
                {
                    latestValues[observable] = t;
                    result.OnNext(latestValues.ToObservable().Select(kv => kv.Value));
                }, () =>
                {
                    latestValues.Remove(observable);
                });
            });

            return result.Subscribe(o);
        });
    }

    public static IObservable<R> WithLatestFrom<T, U, R>(
        this IObservable<T> source,
        IObservable<U> other,
        Func<T, U, R> combine)
    {
        return Observable.Create<R>(o =>
        {
            var current = new BehaviorSubject<U>(default(U));
            other.Subscribe(current);
            return source.Select(s => combine(s, current.Value)).Subscribe(o);
        });
    }

Here's the unit test which checks the behavior:

    [TestMethod]
    public void WithLatestFrom_ShouldNotDuplicateEvents()
    {
        var events = new Subject<int>();

        var add1 = new Subject<Func<int, int>>();
        var add2 = new Subject<Func<int, int>>();
        var transforms = new Subject<IObservable<Func<int, int>>>();

        var results = new List<int>();

        events.WithLatestFrom(
                transforms.Transpose(),
                (e, fs) => fs.Aggregate(e, (x, f) => f(x)))
            .SelectMany(x => x)
            .Subscribe(results.Add);


        events.OnNext(1);
        transforms.OnNext(add1);
        add1.OnNext(x => x + 1);
        events.OnNext(1); // 1+1 = 2
        transforms.OnNext(add2);
        add2.OnNext(x => x + 2);
        events.OnNext(1); // 1+1+2 = 4
        add1.OnCompleted();
        events.OnNext(1); // 1+2 = 3
        add2.OnCompleted();
        events.OnNext(1);

        CollectionAssert.AreEqual(new int[] { 1, 2, 4, 3, 1 }, results);
    }

Pein
  • I think that if you use `CombineLatest`, `Output` will yield a value when either observable yields a value. That means that if `transformations` yields a value in between values from `Input`, then `Output` will yield a value. Based on your marble diagram, that does not seem like the desired behavior. – Jason Boyd Apr 27 '15 at 16:19
  • Another issue that I think I see with `CombineLatest` is that `Output` will not begin yielding values until both `Input` and `transformations` have yielded their first value. So, if you have a stream of 'products' coming from `Input` but `transformations` has not yielded anything, then `Output` will not yield anything. – Jason Boyd Apr 27 '15 at 16:27
  • Thanks for noticing this. I'll update the answer with the fix. – Pein Apr 27 '15 at 19:52
2

Whew, that was a real mind bender, but I think I have something that works. First, I created an extension method to convert IObservable<IObservable<Func<T, T>>> into IObservable<IEnumerable<Func<T, T>>>. The extension method operates under the assumption that each inner observable will yield only a single Func<T, T> before completing.

public static class MoreReactiveExtensions
{
    public static IObservable<IEnumerable<Func<T, T>>> ToTransformations<T>(this IObservable<IObservable<Func<T, T>>> source)
    {
        return
            Observable
            // Yield an empty enumerable first.
            .Repeat(Enumerable.Empty<Func<T, T>>(), 1)
            // Then yield an updated enumerable every time one of 
            // the transformation observables yields a value or completes.
            .Concat(                                    
                source
                .SelectMany((x, i) => 
                    x
                    .Materialize()
                    .Select(y => new 
                        { 
                            Id = i, 
                            Notification = y 
                        }))
                .Scan(
                    new List<Tuple<int, Func<T, T>>>(),
                    (acc, x) => 
                    {
                        switch(x.Notification.Kind)
                        {
                            // If an observable completed then remove
                            // its corresponding function from the accumulator.
                            case NotificationKind.OnCompleted:
                                acc = 
                                    acc
                                    .Where(y => y.Item1 != x.Id)
                                    .ToList();
                                break;
                            // If an observable yields a new Func then add
                            // it to the accumulator.
                            case NotificationKind.OnNext:
                                acc = new List<Tuple<int, Func<T, T>>>(acc) 
                                    { 
                                        Tuple.Create(x.Id, x.Notification.Value) 
                                    };
                                break;
                            // Do something with exceptions here.
                            default:
                                // Do something here
                                break;
                        }
                        return acc;
                    })
                // Select an IEnumerable<Func<T, T>> here.
                .Select(x => x.Select(y => y.Item2)));
    }
}

Then, given the following variables:

IObservable<IObservable<Func<int, int>>> transformationObservables;
IObservable<int> products;

I utilized it like this:

var transformations =
    transformationObservables
    .ToTransformations()
    .Publish()
    .RefCount();

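IObservable<int> transformedProducts =
    transformations
    .Join(
        products,
        // Each set of transformations stays "open" until the next set arrives.
        t => transformations,
        // Each product is a point event with no duration.
        i => Observable.Empty<int>(),
        (t, i) => t.Aggregate(i, (ii, tt) => tt.Invoke(ii)));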

The results appear to be correct based on my tests.

Jason Boyd
0

Does it make sense to represent the Transformers as a stream?

Seeing as adding a new Transformer can only transform future Events, why not just maintain a collection of active Transformers and, when a new Event comes in, apply all the currently active ones?

When a Transformer is no longer active it is removed from the collection or flagged as inactive.
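
A minimal sketch of that idea, assuming transformers are plain Func<T, T> delegates and that removal is signalled by disposing a handle. The class ActiveTransformers<T> and its members are hypothetical names introduced here; nothing in it comes from Rx:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: holds the transformers that are active right now.
public sealed class ActiveTransformers<T>
{
    private readonly object gate = new object();
    private readonly List<Func<T, T>> active = new List<Func<T, T>>();

    // Activates a transformer; disposing the returned handle deactivates it.
    public IDisposable Add(Func<T, T> transformer)
    {
        lock (gate) { active.Add(transformer); }
        return new Removal(this, transformer);
    }

    // Applies every active transformer in insertion order
    // (last in, last applied).
    public T Apply(T value)
    {
        Func<T, T>[] snapshot;
        lock (gate) { snapshot = active.ToArray(); }
        foreach (var transformer in snapshot)
            value = transformer(value);
        return value;
    }

    private sealed class Removal : IDisposable
    {
        private readonly ActiveTransformers<T> owner;
        private readonly Func<T, T> transformer;
        private bool disposed;

        public Removal(ActiveTransformers<T> owner, Func<T, T> transformer)
        {
            this.owner = owner;
            this.transformer = transformer;
        }

        public void Dispose()
        {
            lock (owner.gate)
            {
                if (disposed) return; // Disposing twice is a no-op.
                disposed = true;
                owner.active.Remove(transformer);
            }
        }
    }
}
```

The event stream would then be mapped with something like Input.Select(x => transformers.Apply(x)), and each Transformer's lifetime would end with a call to Dispose on its handle instead of OnCompleted. The trade-off is that the set of active transformers becomes shared mutable state outside the observable pipeline, hence the lock.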

Chris