3

My goal is to create a bunch of observables from a source observable, so that I can subscribe to them individually.

When I do that manually (that is, creating each subSource manually), things work as expected: the values added to the original source propagate adequately to the subSources.

But when I create them in a loop, adding them to a List<IObservable<T>>, the subscriptions to elements taken from that list don't seem to be working:

class Program
{
    static void Main(string[] args)
    {
        // using Subject for the sake of example
        var source = new Subject<int>(); 


        // manually creating each subSource
        var source0 = source.Where((t, i) => i % 3 == 0);
        var source1 = source.Where((t, i) => i % 3 == 1);
        var source2 = source.Where((t, i) => i % 3 == 2);


        // creating a List of subsources
        List<IObservable<int>> sources = new List<IObservable<int>>();

        int count = 3;

        for (int i = 0; i < count; i++)
        {
            sources.Add(source.Where((v, ix) => ix % 3 == i));
        }


        // subscribing to one subSource from each group
        source0.Subscribe(Console.WriteLine); // this works
        sources[1].Subscribe(Console.WriteLine); // this doesn't

        // feeding data
        Observable.Range(0, 20).Subscribe(source);

        Console.ReadKey();
    }
}
heltonbiker
  • 26,657
  • 28
  • 137
  • 252

2 Answers2

5

The predicate of your Where clause references the loop variable i.

However, the predicate is tested when a value is published from source - not when the loop is iterated. By the that time, i has reached it's final value 3.

To fix this, create a new variable inside the loop to store the current value of i, and reference that in your Where predicate:

for (int i = 0; i < count; i++)
{
    var j = i;
    sources.Add(source.Where((v, ix) => ix % 3 == j)); // note use of j here
}
James World
  • 29,019
  • 9
  • 86
  • 120
  • Wow. How did you know it, by debugging or by deductive reasoning? I mean, I would _never_ be able to figure this out myself, and I am quite afraid I would run into this sort of problem all the time if I end up making heavy use of Rx. Any advice in this respect? And of course, thank you very much by your thorough answer! :o) – heltonbiker Jan 15 '17 at 11:44
  • 1
    Experience helps a lot - I recognized the problem, it's actually quite a common mistake and not restricted to Rx. The main thing to take away is to release that when you are passing a function as an argument, it isn't being executed. You should get familiar with the concept of closures which are important here. Interestingly, `foreach` loops used to exhibit a similar behaviour which led to a change in C# 5 precisely because so many people were making the same mistake. See http://stackoverflow.com/questions/512166/the-foreach-identifier-and-closures for a good discussion. – James World Jan 15 '17 at 12:02
  • 1
    Closures close over variables, not the value of variables at the time they are created. So the lamdba in your `Where` is closing over `i` and not the *value* of `i`. – James World Jan 15 '17 at 12:06
1
for (int i = 0; i < count; i++)
{
    **var j = i;**
    sources.Add(source.Where((v, ix) => ix % 3 == j));
}

Closures.

Richard Anthony Hein
  • 10,550
  • 3
  • 42
  • 62