Assuming your transformers work on int
and your observables are named like this:
    IObservable<IObservable<Func<int, int>>> transformerObservables = null;
    IObservable<int> values = null;
I would first convert the observable of observables of transformers into an observable of transformer arrays, that is:
    IObservable<IObservable<Func<int, int>>> -> IObservable<Func<int, int>[]>
Ultimately we will want to add functions to a list and remove them from it again. To make sure the right transformer is removed, we have to work around the usual comparison mechanism on Func<...>. So we...
    var transformerArrayObservable = transformerObservables
        // ...attach to each transformer the index of the observable it came from:
        .Select((transformerObservable, index) => transformerObservable
            .Select(transformer => Tuple.Create(index, transformer))
            // Then materialize the transformer sequence so we get notified when the sequence terminates.
            .Materialize()
            // Now the fun part: make a scan, resulting in an observable of tuples
            // that have the previous and current transformer.
            .Scan(
                new
                {
                    Previous = (Tuple<int, Func<int, int>>)null,
                    Current = (Tuple<int, Func<int, int>>)null
                },
                (tuple, currentTransformer) => new
                {
                    Previous = tuple.Current,
                    Current = currentTransformer.HasValue
                        ? currentTransformer.Value
                        : (Tuple<int, Func<int, int>>)null
                }))
        // Merge these and do another scan, this time adding and removing
        // the transformers from a list.
        .Merge()
        .Scan(
            new Tuple<int, Func<int, int>>[0],
            (array, tuple) =>
            {
                // Expensive! Consider taking a dependency on immutable collections here!
                var list = array.ToList();
                if (tuple.Previous != null)
                    list.Remove(tuple.Previous);
                if (tuple.Current != null)
                    list.Add(tuple.Current);
                return list.ToArray();
            })
        // Extract only the actual functions.
        .Select(x => x.Select(y => y.Item2).ToArray())
        // Finally, to make sure that values are passed on even when no transformer has been observed,
        // start this sequence with the neutral transformation (an empty array).
        // IMPORTANT: You should test what happens when the first value is observed very quickly. There might be timing issues.
        .StartWith(Scheduler.Immediate, new[] { new Func<int, int>[0] });
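To see why the index tag matters, here is a minimal sketch of the add/remove step on its own, without Rx. The names `TransformerListSketch`, `Tagged` and `Step` are hypothetical and only mirror the accumulator of the second Scan above. The sketch assumes that two source observables may emit the very same delegate instance.

```csharp
using System;
using System.Linq;

public static class TransformerListSketch
{
    // Hypothetical helper: a transformer tagged with the index of its source observable.
    public static Tuple<int, Func<int, int>> Tagged(int index, Func<int, int> f)
    {
        return Tuple.Create(index, f);
    }

    // Same logic as the accumulator of the second Scan:
    // drop the previous entry (if any), then append the current one (if any).
    public static Tuple<int, Func<int, int>>[] Step(
        Tuple<int, Func<int, int>>[] array,
        Tuple<int, Func<int, int>> previous,
        Tuple<int, Func<int, int>> current)
    {
        var list = array.ToList();
        if (previous != null)
            list.Remove(previous); // Tuple equality is structural: index AND delegate must match
        if (current != null)
            list.Add(current);
        return list.ToArray();
    }

    public static void Main()
    {
        Func<int, int> addOne = x => x + 1;
        Func<int, int> twice = x => x * 2;

        var state = new Tuple<int, Func<int, int>>[0];
        state = Step(state, null, Tagged(0, addOne)); // source 0 emits addOne
        state = Step(state, null, Tagged(1, twice));  // source 1 emits twice
        state = Step(state, null, Tagged(2, addOne)); // source 2 emits the same delegate as source 0
        state = Step(state, Tagged(2, addOne), null); // source 2 completes

        // The entry of source 0 survives; only the entry of source 2 was removed.
        Console.WriteLine(string.Join(", ", state.Select(t => t.Item1))); // prints "0, 1"
    }
}
```

With untagged delegates, `List.Remove` would drop the first `addOne` in the list instead, which changes the order in which the remaining transformers are applied.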
Now, you will need an operator that is not available in Rx, called CombineVeryLatest. Have a look here.
    var transformedValues = values
        .CombineVeryLatest(transformerArrayObservable, (value, transformers) =>
        {
            return transformers
                .Aggregate(value, (current, transformer) => transformer(current));
        });
And you should be done. There is some performance to be gained I'm sure, but you'll get the idea.
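As a small illustration of the Aggregate step in the selector above, the following sketch folds a value through an array of transformers outside of Rx. `ApplyTransformersSketch` and `Apply` are hypothetical names used only for this example. It also shows why the empty array works as the neutral transformation.

```csharp
using System;
using System.Linq;

public static class ApplyTransformersSketch
{
    // Hypothetical helper: pass the value through each transformer, left to right.
    public static int Apply(int value, Func<int, int>[] transformers)
    {
        return transformers.Aggregate(value, (current, transformer) => transformer(current));
    }

    public static void Main()
    {
        var none = new Func<int, int>[0];
        var two = new Func<int, int>[] { x => x + 1, x => x * 2 };

        Console.WriteLine(Apply(5, none)); // prints 5: with no transformers the seed is returned unchanged
        Console.WriteLine(Apply(5, two));  // prints 12: (5 + 1) * 2
    }
}
```

Since the transformers are applied in array order, the order in which the list is maintained affects the result.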