8

Observable.TakeWhile allows you to run a sequence as long as a condition is true (using a delegate so we can perform computations on the actual sequence objects), but it's checking this condition BEFORE each element. How can I perform the same check but AFTER each element?

The following code demonstrates the problem

    void RunIt()
    {
        List<SomeCommand> listOfCommands = new List<SomeCommand>();
        listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 });

        var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount);

        obs.Subscribe(x =>
        {
            Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
        });
    }

    class SomeCommand
    {
        public int CurrentIndex;
        public int TotalCount;
    }

This outputs

1 of 3
2 of 3

I can't get the third element

Looking at this example, you may think all I have to do is change my condition like so -

var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex <= c.TotalCount);

But then the observable will never complete (because in my real world code, the stream doesn't end after those three commands)

NoPyGod
  • 4,905
  • 3
  • 44
  • 72

6 Answers6

15

There's no built in operators to do what you're asking, but here's one that uses Publish to run two queries while only subscribing to the underlying observable once:

// Emits matching values, but includes the value that failed the filter
public static IObservable<T> TakeWhileInclusive<T>(
    this IObservable<T> source, Func<T, bool> predicate)
{
    return source.Publish(co => co.TakeWhile(predicate)
        .Merge(co.SkipWhile(predicate).Take(1)));
}

And then:

var obs = listOfCommands.ToObservable()
    .TakeWhileInclusive(c.CurrentIndex != c.TotalCount);
Richard Szalay
  • 83,269
  • 19
  • 178
  • 237
  • Yeah, yours is still way better: single merging subscription – JerKimball Feb 05 '13 at 05:54
  • 2
    I think the usage can read better when this is called TakeUntil instead of TakeWhileInclusive. Eg Status.TakeUntil(s => s == Status.Completed) reads better than Status.TakeWhileInclusive(s => s != Status.Completed). – Niall Connaughton Mar 06 '14 at 03:38
6

Final edit:

I based my solution off of Sergey's TakeWhileInclusive implementation in this thread - How to complete a Rx Observable depending on a condition in a event

public static IObservable<TSource> TakeUntil<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    return Observable
        .Create<TSource>(o => source.Subscribe(x =>
        {
            o.OnNext(x);
            if (predicate(x))
                o.OnCompleted();
        },
        o.OnError,
        o.OnCompleted
    ));
}
Community
  • 1
  • 1
NoPyGod
  • 4,905
  • 3
  • 44
  • 72
  • FYI, you are never unsubscribing from `source`. If it doesn't naturally complete by itself you may have a memory leak on your hands. – Richard Szalay Feb 12 '13 at 22:35
  • Any suggestions on how I can do that? – NoPyGod Feb 12 '13 at 22:38
  • I'd create a `SerialDisposable` and assign the `Subscribe` return to it's `Disposable` property. Then on Complete/Error, dispose the `SerialDisposable` instance. Out of interest, why was the Publish-based implementation not suitable? – Richard Szalay Feb 12 '13 at 23:05
  • If I remember correctly it was the same reason I couldn't use Alex G's answer. When further reducing it drops the last element -- which is the one we're trying to get. (See my comment in response to Alex G's answer above) – NoPyGod Feb 12 '13 at 23:53
  • Not sure why that would be. I just repro'd your problem and using my `TakeWhileInclusive` as a drop-in replacement returned all three results as expected. – Richard Szalay Feb 13 '13 at 00:40
  • Did you try reducing AFTER TakeWhileInclusive with Timeout? – NoPyGod Feb 13 '13 at 01:51
  • By reducing do you mean Aggregate / Sum? If so, yes, if I add `.Sum(c => c.TotalCount)` before subscribing, the output is a single value of `9`. – Richard Szalay Feb 13 '13 at 02:35
  • Try adding Timeout after TakeWhileInclusive – NoPyGod Feb 13 '13 at 03:46
  • The subscription is returned (the Create takes a Func, IDisposable>), then completing/erroring the sequence will invoke the disposal too. – Lee Campbell Nov 16 '14 at 15:48
2

You can use the TakeUntil operator to take every item until a secondary source produces a value; in this case we can specify the second stream to be the first value after the predicate passes:

public static IObservable<TSource> TakeWhileInclusive<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> predicate)
{
    return source.TakeUntil(source.SkipWhile(x => predicate(x)).Skip(1));
}
Alex
  • 7,639
  • 3
  • 45
  • 58
  • It doesn't work when you subscribe to TakeWhileInclusive() and then perform further reducing. ie.. TakeWhileInclusive().Timeout().. try it, it drops the last element – NoPyGod Feb 05 '13 at 13:14
  • Apologies, I made an edit that changed the code because of a fault in my testing - make sure the `.Skip(1)` operation is present! I have now tested with a `.Timeout()` operation and it seems to perform fine, are you sure it isn't your further operation that is cutting off the end of the sequence? – Alex Feb 05 '13 at 13:20
  • 1
    After more testing it is clear that with this operator, OnComplete doesn't call until the next element in the sequence is processed, so this won't work in combination with some operators. I'll try to think of a neat solution but I am not sure there is one. – Alex Feb 05 '13 at 13:30
  • Thanks Alex. See my recent edit to the question and tell me what you think. – NoPyGod Feb 05 '13 at 13:40
  • 1
    If it works then it works :) though I still wish C# extended the `yield` keyword to work for Observables - would have made this so much easier! – Alex Feb 05 '13 at 13:49
  • 1
    This sample assumes the source is hot, i.e. it doesn't publish the source to protect against multiple side-effects from multiple subscriptions. See @Richard Szalay's answer for a safe implementation. – Lee Campbell Nov 20 '15 at 17:43
1

I think you're after TakeWhile, not TakeUntil:

var list = (new List<int>(){1,2,3,4,5,6,7,8,9,10});
var takeWhile = list
        .ToObservable()
        .Select((_, i) => Tuple.Create(i, _))
        .TakeWhile(tup => tup.Item1 < list.Count)
        .Do(_ => Console.WriteLine("Outputting {0}", _.Item2));

Ok, the thing you want doesn't exist out of the box, at least I'm not aware of something with that particular syntax. That said, you can cobble it together fairly easily (and it's not too nasty):

var fakeCmds = Enumerable
    .Range(1, 100)
    .Select(i => new SomeCommand() {CurrentIndex = i, TotalCount = 10})
    .ToObservable();

var beforeMatch = fakeCmds
    .TakeWhile(c => c.CurrentIndex != c.TotalCount);
var theMatch = fakeCmds
    .SkipWhile(c => c.CurrentIndex != c.TotalCount)
    .TakeWhile(c => c.CurrentIndex == c.TotalCount);
var upToAndIncluding = Observable.Concat(beforeMatch, theMatch);
JerKimball
  • 16,584
  • 3
  • 43
  • 55
  • Somebody already gave an answer using TakeWhile (and then deleted it). TakeWhile doesn't take the final element because it seems to be checking the condition BEFORE each element rather than AFTER. I Need the RX equivalent of a Do While loop rather than a simple While loop. – NoPyGod Feb 05 '13 at 01:51
  • Well, there is `DoWhile`...here, I'll throw an example up. – JerKimball Feb 05 '13 at 02:30
  • DoWhile doesn't take a delegate like TakeWhile. I know I can make this work using less elegant code, but the point is to do things in the tidiest way possible. – NoPyGod Feb 05 '13 at 02:33
  • actually, wait: the example I have here prints `Outputting 1...10`, so it's clearly processing the last element. What exactly are you seeing? – JerKimball Feb 05 '13 at 02:40
  • I don't like your example, it's unnecessarily complicated. I've added an answer which shows the problem. – NoPyGod Feb 05 '13 at 02:49
  • And now I've deleted that answer and instead updated the question.. what a mess. – NoPyGod Feb 05 '13 at 02:53
  • @NoPyGod Ahhh, I see now: you want *specifically* the `do { ... } while( condition );` construct – JerKimball Feb 05 '13 at 03:32
0

Combo, using a new SkipUntil and TakeUntil:

SkipUntil return source.Publish(s => s.SkipUntil(s.Where(predicate)));

TakeUntil (inclusive) return source.Publish(s => s.TakeUntil(s.SkipUntil(predicate)));

Full source: https://gist.github.com/GeorgeTsiokos/a4985b812c4048c428a981468a965a86

George Tsiokos
  • 1,890
  • 21
  • 31
-1

Perhaps the following way will be useful to someone. You must use the "Do" method and the empty "Subscribe" method.

    listOfCommands.ToObservable()
    .Do(x =>
    {
        Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
    })
    .TakeWhile(c => c.CurrentIndex != c.TotalCount)
    .Subscribe();

This way you get the result without writing your own extensions.

Ivan Zalutskii
  • 109
  • 1
  • 5