6

I have a event that I'm not in control of which provides me with data. The eventArgs looks something like this:

class MyEventArg {
  bool IsLastItem {get;}
  Data DataItem {get;}
}

I use Rx to convert this event to an IObservable. But I want to complete the observable if IsLastItem is true.

Any elegant ideas? One way would be to pipe the data through a subject that I have more control over to set the OnComplete event if the condition occurs...

lukebuehler
  • 4,061
  • 2
  • 24
  • 28

3 Answers3

9

If you want the last element to be included you can merge a stream with only the last element together with the regular stream combined with TakeWhile. Here is a simple console app to prove it:

var subject = new List<string>
{                            
"test",
"last"
}.ToObservable();

var my = subject
            .Where(x => x == "last").Take(1)
            .Merge(subject.TakeWhile(x => x != "last"));

my.Subscribe(
    o => Console.WriteLine("On Next: " + o), 
    () => Console.WriteLine("Completed"));

Console.ReadLine();

This prints:

On Next: test
On Next: last
Completed

UPDATE There was a bug that supressed the OnCompleted message if the underlying Observable didn't actually complete. I corrected the code to ensure OnCompleted gets called

And if you want to avoid subscribing to the underlying sequence multiple times for cold observables you can refactor the code like this:

var my = subject.Publish(p => p
            .Where(x => x == "last").Take(1)
            .Merge(p.TakeWhile(x => x != "last")));
Christoph
  • 26,519
  • 28
  • 95
  • 133
  • Nice! Took me a few seconds to see how it's done. Maybe better write: subject.TakeWhile(x => x != "last").Merge(subject.Where(x => x == "last").Take(1)); The cold observable tip is a +. Thanks – lukebuehler Nov 14 '11 at 21:00
3
public static IObservable<TSource> TakeWhileInclusive<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    return Observable
        .Create<TSource>(o => source.Subscribe(x =>
                                                   {
                                                       o.OnNext(x);
                                                       if (!predicate(x))
                                                           o.OnCompleted();
                                                   },
                                               o.OnError,
                                               o.OnCompleted
                                  ));
}
Sergey Aldoukhov
  • 22,316
  • 18
  • 72
  • 99
2

Are you looking for something like this?

IObservable<MyEventArg> result =
    myEventArgObservable.TakeWhile(arg => !arg.IsLastItem);
dtb
  • 213,145
  • 36
  • 401
  • 431
  • Wow, that would simple and nice. Do you know if the Observable completes if the predicate is true? – lukebuehler Nov 14 '11 at 18:51
  • Yes, it will notify OnCompleted() if you don't want it to notify OnCompleted you can simply use Where(arg => !arg.IsLastItem) – Christoph Nov 14 '11 at 18:56
  • 2
    Yes, it seems like the Observable completes, BUT now I have the problem that I won't receive the last item... – lukebuehler Nov 14 '11 at 18:56