I am fairly new to the concept of reactive programming. I am using Bonsai, which exposes some but not all .Net rx commands through c#.
I am trying to get a behavior like this marble diagram:
input1: ---1--------2--------3--------4--------5--------6--------7
input2: -------abc----------------------------------def-----------
result: ------------a--------b--------c--------c---------d-------e
Basically, input 2 generates waves of events that should be stored in a queue. Input 1 acts as a trigger to emit single items from this queue.
When the queue is empty, the last item of the queue should be emitted. I tried various combinations of zip and combineLatest but I cannot get the desired behavior.
I also tried an implementation of WithLatestFrom
based on this post, but I realize in retrospect this is also not going to produce the desired behavior.
public IObservable<Tuple<TSource, TOther>> Process<TSource, TOther>(
IObservable<TSource> source,
IObservable<TOther> other)
{
// return source1.WithLatestFrom(source2, (xs, ys) => Tuple.Create(xs, ys));
return source.Publish(os => other.Select(a => os.Select(b => Tuple.Create(b, a))).Switch());
}
Are there any operators or combinations of operators that will produce this behavior? I can do the implementation to Bonsai once I understand which operators to use.
UPDATE 1: 2018/05/18
Based on Sentinel's post, I wrote a new class DiscriminatedUnion
inside the Bonsai namespace. I didn't manage to specify the appropriate types though. The compiler states 'type arguments for Merge
cannot be inferred' (in .Merge(input1.Select...
).
Where do I add the correct type specification?
using System.Reactive.Linq;
using System.ComponentModel;
using System.Collections.Immutable;
namespace Bonsai.Reactive
{
[Combinator]
// [XmlType(Namespace = Constants.XmlNamespace)]
[Description("Implementation of Discriminated Union")]
public class DiscriminatedUnion
{
public IObservable<int?> Process<TInput1, TInput2>(
IObservable<TInput1> input1,
IObservable<TInput2> input2)
{
var merged =
input2.Select(s2 => Tuple.Create(2, (TInput2)s2))
.Merge(input1.Select(s1 => Tuple.Create(1, (TInput1)s1)))
.Scan(Tuple.Create((int?)null, new Queue<int>(), 0), (state, val) =>
{
int? next = state.Item1;
if (val.Item1 == 1)
{
if (state.Item2.Count > 0)
{
next = state.Item2.Dequeue();
}
}
else
{
state.Item2.Enqueue(val.Item2);
}
return Tuple.Create(next, state.Item2, val.Item1);
})
.Where(x => (x.Item1 != null && x.Item3 == 1))
.Select(x => x.Item1);
return merged;
}
}
}