2

I am fairly new to the concept of reactive programming. I am using Bonsai, which exposes some but not all .Net rx commands through c#.

I am trying to get a behavior like this marble diagram:

input1: ---1--------2--------3--------4--------5--------6--------7
input2: -------abc----------------------------------def-----------
result: ------------a--------b--------c--------c---------d-------e

Basically, input 2 generates waves of events that should be stored in a queue. Input 1 acts as a trigger to emit single items from this queue.

When the queue is empty, the last item of the queue should be emitted. I tried various combinations of zip and combineLatest but I cannot get the desired behavior.

I also tried an implementation of WithLatestFrom based on this post, but I realize in retrospect this is also not going to produce the desired behavior.

public IObservable<Tuple<TSource, TOther>> Process<TSource, TOther>(
            IObservable<TSource> source,
            IObservable<TOther> other)
        {


            // return source1.WithLatestFrom(source2, (xs, ys) => Tuple.Create(xs, ys));
            return source.Publish(os => other.Select(a => os.Select(b => Tuple.Create(b, a))).Switch());
        }

Are there any operators or combinations of operators that will produce this behavior? I can do the implementation to Bonsai once I understand which operators to use.

UPDATE 1: 2018/05/18

Based on Sentinel's post, I wrote a new class DiscriminatedUnion inside the Bonsai namespace. I didn't manage to specify the appropriate types though. The compiler states 'type arguments for Merge cannot be inferred' (in .Merge(input1.Select...). Where do I add the correct type specification?

using System.Reactive.Linq;
using System.ComponentModel;
using System.Collections.Immutable;    
namespace Bonsai.Reactive
{
    [Combinator]
   // [XmlType(Namespace = Constants.XmlNamespace)]
    [Description("Implementation of Discriminated Union")]
    public class DiscriminatedUnion
    {
        public IObservable<int?> Process<TInput1, TInput2>(
           IObservable<TInput1> input1,
            IObservable<TInput2> input2)
        {
            var merged =
                        input2.Select(s2 => Tuple.Create(2, (TInput2)s2))
                        .Merge(input1.Select(s1 => Tuple.Create(1, (TInput1)s1)))
                        .Scan(Tuple.Create((int?)null, new Queue<int>(), 0), (state, val) =>
                        {
                            int? next = state.Item1;
                            if (val.Item1 == 1)
                            {
                                if (state.Item2.Count > 0)
                                {
                                    next = state.Item2.Dequeue();
                                }
                            }
                            else
                            {
                                state.Item2.Enqueue(val.Item2);
                            }
                            return Tuple.Create(next, state.Item2, val.Item1);
                        })
                        .Where(x => (x.Item1 != null && x.Item3 == 1))
                        .Select(x => x.Item1);
            return merged;
        }
    }
}
jlarsch
  • 2,217
  • 4
  • 22
  • 44

3 Answers3

2

Here's a testable representation of your problem (or marble diagram), using NuGet package Microsoft.Reactive.Testing:

var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(1000.Ms(), 1),
    ReactiveTest.OnNext(2000.Ms(), 2),
    ReactiveTest.OnNext(3000.Ms(), 3),
    ReactiveTest.OnNext(4000.Ms(), 4),
    ReactiveTest.OnNext(5000.Ms(), 5),
    ReactiveTest.OnNext(6000.Ms(), 6),
    ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(1400.Ms(), "a"),
    ReactiveTest.OnNext(1500.Ms(), "b"),
    ReactiveTest.OnNext(1600.Ms(), "c"),
    ReactiveTest.OnNext(5500.Ms(), "d"),
    ReactiveTest.OnNext(5600.Ms(), "e"),
    ReactiveTest.OnNext(5700.Ms(), "f")
);

which uses this extension method:

public static class TickExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}

The problem is basically a state-machine problem that involves two observables of different types. The best way to solve this is with a Discriminated Union type, which doesn't exist in C#, so we'll create one. @Sentinel's answer did this with a Tuple, and that can work as well:

public class DUnion<T1, T2>
{
    public DUnion(T1 t1) 
    { 
        Type1Item = t1;
        Type2Item = default(T2);
        IsType1 = true;
    }

    public DUnion(T2 t2) 
    { 
        Type2Item = t2;
        Type1Item = default(T1);
        IsType1 = false;
    }

    public bool IsType1 { get; }
    public bool IsType2 => !IsType1;

    public T1 Type1Item { get; }
    public T2 Type2Item { get; }
}

We can then take our two differently-typed streams, Select and Merge them into one discriminated union stream, where we can manage the state with Scan. Your state logic is a bit tricky, but doable:

  • if a number arrives and there's no items in the queue, do nothing
  • if a number arrives and there's items in the queue, emit the first item in the queue.
    • If there's more than one item, remove the recent emmision from the queue.
    • If the queue only has one item, don't remove it, and go into 'fake-empty' state.
  • if a string arrives, stick it in the queue.
    • If the queue is 'fake-empty', eject the last item and exit 'fake-empty' state.

Here's the resulting observable (uses NuGet package System.Collections.Immutable):

var result = input1.Select(i => new DUnion<int, string>(i))
    .Merge(input2.Select(s => new DUnion<int, string>(s)))
    .Scan((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), (state, dItem) => dItem.IsType1
        ? state.queue.IsEmpty   
            ? (state.queue, null, false, false)     //Is integer, but empty queue, so don't emit item
            : state.queue.Dequeue().IsEmpty //Is integer, at least one item: dequeue unless only one item, then emit either way
                ? (state.queue,           state.queue.Peek(), true,  true)
                : (state.queue.Dequeue(), state.queue.Peek(), false, true)
        : state.isFakeEmptyState //Is new string, just add to queue, don't emit
            ? (state.queue.Dequeue().Enqueue(dItem.Type2Item), null, false, false) 
            : (state.queue.Enqueue(dItem.Type2Item),   (string)null, false, false) 
    )
    .Where(t => t.emit)
    .Select(t => t.item);

This can then be tested as follows:

var observer = scheduler.CreateObserver<string>();
result.Subscribe(observer);
scheduler.Start();
observer.Messages.Dump(); //Linqpad. Can replace with Console.Writeline loop.

Update: I thought about this a bit, and I think it makes sense to throw some operators around the Discriminated Union functionality. This way you don't have to explicitly deal with the type:

public static class DUnionExtensions
{
    public class DUnion<T1, T2>
    {
        public DUnion(T1 t1)
        {
            Type1Item = t1;
            Type2Item = default(T2);
            IsType1 = true;
        }

        public DUnion(T2 t2)
        {
            Type2Item = t2;
            Type1Item = default(T1);
            IsType1 = false;
        }

        public bool IsType1 { get; }
        public bool IsType2 => !IsType1;

        public T1 Type1Item { get; }
        public T2 Type2Item { get; }
    }

    public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return a.Select(x => new DUnion<T1, T2>(x))
            .Merge(b.Select(x => new DUnion<T1, T2>(x)));
    }

    public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
            TState initialState,
            Func<TState, T1, TState> type1Handler,
            Func<TState, T2, TState> type2Handler)
        {
            return source.Scan(initialState, (state, u) => u.IsType1
                ? type1Handler(state, u.Type1Item)
                : type2Handler(state, u.Type2Item)
            );
        }
}

With those extension methods, the solution changes to this, which I think reads better:

var result = input1
    .Union(input2)
    .ScanUnion((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), 
        (state, _) => state.queue.IsEmpty
            ? (state.queue, null, false, false)     //empty queue, so don't emit item
            : state.queue.Dequeue().IsEmpty         //At least one item: dequeue unless only one item, then emit either way
                ? (state.queue, state.queue.Peek(), true, true) //maintain last item, enter Fake-EmptyState
                : (state.queue.Dequeue(), state.queue.Peek(), false, true),
        (state, s) => state.isFakeEmptyState 
            ? (state.queue.Dequeue().Enqueue(s), null, false, false)
            : (state.queue.Enqueue(s), (string)null, false, false)
    )
    .Where(t => t.emit)
    .Select(t => t.item); 

If you're having trouble with the named Tuple syntax, then you can use the old tuples:

var result = input1
    .Union(input2)
    .ScanUnion(Tuple.Create(ImmutableQueue<string>.Empty, (string)null, false, false),
        (state, _) => state.Item1.IsEmpty
            ? Tuple.Create(state.Item1, (string)null, false, false)     //empty queue, so don't emit item
            : state.Item1.Dequeue().IsEmpty         //At least one item: dequeue unless only one item, then emit either way
                ? Tuple.Create(state.Item1, state.Item1.Peek(), true, true) //maintain last item, enter Fake-EmptyState
                : Tuple.Create(state.Item1.Dequeue(), state.Item1.Peek(), false, true),
        (state, s) => state.Item3
            ? Tuple.Create(state.Item1.Dequeue().Enqueue(s), (string)null, false, false)
            : Tuple.Create(state.Item1.Enqueue(s), (string)null, false, false)
    )
    .Where(t => t.Item4)
    .Select(t => t.Item2);
Shlomo
  • 14,102
  • 3
  • 28
  • 43
  • I like how you explain it clearly using terms like Discriminated Union, which are new to me. Could you point me to some learning reference for this kind of thing, as I feel i approach these problems intuitively and not quite knowing if there are better alternatives? – Sentinel May 18 '18 at 05:12
  • Also....What/why is ImmutableQueue? I think I may have seen this class already, and wondered about it last time – Sentinel May 18 '18 at 05:15
  • Thanks a lot. I have two issues adapting this for my `IObservable`: 1.) The types of my inputs are `TSource` and `TOther`, and not guaranteed `int` and `string`. 2.) My compiler doesn't know `queue`. On the line `.Scan((queue: ImmutableQueue.Empty, ...` I get errors that `queue` doesn't exist in the current context. and it expects `;}` after `.Empty`. Do I need another package besides `System.Collections.Immutable`? – jlarsch May 18 '18 at 08:07
  • Seems like I am stumbling over the named argument syntax? – jlarsch May 18 '18 at 11:15
  • Updated answer to use un-named tuples. @jlarsch: You can replace `int` and `string` with `TSource` and `TOther`. There's nothing in those answers that uses int or string qualities, except for null (just replace with `default(TSource)` or whatever. – Shlomo May 18 '18 at 13:21
  • @Sentinel: Here's a quick demo of ImmutableQueue: https://gist.github.com/ShlomoAbraham/2111fa2a61cec26a49e4d6f62793df93. ImmutableQueue is an immutable data structure, which works much better with Rx. Your answer uses mutable data structures, and I believe avoids all the pitfalls, but it is easy to trip up. – Shlomo May 18 '18 at 14:02
  • As for terms like DiscriminatedUnion... experiment with other languages, particularly functional ones. They tend to guide you towards other techniques. I also highly recommend this book: https://www.amazon.com/Real-World-Functional-Programming-Tomas-Petricek/dp/1933988924 – Shlomo May 18 '18 at 14:03
  • @Shlomo Thanks for that. I am inexperienced in functional programming, apart from insofar as Linq/Rx is functional, but from a conceptual perspective for a long time I have increasingly leant towards functional models of the world. I think there is something inherently misleading about describing the world as static structures as in OO, using concepts *is* or *type of* tend to support static models, whereas I increasingly see things like *properties* as rather *behaviours* conditional on both observed and observer. – Sentinel May 18 '18 at 14:19
  • @Shlomo On this particular problem, I just had another idea for an approach on how to solve it. I will try it in a scratch pad and post it up here, would be curious for your perspective on it (if it works) – Sentinel May 18 '18 at 14:20
  • I replaced the null strings with default(objects) in the version with old tuples and it works like a charm! Will do some testing – jlarsch May 18 '18 at 16:11
1

Would this do the trick? There is a probably a better way to do this buffers so it might be worth revisiting this.

        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(7000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));



        var merged =
            source2.Select(s2 => Tuple.Create(2, s2))
            .Merge(source1.Select(s1 => Tuple.Create(1, (int)s1)))
            .Scan(Tuple.Create((int?)null, new Queue<int>(),0), (state, val) =>
                 {
                     int? next = state.Item1;
                     if (val.Item1 == 1)
                     {
                         if (state.Item2.Count > 0)
                         {
                             next = state.Item2.Dequeue();
                         }
                     }
                     else
                     {
                         state.Item2.Enqueue(val.Item2);

                     }
                     return Tuple.Create(next, state.Item2,val.Item1);
                 })
            .Where(x=>(x.Item1!=null && x.Item3==1))
            .Select(x => x.Item1);



        merged.Subscribe(x => Console.WriteLine("Merged "+x));

UPDATE Fixed code for OP:

 public class DiscriminatedUnion
{
    public static IObservable<TInput2> Process<TInput1, TInput2>(
       IObservable<TInput1> input1,
        IObservable<TInput2> input2)
    {
        var merged =
                    input2.Select(s2 => Tuple.Create(2, (object)s2))
                    .Merge(input1.Select(s1 => Tuple.Create(1, (object)s1)))
                    .Scan(Tuple.Create(default(TInput2), new Queue<TInput2>(), 0), (state, val) =>
                    {
                        TInput2 next = state.Item1;
                        if (val.Item1 == 1)
                        {
                            if (state.Item2.Count > 0)
                            {
                                next = state.Item2.Dequeue();
                            }
                        }
                        else
                        {
                            state.Item2.Enqueue((TInput2)val.Item2);
                        }
                        return Tuple.Create(next, state.Item2, val.Item1);
                    })
                    .Where(x => (!x.Item1.Equals(default(TInput2)) && x.Item3 == 1))
                    .Select(x => x.Item1);
        return merged;
    }
}
Sentinel
  • 3,582
  • 1
  • 30
  • 44
  • thanks! Being new to c#, I am struggling to write a class of the `public IObservable` type described in my question using your suggestion. I will try and test. – jlarsch May 17 '18 at 22:46
  • Beat me to the punch! I just wrote up something very similar. – Shlomo May 17 '18 at 22:53
  • @jlarsch just return merged, which is IObservable. To test, add this to Main in console app. – Sentinel May 18 '18 at 05:09
  • I tried to adapt your code using the types of my `public IObservable`: `input2.Select(s2 => Tuple.Create(2, (TInput2)s2))` `.Merge(input1.Select(s1 => Tuple.Create((int)1, (TInput1)s1)))` But my compiler complains about merge nor being able to infer type arguments – jlarsch May 18 '18 at 07:55
  • @jlarsch Away from PC for a few hrs. Please update your question to include the adapted code and I will correct it for you. The code above does work, you just need to change the types. – Sentinel May 18 '18 at 10:24
  • @jlarsch Fixed your code. Note that the problem was that the tuples were of different types so they could not be merged. The key is that you are only interested in TInput2 type, so I just cast both to *object*. You will want to use nullable types , eg : *int?* to avoid 0 being filtered out (default) – Sentinel May 18 '18 at 14:14
  • @jlarsch see alt answer and comments below for some perhaps more intuitive approaches – Sentinel May 18 '18 at 20:19
1

I love these Rx puzzles. Can't believe one get paid to do this. So I came up with a slightly different approach. I think there are some weaknesses with race conditions here, but I would be curious what you think and how these can be eliminated.

The basic idea is to think of the queue as a recursive buffer-until over source1, where the buffer is replayed into the queue sans first element.

UPDATE

Based on shlomo's observation that publish().refcount() is needed, I updated the code and turned the solution into an extension "RegulatedQueue". Please see the below code. Input2 is the source to regulate via a queue, Input1 is the regulating signal.

public static class RxHelpers
{
    public static IObservable<TInput2> RegulatedQueue<TInput1, TInput2>(this IObservable<TInput2> input2,
       IObservable<TInput1> input1
        )
    {
        return Observable.Using(() => new Subject<TInput2>(),
        queue =>
        {
            input2.Subscribe(queue);
            return queue
                .Buffer(() => input1)
                .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
                .Where(l => l.Count > 0)
                .Select(l => l.First()).
                Publish().
                RefCount();
        });
    }
}


class Program
{


    static void Main(string[] args)
    {
        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(2000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));

        var merged = source2.RegulatedQueue(source1);

        merged.Subscribe(x => Console.WriteLine("Merged1 " + x));
        merged.Subscribe(x => Console.WriteLine("Merged2 " + x));






        Console.ReadKey();

    }
}

OBSOLETE

  static void Main(string[] args)
    {
        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(7000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));

        //THIS BIT
         Subject<int> queue = new Subject<int>();
        source2.Subscribe(queue);
        var merged=queue
            .Buffer(() => source1)
            .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
            .Where(l=>l.Count > 0)
            .Select(l => l.First());





            merged.Subscribe(x => Console.WriteLine("Merged "+x));







        Console.ReadKey();

    }

Testcode:

var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(1000.Ms(), 1),
    ReactiveTest.OnNext(2000.Ms(), 2),
    ReactiveTest.OnNext(3000.Ms(), 3),
    ReactiveTest.OnNext(4000.Ms(), 4),
    ReactiveTest.OnNext(5000.Ms(), 5),
    ReactiveTest.OnNext(6000.Ms(), 6),
    ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(1400.Ms(), "a"),
    ReactiveTest.OnNext(1500.Ms(), "b"),
    ReactiveTest.OnNext(1600.Ms(), "c"),
    ReactiveTest.OnNext(5500.Ms(), "d"),
    ReactiveTest.OnNext(5600.Ms(), "e"),
    ReactiveTest.OnNext(5700.Ms(), "f")
);

Subject<string> queue = new Subject<string>();
input2.Subscribe(queue);
var result = queue
    .Buffer(() => input1)
    .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
    .Where(l => l.Count > 0)
    .Select(l => l[0]);

result.Timestamp(scheduler)
    .Select(t => $"{t.Timestamp.Ticks} ticks: {t.Value}")
    .Dump(); //Linqpad

expected output:

//14000000 enqueue a
//15000000 enqueue b
//16000000 enqueue c
20000000 ticks: a 
30000000 ticks: b 
40000000 ticks: c 
50000000 ticks: c 
//55000000 enqueue d
//56000000 enqueue e
//57000000 enqueue f
60000000 ticks: c //should really be d, but there's no handling for fake-empty ejection
70000000 ticks: d 
80000000 ticks: e 
90000000 ticks: f 
100000000 ticks: f 
110000000 ticks: f 
120000000 ticks: f 
130000000 ticks: f 
140000000 ticks: f 
...

actual output:

20000000 ticks: a 
30000000 ticks: b 
40000000 ticks: c 
50000000 ticks: b 
60000000 ticks: c 
70000000 ticks: b 
80000000 ticks: c 
90000000 ticks: c 
100000000 ticks: b 
110000000 ticks: c 
120000000 ticks: c 
130000000 ticks: b 
140000000 ticks: c 
150000000 ticks: b 
160000000 ticks: c 
170000000 ticks: b 
180000000 ticks: c 
190000000 ticks: c 
Sentinel
  • 3,582
  • 1
  • 30
  • 44
  • It fails with the test code (keeps emitting b-c), I can't figure out why though. I would suggest using testable values instead of Random. You can fool yourself easily with that. – Shlomo May 18 '18 at 15:56
  • 1
    Added edits with testcode. The approach should work, and reminds me a lot of an answer I turn back to often: https://stackoverflow.com/questions/4123178/a-way-to-push-buffered-events-in-even-intervals – Shlomo May 18 '18 at 16:01
  • @Shlomo OK I have to get off now, will check it later. It worked with the code I had, but should be susceptible to thread related errors I think in its basic form. – Sentinel May 18 '18 at 16:15
  • oh hah. It was a multiple subscriber problem: Multiple subscribers would enqueue, causing duplicate queueing. Single subscriber works fine. Add `.Publish().RefCount()` and multi-subscriber works. – Shlomo May 18 '18 at 16:21
  • @shlomo ah gotcha I see. Still, think it would need some looking into from a synchronization perspective. OP could use this as an easy to read toy noddy solution for now. – Sentinel May 18 '18 at 20:17
  • I am super happy about all your suggestions. I am currently running a real-life test using shlomo's version. It injects 1 million items into a queue every 24 hours and consumes 30 items per second (this is part of a neuroscience behavior experiment). This last short version looks appealing for brevity. Thanks again! – jlarsch May 18 '18 at 20:28
  • @jlarsch Neuroscience? Hire me! I want to get involved! – Sentinel May 18 '18 at 20:31
  • @jlarsch Code updated into a usable extension "RegulatedQueue". Again, recommend testing in different scenarios as this is all quite off-the-cuff but should be OK. – Sentinel May 19 '18 at 09:36