3

I have a sequence of true false values like so

        var alternatingTrueFalse = Observable.Generate(
            true,
            _ => true,
            x => !x,
            x => x,
            _ => TimeSpan.FromMilliseconds(new Random().Next(2000)))
            .Take(20).Publish();
        alternatingTrueFalse.Connect();
        var buffered = alternatingTrueFalse
            .Buffer(TimeSpan.FromMilliseconds(500))
            .Subscribe(x => Console.WriteLine($"{TimeStamp} {string.Join(",", x)}"));

I want to look at the sequence in terms of 500 ms (max) windows / buffers. If there is only one true value (and nothing else) inside one such window, I want to flip a switch (just call a command, print to console for now). Then, when the next false value arrives, I want to flip the switch back and close the current window / buffer of the original sequence and start a new one.

Using Buffer + Scan to flip the switch

So far I've come up with a way to do this on a Buffer, with this. However, the buffers are open too long, they are always 500 ms.

        var buffered = alternatingTrueFalse
            .Buffer(TimeSpan.FromMilliseconds(500));
        var output = buffered
            .Subscribe(x => Console.WriteLine($"{TimeStamp} {string.Join(",", x)}"));

        var isFlipped = buffered.Scan(false, 
                (x, y) => 
                { 
                    if (y.Count == 0)
                    {
                        return x;
                    }
                    return y.Count == 1 && y.First();
                });

        isFlipped.DumpTimes("Flipped");

I'm trying to figure out how to use Window instead of Buffer to be able to flip the switch back on the first false after an isolated true. But I can't seem to get it right, I'm not quite fluent in Rx yet and not sure how to use the windowOpening / Closing values for it.

Example output

original

2017-10-07 20:21:39.302 True,False   // Rapid values should not flip the switch (actually they should flip a different switch)
2017-10-07 20:21:39.797 True         // Flip the switch here
2017-10-07 20:21:40.302 False        // Flip the switch back and close the current window
2017-10-07 20:21:40.797 True         // Flip the switch here
2017-10-07 20:21:41.297 
2017-10-07 20:21:41.798 False        // Etc...
...
2017-10-07 20:21:43.297 True
2017-10-07 20:21:43.800 False,True   // Example of a window that is open too long, because it is not closed immediately upon the false value
...

buffer + scan

2017-10-07 20:47:15.154 True
2017-10-07 20:47:15.163 - Flipped-->True
2017-10-07 20:47:15.659 False,True   // Example of a window open too long
2017-10-07 20:47:15.661 - Flipped-->False
gakera
  • 3,589
  • 4
  • 30
  • 36

2 Answers2

2

Here's a solution not using the Scan approach.

The issue seems to be closing the buffer based on two conditions - maximum time or specific value. This one is based on an old answer

public static IObservable<IList<TSource>> BufferWithClosingValue<TSource>(
    this IObservable<TSource> source, 
    TimeSpan maxTime, 
    TSource closingValue)
{
    return source.GroupByUntil(_ => true,
                               g => g.Where(i => i.Equals(closingValue)).Select(_ => Unit.Default)
                                     .Merge(Observable.Timer(maxTime).Select(_ => Unit.Default)))
                 .SelectMany(i => i.ToList());
}

Example usage would be

alternatingTrueFalse.BufferWithClosingValue( TimeSpan.FromMilliseconds(500), false );
supertopi
  • 3,469
  • 26
  • 38
  • Thanks, that seems to give me a pretty good building block to continue with. I'd still like to learn more about window and scan, but groupbyuntill also seems good. – gakera Oct 08 '17 at 15:54
  • 1
    Just FYI I used this in a bit more interesting scenario here: https://stackoverflow.com/questions/46492665/how-to-use-reactive-ui-to-trigger-a-different-action-following-a-button-click-vs – gakera Oct 08 '17 at 17:17
  • Okay, but the issue remains the same - close buffer/window based on multiple conditions. I'm not sure what you can accomplish with `Scan` because you can't control the buffer/window closing. Maybe you can apply it directly on source observable and build from there – supertopi Oct 09 '17 at 12:40
1

I would recommend to create custom operator not based on other operator as it will take much more CPU and memory.

Here is the clean version of the same method.

public static IObservable<IEnumerable<TValue>> BufferWithThrottle<TValue>(this IObservable<TValue> @this, int maxAmount, TimeSpan threshold)
{
    var buffer = new List<TValue>();

    return Observable.Create<IEnumerable<TValue>>(observer =>
    {
        var aTimer = new Timer();
        void Clear()
        {
            aTimer.Stop();
            buffer.Clear();
        }
        void OnNext()
        {
            observer.OnNext(buffer);
            Clear();
        }
        aTimer.Interval = threshold.TotalMilliseconds;
        aTimer.Enabled = true;
        aTimer.Elapsed += (sender, args) => OnNext();
        var subscription = @this.Subscribe(value =>
        {
            buffer.Add(value);
            if (buffer.Count >= maxAmount)
                OnNext();
            else
            {
                aTimer.Stop();
                aTimer.Start();
            }
        });
        return Disposable.Create(() =>
        {
            Clear();
            subscription.Dispose();
        });
    });
}
shtse8
  • 1,092
  • 12
  • 20