Suppose I have the following code:

static void Main(string[] args)
{
    var scheduler = NewThreadScheduler.Default;
    var enumerable = Enumerable.Range(0, 100);

    enumerable
        .ToObservable(scheduler)
        .SubscribeOn(scheduler)
        .Subscribe(item =>
        {
            Console.WriteLine("Consuming {0} on Thread: {1}", item, Thread.CurrentThread.ManagedThreadId);

            // simulate long running operation
            Thread.Sleep(1000);
        });

    Console.ReadKey();
}

As you can see, I convert an IEnumerable to an IObservable. I want to consume each item on a new thread, so I used SubscribeOn(scheduler). Unfortunately, every iteration runs on the same thread, so one iteration blocks the next.

The result is:

Consuming 0 on Thread: 4
Consuming 1 on Thread: 4
Consuming 2 on Thread: 4
Consuming 3 on Thread: 4
Consuming 4 on Thread: 4
....

Is it possible to force each item to be consumed on its own thread?

Lukas
  • Shouldn't it be `ObserveOn` instead of `SubscribeOn`? – Dirk Sep 05 '17 at 12:48
  • I've tried with ObserveOn, SubscribeOn and both at the same time with no success. – Lukas Sep 05 '17 at 12:49
  • I don't want to produce on new thread but what I want is to CONSUME each item on new thread. – Lukas Sep 05 '17 at 12:57
  • Have a look at the accepted answer of the dupe I marked. Especially the last sentence of it. – Fildor Sep 05 '17 at 13:11
  • Do also keep in mind that starting threads is quite resource intensive. You really don't want to do it very often, so sharing an existing thread that is idle is far better than starting up a new one. – Enigmativity Sep 06 '17 at 07:12
  • Well said @Enigmativity. The fact that you don't often see code to manipulate the minimum size of the thread pool, which I had to add to make the point in the contrived example below, is not because it's a well kept secret - it's because it is rarely a good idea. Once you've hit the number of logical threads equal to the number of cores on your box on a compute bound problem, adding further threads is extremely likely to decrease performance. Using a thread pool and judicious code to await IO bound operations to minimize threads often leads to good performance characteristics. – James World Sep 06 '17 at 07:25

1 Answer

The behaviour you are seeing is completely by design.

Fundamental to Rx is its grammar, which declares that a stream is a sequence of zero or more OnNext calls, optionally followed by an OnError or an OnCompleted.

In particular, Rx grammar dictates that each of these messages is delivered sequentially for a given subscriber.

So what you are seeing is the correct behaviour - no concurrent execution of OnNext handlers. Given this deliberate constraint, creating a new thread for each OnNext would be quite wasteful.

Under the covers, if you trace the code through far enough, you'll see that the NewThreadScheduler uses an EventLoopScheduler specifically to re-use the thread for each subscriber. The moniker NewThreadScheduler really speaks to the fact that each subscriber gets a new thread, not each event.

To see this, modify your code so that there are two subscribers running at different speeds. You'll see that each gets its own thread and proceeds at its own pace, and that the faster is unimpeded by the slower:

var scheduler = NewThreadScheduler.Default;
var enumerable = Enumerable.Range(0, 100);

var xs = enumerable
    .ToObservable(scheduler)
    .SubscribeOn(scheduler);

xs.Subscribe(item =>
{
    Console.WriteLine("Slow consuming {0} on Thread: {1}",
        item, Thread.CurrentThread.ManagedThreadId);

    // simulate slower long running operation
    Thread.Sleep(1000);
});

xs.Subscribe(item =>
{
    Console.WriteLine("Fast consuming {0} on Thread: {1}",
        item, Thread.CurrentThread.ManagedThreadId);

    // simulate faster long running operation
    Thread.Sleep(500);
});

Console.ReadKey();

You may find reading through the Rx Design Guidelines quite helpful.

The desire to allow for concurrent processing of events in a subscriber suggests a queue with multiple consumers may be what you are after - and for that you could look outside of Rx, for example a BCL ConcurrentQueue<T>. It is also possible to project messages into asynchronous calls and gather results on completion without violating Rx grammar constraints.

For example, here's some similar code that processes each number in the stream for a random length of time. You can see the results come in out of order, unimpeded by each other. It's not awesome code, but it makes the point. It could be genuinely useful if the async work were something IO bound. Also note the use of Observable.Range, which avoids the Enumerable.Range().ToObservable() combo. Tested on .NET Core 2.0:

var random = new Random();

// stop the threadpool from throttling us as it grows
ThreadPool.SetMinThreads(100, 1);

Observable.Range(0, 100)
    .SelectMany(x => Observable.Start(() =>
    {
        Console.WriteLine($"Started {x}");

        // Random is not thread-safe, so guard the shared instance
        int seconds;
        lock (random) { seconds = random.Next(1, 10); }

        // simulate work that takes between 1 and 9 seconds
        Thread.Sleep(seconds * 1000);
        return x;
    }))
    .Subscribe(item =>
    {
        Console.WriteLine($"{item}, {Thread.CurrentThread.ManagedThreadId}");
    });

Console.ReadKey();
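
For the queue with multiple consumers mentioned above, here is a minimal sketch that stays outside of Rx. It is only an illustration, not the one right way to do it: it uses BlockingCollection<T> (which wraps a ConcurrentQueue<T> by default) rather than a bare ConcurrentQueue<T>, and the consumer count of 4 and the item count of 20 are arbitrary assumptions chosen to keep the run short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Main()
    {
        // Assumption: four consumers; tune this for your own workload.
        const int consumerCount = 4;

        using (var queue = new BlockingCollection<int>())
        {
            // Start the consumers. LongRunning hints to the scheduler that
            // each one should get a dedicated thread instead of a pool thread.
            var consumers = Enumerable.Range(0, consumerCount)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    // Blocks until an item is available; the loop ends once
                    // CompleteAdding has been called and the queue is empty.
                    foreach (var item in queue.GetConsumingEnumerable())
                    {
                        Console.WriteLine("Consuming {0} on Thread: {1}",
                            item, Thread.CurrentThread.ManagedThreadId);

                        // simulate long running operation
                        Thread.Sleep(1000);
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            // Producer: enqueue the work, then signal that no more is coming.
            foreach (var item in Enumerable.Range(0, 20))
            {
                queue.Add(item);
            }
            queue.CompleteAdding();

            // Wait for every consumer to finish draining the queue.
            Task.WaitAll(consumers);
        }
    }
}
```

Here the items are handled concurrently by a fixed set of threads, so a slow item delays only the consumer that picked it up. The trade-off is that processing order is no longer guaranteed, which is exactly the guarantee the Rx grammar preserves for a single subscriber.
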
James World