12

I am trying to enumerate a large IEnumerable once, and observe the enumeration with various operators attached (Count, Sum, Average etc). The obvious way is to transform it to an IObservable with the method ToObservable, and then subscribe an observer to it. I noticed that this is much slower than other methods, like doing a simple loop and notifying the observer on each iteration, or using the Observable.Create method instead of ToObservable. The difference is substantial: it's 20-30 times slower. Is this simply how it is, or am I doing something wrong?

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Output:

ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec

.NET Core 3.0, C# 8, System.Reactive 4.3.2, Windows 10, Console App, Release build


Update: Here is an example of the actual functionality I want to achieve:

var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");

Output:

Count: 10,000,000, Sum: 49,999,995,000,000, Average: 4,999,999.5

The important difference of this approach compared to using standard LINQ operators, is that the source enumerable is enumerated only once.


One more observation: using ToObservable(Scheduler.Immediate) is slightly faster (about 20%) than ToObservable().

Theodor Zoulias
  • 2
    A 1-time measurement isn't all too reliable. Consider setting up a benchmark with [BenchmarkDotNet](https://benchmarkdotnet.org/) for example. (Not affiliated) – Fildor Apr 02 '20 at 08:45
  • @Fildor the BenchmarkDotNet is more useful for microbenchmarks, or for measuring small differences that occur with high variability. For this case a `Stopwatch` is probably enough. – Theodor Zoulias Apr 02 '20 at 08:55
  • 1
    @TheodorZoulias There's more to it than that, for example, I would question your benchmark as it currently stands as the order of execution within that single run could be causing large differences. – Oliver Apr 02 '20 at 08:58
  • 1
    Stopwatch _may_ be enough, if you gathered statistics. Not just a single sample. – Fildor Apr 02 '20 at 09:00
  • 1
    @Fildor I just ran the test again with reverse order: `Method3(COUNT); Method2(COUNT); Method1(COUNT);`. I got similar results. – Theodor Zoulias Apr 02 '20 at 09:01
  • I think the initialization of the enumerable should be moved outside of the functions for the sake of accuracy. – Eldar Apr 02 '20 at 09:02
  • 1
    I am not saying you _will_ see a completely different result. It just will be more reliable. – Fildor Apr 02 '20 at 09:02
  • @Eldar Init is outside measurement. (If you are talking about the line `var source = Enumerable.Range(0, count);`) – Fildor Apr 02 '20 at 09:04
  • 1
    @Eldar I just tested your suggestion. I passed the same enumerable to all three methods. The results are similar. – Theodor Zoulias Apr 02 '20 at 09:05
  • 1
    @Fildor - The results are correct and they are expected. – Enigmativity Apr 02 '20 at 09:06
  • 1
    @Enigmativity I am not saying they are INcorrect. _I_'d just roll a decent benchmark to be _sure_. – Fildor Apr 02 '20 at 09:07
  • 1
    @Fildor I am not interested if the `ToObservable` is exactly 24.8 or 25.2 times slower. It doesn't make any difference for my use case. In both cases I am inclined not to use it, and use one of the other methods instead. – Theodor Zoulias Apr 02 '20 at 09:09
  • 2
    @Fildor - Fair enough. I mean that the figures are representative of what one should expect. – Enigmativity Apr 02 '20 at 09:09
  • 1
    @TheodorZoulias - It would be a mistake not to use `.ToObservable()` for the reasons I outlined in my answer. Speed is not the goal here. – Enigmativity Apr 02 '20 at 09:10
  • @Enigmativity Agreed. – Fildor Apr 02 '20 at 09:10
  • 2
    @TheodorZoulias - Nice question, btw. – Enigmativity Apr 02 '20 at 09:13
  • @TheodorZoulias - This is also one of the reasons why I say avoid `Observable.Create` and especially so if you end up doing a `return Disposable.Empty;`. – Enigmativity Apr 02 '20 at 09:14

2 Answers

9

This is the difference between a well behaved observable and a "roll-your-own-because-you-think-faster-is-better-but-it-is-not" observable.

When you dive down far enough in the source you discover this lovely little line:

scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));

This is effectively calling hasNext = enumerator.MoveNext(); once per scheduled recursive iteration.

This allows you to choose the scheduler for your .ToObservable(schedulerOfYourChoice) call.
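
To make the per-item cost concrete, here is a minimal sketch of the same idea. It is not the library's actual implementation: `ToObservableSketch` is a hypothetical helper, and it assumes the recursive `Schedule(Action<Action>)` extension from `System.Reactive.Concurrency`. Each `MoveNext` call runs as its own scheduled unit of work, which is where the overhead relative to a plain `foreach` comes from.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class SketchExtensions
{
    // Hypothetical helper for illustration only; the real ToObservable is more involved.
    public static IObservable<T> ToObservableSketch<T>(
        this IEnumerable<T> source, IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            var enumerator = source.GetEnumerator();

            // The lambda handles exactly one item, then asks the scheduler
            // to run it again by calling self().
            var work = scheduler.Schedule(self =>
            {
                var hasNext = false;
                T current = default;
                try
                {
                    hasNext = enumerator.MoveNext();
                    if (hasNext) current = enumerator.Current;
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    return;
                }

                if (hasNext)
                {
                    observer.OnNext(current);
                    self(); // schedule the next iteration
                }
                else
                {
                    observer.OnCompleted();
                }
            });

            // Disposing the subscription cancels pending work and releases the enumerator.
            return new CompositeDisposable(work, enumerator);
        });
    }
}
```

Because every item goes through the scheduler, the subscriber can unsubscribe between items and the scheduler decides where and when each step runs. A tight `foreach` inside `Observable.Create` gives up both of these in exchange for speed.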

With the other options you've chosen you've created a bare-bones series of calls to .OnNext that do virtually nothing. Method2 doesn't even have a .Subscribe call.

Both Method2 and Method3 run using the current thread and both run to completion before the subscription is finished. They are blocking calls. They can cause race conditions.

Method1 is the only one that behaves nicely as an observable. It is asynchronous and it can run independently of the subscriber.

Do keep in mind that observables are collections that run over time. They typically have an async source or a timer, or they respond to external stimuli. They don't often run off of a plain enumerable. If you're working with an enumerable then working synchronously should be expected to run faster.

Speed is not the goal of Rx. Performing complex queries on time-based, pushed values is the goal.

Enigmativity
  • 2
    "roll-your-own-because-you-think-faster-is-better-but-it-is-not" - excellent!! – Fildor Apr 02 '20 at 09:06
  • Thanks Enigmativity for the detailed answer! I updated my question with an example of what I actually want to achieve, which is a calculation synchronous in nature. Do you think that instead of Reactive extensions I should search for another tool, given that performance is critical in my case? – Theodor Zoulias Apr 02 '20 at 09:38
  • @TheodorZoulias - Here's the enumerable way to do your example in your question: `source.Aggregate(new { count = 0, sum = 0L }, (a, x) => new { count = a.count + 1, sum = a.sum + x }, a => new { a.count, a.sum, average = (double)a.sum / a.count })`. One iteration only and over 10x faster than Rx. – Enigmativity Apr 02 '20 at 09:57
  • I just tested it, and it's faster indeed, but only about x2 faster (compared to RX without `ToObservable`). This is the other extreme, where I have the best performance but I am forced to re-implement every LINQ operator inside a complex lambda expression. It is error prone and less maintainable, considering that my actual calculations involve even more operators, and combinations of them. I think that it's quite tempting to pay a x2 performance price for having a clear and readable solution. On the other hand paying x10 or x20, not so much! – Theodor Zoulias Apr 02 '20 at 10:18
  • Perhaps if you posted exactly what you're trying to do I could suggest an alternative? – Enigmativity Apr 02 '20 at 10:29
  • TBH my actual case is in the realm of imagination for the time being. I am trying to understand which tools are more suitable for different scenarios, and the requirement for single enumeration seems to come up quite frequently. – Theodor Zoulias Apr 02 '20 at 10:41
  • @TheodorZoulias - Keep in mind that the Rx way of having a single iteration is causing multiple iterations of the subjects. You haven't removed any iterations in practice at all. – Enigmativity Apr 02 '20 at 11:04
  • Could you elaborate on that? My understanding is that a `Subject` is just a list of subscribers (observers), and everytime it gets a notification it propagates it to its subscribers. Is it more than this? – Theodor Zoulias Apr 02 '20 at 11:12
  • @TheodorZoulias - No, that's exactly it. Say you're doing 4 calculations so you could iterate your enumerable 4 times to do it. Or you could attach 4 observers to a subject and iterate your enumerable once. Either way it is `n * 4` values that you're running through - `n` times `4` observers or `n` times `4` iterations. – Enigmativity Apr 02 '20 at 11:24
  • I want to avoid enumerating the source more than once because the data may not be stored in memory. Instead could be fetched one at a time from the filesystem or a database. In this case each enumeration could even yield different results, making my computed values inconsistent. – Theodor Zoulias Apr 02 '20 at 11:32
  • @TheodorZoulias - Then the observable approach is good. It is an ephemeral way to iterate through a list of items. You could easily create an observable directly from `File.ReadLines` or a DB query and never actually have the values in an enumerable in the first place. – Enigmativity Apr 02 '20 at 11:49
  • Yeap, the [`File.ReadLines`](https://learn.microsoft.com/en-us/dotnet/api/system.io.file.readlines) is exactly the method I had in mind. It returns an `IEnumerable` though. So we are back to the question of how to make an `IObservable` out of an `IEnumerable` efficiently. :-) – Theodor Zoulias Apr 02 '20 at 12:06
  • @TheodorZoulias - Try this way of reading a file with pure observables then: https://dotnetfiddle.net/PB9w9W. – Enigmativity Apr 02 '20 at 22:43
  • I tried to test your Using-Defer-Repeat-TakeWhile solution, and ran into a [strange behavior](https://stackoverflow.com/questions/61012408/the-observable-repeat-is-unstoppable-is-it-a-bug-or-a-feature). – Theodor Zoulias Apr 03 '20 at 13:00
  • I just [tested it](https://dotnetfiddle.net/9BnyYw), after adding `ObserveOn(Scheduler.CurrentThread)` as a workaround for the aforementioned strange behavior, and the results aren't promising. It has five times more overhead than `ToObservable`, which is already slow. – Theodor Zoulias Apr 03 '20 at 15:40
-1

Because the Subject does nothing.

It is like comparing the performance of a loop statement in these 2 cases:

for(int i=0;i<1000000;i++)
    total++;

or

for(int i=0;i<1000000;i++)
    DoHeavyJob();

If you use another Subject, one with a slow OnNext implementation, the results become much closer:

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 100;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    class My_Slow_Subject : SubjectBase<int>
    {

        public override void OnNext(int value)
        {
            // Simulate a job that takes about 3 ms
            System.Threading.Thread.Sleep(3);
        }


        bool _disposed;
        public override bool IsDisposed => _disposed;
        public override void Dispose() => _disposed = true;
        public override void OnCompleted() { }
        public override void OnError(Exception error) { }
        public override bool HasObservers => false;
        public override IDisposable Subscribe(IObserver<int> observer) 
                => throw new NotImplementedException();
    }

    static SubjectBase<int> CreateSubject()
    {
        return new My_Slow_Subject();
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = CreateSubject();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Output

ToObservable: 434 msec
Loop & Notify: 398 msec
Observable.Create: 394 msec

ToObservable supports System.Reactive.Concurrency.IScheduler.

That means you can implement your own IScheduler and decide when to run each task.
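
Writing a custom IScheduler is not shown here. As a smaller illustration of the same knob, this sketch passes two of the built-in schedulers to ToObservable. `SchedulerChoiceDemo` and `CollectOn` are hypothetical names introduced for the example; the Rx calls used are `ToObservable(IScheduler)`, `ToList()` and the blocking `Wait()`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class SchedulerChoiceDemo
{
    // Hypothetical helper: pushes `source` through ToObservable on the given
    // scheduler and blocks until every item has been collected.
    public static IList<int> CollectOn(IScheduler scheduler, IEnumerable<int> source) =>
        source.ToObservable(scheduler).ToList().Wait();

    public static void Main()
    {
        var source = Enumerable.Range(0, 3);

        // Scheduler.Immediate runs each scheduled step inline on the calling thread.
        var inline = CollectOn(Scheduler.Immediate, source);

        // TaskPoolScheduler.Default hands the steps to the task pool instead.
        var pooled = CollectOn(TaskPoolScheduler.Default, source);

        Console.WriteLine(string.Join(", ", inline));
        Console.WriteLine(string.Join(", ", pooled));
    }
}
```

Both calls yield the same items in the same order; only where and when the per-item work runs differs.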

Hope this helps

Regards

BlazorPlus
  • You do realize OP is talking explicitly about COUNT values 100,000x higher magnitude? – Fildor Apr 02 '20 at 09:32
  • Thanks BlazorPlus for the answer. I have updated my question adding a more realistic example of my use case. The `subject` is observed by other operators that perform calculations, so it's not doing nothing. The performance penalty of using `ToObservable` is still substantial, because the calculations are very light. – Theodor Zoulias Apr 02 '20 at 09:44