2

I'm new to reactive extensions (rx) and try to do the following thing in .NET (but should be the same in JS and other languages.):

I have a stream incoming with objects containing a string and a bool property. The stream would be infinite. I have the following conditions:

  • The first object should always be printed.
  • Now all incoming objects should be skipped until an object arrives with the bool property set to "true".
  • When an object arrives with the bool property set to "true", this object should be skipped but the next object should be printed (no matter what the properties are).
  • Now it goes on this way, every object that follows an object with the property set to true should be printed.

Example:

("one", false)--("two", true)--("three", false)--("four", false)--("five", true)--("six", true)--("seven", true)--("eight", false)--("nine", true)--("ten", false)

Expected result:

"one"--"three"--"six"--"seven"--"eight"--"ten"

Beware that "six" and "seven" have been printed because they follow an object with the property set to true, even if their own property is also set to "true".

Simple .NET program to test it:

using System;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        class Result
        {
            public bool Flag { get; set; }
            public string Text { get; set; }
        }

        static void Main(string[] args)
        {               
            var source =
               Observable.Create<Result>(f =>
               {
                   f.OnNext(new Result() { Text = "one", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "two", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "three", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "four", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "five", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "six", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "seven", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "eight", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "nine", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "ten", Flag = false });

                   return () => Console.WriteLine("Observer has unsubscribed");
               });
        }
    }
}

I tried to use the .Scan and .Buffer extensions but I don't know how exactly to use them in my scenario.

Performance of course should be as good as possible, cause in the end the stream would be infinite.

Uwe Keim
  • 39,551
  • 56
  • 175
  • 291
Tobias von Falkenhayn
  • 1,355
  • 5
  • 26
  • 59

4 Answers4

3

Try this approach:

var results = new[]
{
    new Result() { Text = "one", Flag = false },
    new Result() { Text = "two", Flag = true },
    new Result() { Text = "three", Flag = false },
    new Result() { Text = "four", Flag = false },
    new Result() { Text = "five", Flag = true },
    new Result() { Text = "six", Flag = true },
    new Result() { Text = "seven", Flag = true },
    new Result() { Text = "eight", Flag = false },
    new Result() { Text = "nine", Flag = true },
    new Result() { Text = "ten", Flag = false },
};

IObservable<Result> source =
    Observable
        .Generate(
            0, x => x < results.Length, x => x + 1,
            x => results[x],
            x => TimeSpan.FromSeconds(1.0));

The above just produces the source in a more idiomatic way than your Observable.Create<Result> approach.

Now here's the query:

IObservable<Result> query =
    source
        .StartWith(new Result() { Flag = true })
        .Publish(ss =>
            ss
                .Skip(1)
                .Zip(ss, (s1, s0) =>
                    s0.Flag
                    ? Observable.Return(s1) 
                    : Observable.Empty<Result>())
                .Merge());

The use of .Publish here allows the source observable to have only one subscription, but for it to be used multiple times within the .Publish method. Then the standard Skip(1).Zip approach can be used to inspect the subsequent values being produced.

Here's the output:

output


After inspiration from Shlomo, here's my approach using .Buffer(2, 1):

IObservable<Result> query2 =
    source
        .StartWith(new Result() { Flag = true })
        .Buffer(2, 1)
        .Where(rs => rs.First().Flag)
        .SelectMany(rs => rs.Skip(1));
Enigmativity
  • 113,464
  • 11
  • 89
  • 172
  • worked like a charm, thanks! can you maybe explain a little further why you used publish, since its a little confusing to me. :) – Tobias von Falkenhayn Aug 21 '18 at 12:07
  • @TobiasvonFalkenhayn - Is there something in particular about the description of the use of `.Publish` in my answer that you're unsure of? – Enigmativity Aug 21 '18 at 12:20
  • I just don't get it why its used, maybe because I'm new to Rx. Every documentation says that publish / connect is used when subscription happens after emitting values has started. So why exactly do you use publish and not simply: var query = source.StartWith(new Result() { Flag = true }); query.Skip(1).Zip(query, (s1, s0) => s0.Flag ? Observable.Return(s1) : Observable.Empty()).Merge().Subscribe(f => { Console.WriteLine(f.Text); }); – Tobias von Falkenhayn Aug 21 '18 at 14:45
  • 2
    Add `Console.Writeline("Nasty Side-Effect.");` to the beginning of your source observable. With `Publish`, you'll see that message once. Without it, you'll see it twice. – Shlomo Aug 21 '18 at 14:50
  • 1
    Each time you say `source.` a new subscription is created, which executes the entire observable again from scratch. `Publish` is like a router which 'copies' the notifications it receives and passes on multiple copies. – Shlomo Aug 21 '18 at 15:10
  • Alright, I think I'll get it. So every operator is like a "subscription" in your terminology, e.g. source.merge or source.buffer are both "subscriptions". I think i messed it a little up in my brain cause for me, subscriptions had to only with source.subscribe. – Tobias von Falkenhayn Aug 21 '18 at 15:14
  • Yes, everything is a subscription. `IObservable` has one method: `Subscribe`. So each of the operators are using that. – Shlomo Aug 21 '18 at 15:21
  • 1
    @TobiasvonFalkenhayn - You're a little bit off track. A new subscription is **not** created when you use a new operator. It's when you use the `source`. Generally you have separate subscription when you use a separate reference to the `source`. The use of `x.Publish(y => ...)` changes the rule so that you can use the `y` as many times as you like with only one subscription on the `x`. – Enigmativity Aug 22 '18 at 01:39
  • @Enigmativity thanks, for better discussion I opened a new thread: https://stackoverflow.com/questions/51961704/rx-what-are-subscriptions-and-how-do-subscriptions-work feel free to participate :) – Tobias von Falkenhayn Aug 22 '18 at 07:23
2

Here's a number of ways to do it:

var result1 = source.Publish(_source => _source
    .Zip(_source.Skip(1), (older, newer) => (older, newer))
    .Where(t => t.older.Flag == true)
    .Select(t => t.newer)
    .Merge(_source.Take(1))
    .Select(r => r.Text)
);

var result2 = source.Publish(_source => _source
    .Buffer(2, 1)
    .Where(l => l[0].Flag == true)
    .Select(l => l[1])
    .Merge(_source.Take(1))
    .Select(l => l.Text)
);

var result3 = source.Publish(_source => _source
    .Window(2, 1)
    .SelectMany(w => w
        .TakeWhile((r, index) => (index == 0 && r.Flag) || index == 1)
        .Skip(1)
    )
    .Merge(_source.Take(1))
    .Select(l => l.Text)
);

var result4 = source
    .Scan((result: new Result {Flag = true, Text = null}, emit: false), (state, r) => (r, state.result.Flag))
    .Where(t => t.emit)
    .Select(t => t.result.Text);

I'm partial to the Scan one, but really, up to you.

Shlomo
  • 14,102
  • 3
  • 28
  • 43
  • I like the `.Buffer(2, 1)` one myself. I'd remove the `.Merge(_source.Take(1))` and include a `.StartWith(new Result() { Flag = true })` personally, but hey, a good number of choices. – Enigmativity Aug 21 '18 at 11:58
  • thanks for your help. :) Can you provide me a link or something why you use "publish", I don't get how exactly this works with the selector function. – Tobias von Falkenhayn Aug 21 '18 at 14:47
0

I've found a way to do it thanks to the answer of @Picci:

Func<bool, Action<Result>> printItem = print =>
                {
                    return data => {
                        if(print)
                        {
                            Console.WriteLine(data.Text);
                        }
                    };
                };

var printItemFunction = printItem(true);

source.Do(item => printItemFunction(item))
      .Do(item => printItemFunction = printItem(item.Flag))
      .Subscribe();

However, I'm not quite sure if this is the best way since it seems a little strange to me not to use Subscribe() but side-effects. In the end i don't only want to print the values but also call a Webservice with it.

Tobias von Falkenhayn
  • 1,355
  • 5
  • 26
  • 59
-1

This is the way I would code it in TypeScript

const printItem = (print: boolean) => {
    return (data) => {
        if (print) {
            console.log(data);
        }
    };
}

let printItemFunction = printItem(true);

from(data)
.pipe(
    tap(item => printItemFunction(item.data)),
    tap(item => printItemFunction = printItem(item.printBool))
)
.subscribe()

The basic idea is to use an higher level function, printItem, which returns a function that knows if and what to print. The function returned is stored in a variable, printItemFunction.

For each item emitted by the source Observable, the first thing to be done is to execute printItemFunction passing the data notified by the source Observable.

The second thing is to evaluate printItem function and store the result in the variable printItemFunction so that it is ready for the following notification.

At the beginning of the program, printItemFunction is initialized with true, so that the first item is always printed.

I am not familiar with C# to give you an answer for .NET

Picci
  • 16,775
  • 13
  • 70
  • 113
  • Thank you very much. I managed to do the same thing in .NET. See my answer above. However, I'm not quite sure if this is the best way since it seems a little strange to me not to use Subscribe() but side-effects. In the end i don't only want to print the values but also call a Webservice with it. – Tobias von Falkenhayn Aug 21 '18 at 11:26
  • Once the key elements of the solution are clear, you can mix side effects and subscribe, which among other things is another way to define side effects, as you like. I used them in code in an attempt to make clear the key steps. – Picci Aug 21 '18 at 11:37
  • @Picci - I'd suggest avoiding side-effects where ever possible. – Enigmativity Aug 21 '18 at 11:39
  • I guess that printing on the console is a side effect, isn’t it? – Picci Aug 21 '18 at 11:47
  • @Picci - And you should try having two current subscribers. It'd cause a race condition. – Enigmativity Aug 21 '18 at 11:52