I'm new to Reactive Extensions (Rx) and am trying to do the following in .NET (it should be the same in JS and other languages):
I have an incoming stream of objects, each with a string property and a bool property. The stream is infinite. The following conditions apply:
- The first object should always be printed.
- After that, all incoming objects should be skipped until an object arrives with the bool property set to "true".
- When an object arrives with the bool property set to "true", this object should be skipped, but the next object should be printed (no matter what its properties are).
- It continues this way: every object that follows an object with the property set to "true" should be printed.
Example:
("one", false)--("two", true)--("three", false)--("four", false)--("five", true)--("six", true)--("seven", true)--("eight", false)--("nine", true)--("ten", false)
Expected result:
"one"--"three"--"six"--"seven"--"eight"--"ten"
Note that "six" and "seven" are printed because they follow an object with the property set to "true", even though their own property is also set to "true".
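To make the rule concrete, here is a minimal sketch without Rx. It assumes the conditions above boil down to "print the first object, then print exactly those objects whose predecessor had the flag set". `SelectPrinted` and the tuple shape are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

static class RuleSketch
{
    // Hypothetical helper: yields the texts that should be printed.
    static IEnumerable<string> SelectPrinted(IEnumerable<(string Text, bool Flag)> items)
    {
        // Starts as true so that the very first item is always printed.
        bool printNext = true;
        foreach (var item in items)
        {
            if (printNext)
                yield return item.Text;

            // The current item's flag decides whether the NEXT item is printed.
            printNext = item.Flag;
        }
    }

    static void Main()
    {
        var items = new (string Text, bool Flag)[]
        {
            ("one", false), ("two", true), ("three", false), ("four", false),
            ("five", true), ("six", true), ("seven", true), ("eight", false),
            ("nine", true), ("ten", false),
        };

        // Prints: one--three--six--seven--eight--ten
        Console.WriteLine(string.Join("--", SelectPrinted(items)));
    }
}
```

An object's own flag never affects whether it is printed itself, only whether its successor is. That is why "six" and "seven" appear in the output.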
A simple .NET program to test this scenario:
    using System;
    using System.Reactive.Linq;
    using System.Threading;

    namespace ConsoleApp1
    {
        class Program
        {
            class Result
            {
                public bool Flag { get; set; }
                public string Text { get; set; }
            }

            static void Main(string[] args)
            {
                var source =
                    Observable.Create<Result>(f =>
                    {
                        f.OnNext(new Result() { Text = "one", Flag = false });
                        Thread.Sleep(1000);
                        f.OnNext(new Result() { Text = "two", Flag = true });
                        Thread.Sleep(1000);
                        f.OnNext(new Result() { Text = "three", Flag = false });
                        Thread.Sleep(1000);
                        f.OnNext(new Result() { Text = "four", Flag = false });
                        Thread.Sleep(1000);
                        f.OnNext(new Result() { Text = "five", Flag = true });
                        Thread.Sleep(1000);
                        f.OnNext(new Result() { Text = "six", Flag = true });
                        Thread.Sleep(1000);
                        f.OnNext(new Result() { Text = "seven", Flag = true });
                        Thread.Sleep(1000);
                        f.OnNext(new Result() { Text = "eight", Flag = false });
                        Thread.Sleep(1000);
                        f.OnNext(new Result() { Text = "nine", Flag = true });
                        Thread.Sleep(1000);
                        f.OnNext(new Result() { Text = "ten", Flag = false });
                        return () => Console.WriteLine("Observer has unsubscribed");
                    });
            }
        }
    }
I tried to use the .Scan and .Buffer extensions, but I don't know exactly how to use them in my scenario.
Performance should of course be as good as possible, because in the end the stream will be infinite.
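One possible direction with .Scan, offered as a sketch rather than a definitive solution: carry a small state through the stream that remembers whether the next object should be printed, then filter on it. `AfterFlagged` is a hypothetical extension method name (not part of Rx), and the sketch assumes the System.Reactive package and C# 7 tuples. A fixed array stands in for the real source here.

```csharp
using System;
using System.Reactive.Linq;

class Result
{
    public bool Flag { get; set; }
    public string Text { get; set; }
}

static class ScanSketch
{
    // Hypothetical helper, not an Rx operator: keeps the first item and
    // every item that directly follows an item with Flag == true.
    static IObservable<Result> AfterFlagged(this IObservable<Result> source) =>
        source
            .Scan(
                // Seed: nothing emitted yet, but the first item must be emitted.
                (Emit: false, EmitNext: true, Item: default(Result)),
                // Emit the current item if the previous step asked for it;
                // the current item's flag decides about the next item.
                (state, item) => (Emit: state.EmitNext, EmitNext: item.Flag, Item: item))
            .Where(state => state.Emit)
            .Select(state => state.Item);

    static void Main()
    {
        var source = new[]
        {
            new Result { Text = "one", Flag = false },
            new Result { Text = "two", Flag = true },
            new Result { Text = "three", Flag = false },
            new Result { Text = "four", Flag = false },
            new Result { Text = "five", Flag = true },
            new Result { Text = "six", Flag = true },
            new Result { Text = "seven", Flag = true },
            new Result { Text = "eight", Flag = false },
            new Result { Text = "nine", Flag = true },
            new Result { Text = "ten", Flag = false },
        }.ToObservable();

        // Expected output, one per line: one, three, six, seven, eight, ten
        source.AfterFlagged().Subscribe(r => Console.WriteLine(r.Text));
    }
}
```

The state carried by .Scan is a single small tuple, so the amount of memory held does not grow with the length of the stream. The source is also subscribed to only once, which matters for a cold observable like the one built with Observable.Create.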