
I am trying to see if I can spawn 1 million Observables on the io() and computation() Schedulers.

    // `sum` is a static long field of the enclosing class
    public static void observableLimit() {
        sum = 0;
        long lowerBound = 0;
        long higherBound = 1000;
        Flowable.fromCallable(() -> {
            Flowable.rangeLong(lowerBound, higherBound + 1)
                    .subscribe(integer -> Observable.just(integer)
                            .subscribeOn(Schedulers.io())
                            .subscribe(j -> {
                                printNum(j);
                                sum = sum + j;
                            }));
            return true;
        }).blockingSubscribe(aBoolean -> {
            long actualSum = (higherBound * (higherBound + 1)) / 2;
            System.out.println("");
            System.out.println("SUM: " + sum);
            Assert.assertEquals(actualSum, sum);
        });
    }

With higherBound = 100 it works most of the time; with 1000 it sometimes works but usually fails; with 10000 it fails almost every time. It works if I run it on newThread() instead, or if I don't use subscribeOn() at all.

How can I fix this behaviour?

Arif Nadeem
  • Please have a look at another implementation that accumulates the numbers: https://gist.github.com/SergejIsbrecht/c466e6811071c9ef6471c2fdd44a4619 . Keep in mind that 1000 threads will allocate ~1000 MiB of memory for their stacks. As a side note, your code does not compile as Java, because 'sum' must be effectively final and cannot be assigned from inside a closure. – Sergej Isbrecht Oct 30 '17 at 10:07
  • Your implementation works fine. Yes, I have a class variable named 'sum'. Can you please explain more about the statement that 1000 threads will allocate ~1000 MiB? Is this documented somewhere? – Arif Nadeem Oct 30 '17 at 10:21
  • Please have a look at the answers in this Thread: https://stackoverflow.com/questions/36898701/how-does-java-jvm-allocate-stack-for-each-thread . This might be different for Android (Dalvik-VM) – Sergej Isbrecht Oct 30 '17 at 11:09
  • I logged the thread names during execution in my code. It looks like the threads are being reused, since io() uses a thread pool; for 1000 iterations the maximum number of threads I see is about 10. The behaviour of newThread is as you described: for 1000 iterations, 1000 threads are created. – Arif Nadeem Oct 30 '17 at 11:57
  • Well, actually no parallelism is happening per se. Range will create a value on ThreadPool.IO. This value is pushed down to reduce; after reduce, the next value is pushed synchronously from range to reduce. If you want to increment in parallel, you would have to implement it differently. – Sergej Isbrecht Oct 30 '17 at 13:54
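The thread counts reported above (about 10 threads with io() versus 1000 with newThread()) can be reproduced without RxJava. The sketch below is a plain java.util.concurrent analogy, not RxJava's Scheduler code: a brand-new Thread per task stands in for newThread(), and Executors.newCachedThreadPool() stands in for io(), which (per its javadoc) reuses previously started workers and only starts new ones when none are free. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadReuseDemo {

    // newThread()-style analogy: every task gets a brand-new thread.
    static int oneThreadPerTask(int tasks) throws InterruptedException {
        Set<Thread> used = ConcurrentHashMap.newKeySet();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            Thread t = new Thread(() -> used.add(Thread.currentThread()));
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            t.join(); // wait so the count is complete
        }
        return used.size();
    }

    // io()-style analogy: a cached pool reuses idle threads and only
    // creates a new one when all existing threads are busy.
    static int cachedPool(int tasks) throws InterruptedException {
        Set<Thread> used = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> used.add(Thread.currentThread()));
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return used.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("one thread per task: " + oneThreadPerTask(1000));
        // Usually far fewer than 1000; the exact number depends on timing.
        System.out.println("cached pool:         " + cachedPool(1000));
    }
}
```

Because the tasks are tiny, a pooled thread is usually free again before the next task arrives, which is why the reused-thread count stays small.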

2 Answers


How can I fix this behaviour?

Don't use that pattern. Why do you want to do that in the first place?

io() and newThread() create OS threads and are fundamentally limited by your OS's capabilities and available memory.

computation() has a fixed set of threads and can handle a much larger number of Flowables, because each one is assigned to one of the existing worker threads.
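A minimal sketch of why a fixed set of threads scales to many more units of work, using a plain Java fixed-size pool as a stand-in for computation() (whose javadoc states it defaults to Runtime.availableProcessors() threads). This is an analogy, not RxJava code, and the class and method names are hypothetical. However many tasks are submitted, they queue up and run on the same few threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class FixedPoolDemo {

    // Runs `tasks` jobs on a pool sized to the CPU count.
    // Returns {distinct threads used, tasks completed}.
    static long[] run(int tasks) throws InterruptedException {
        int cpus = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(cpus);
        Set<Thread> used = ConcurrentHashMap.newKeySet();
        LongAdder done = new LongAdder();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                used.add(Thread.currentThread());
                done.increment();
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        return new long[] { used.size(), done.sum() };
    }

    public static void main(String[] args) throws InterruptedException {
        long[] r = run(1_000_000);
        System.out.println(r[1] + " tasks ran on " + r[0] + " threads");
    }
}
```

The trade-off is that blocking work on such a pool stalls the other queued tasks, which is why blocking I/O is normally put on io() instead.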

akarnokd
  • Hi, thanks for replying. I am evaluating this for a use case that I have. I don't need a million Observables, but I am trying to find out if there's an upper limit to the number of threads that can be started at once. newThread works just fine; the problem is with io, whose behaviour is not reliable. – Arif Nadeem Oct 30 '17 at 09:59
  • @ArifNadeem please tell us about your use-case in a new issue. Then we could evaluate if it is possible or not. – Sergej Isbrecht Oct 30 '17 at 10:02

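The problem you're facing is not a limitation of Observables but a problem with your code. You call blockingSubscribe on a Flowable that has no relation to the inner Observables that spawn all the other threads. The Callable subscribes to those inner Observables and returns true immediately without waiting for them, so the assertion can run while some additions are still pending on io() threads. In addition, sum = sum + j is an unsynchronized read-modify-write on a shared field, so concurrent additions can be lost. For small values of higherBound the inner work usually finishes before the outer Flowable completes, so the code appears to work; for larger values it falls behind and the result is wrong.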
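Both failure modes can be sketched in plain Java, assuming a cached thread pool as a stand-in for io(). fireAndForget mirrors the question's Callable: it schedules the additions with an unsynchronized `+=`-style update and reads the field without waiting. waitForAll accumulates into a LongAdder and waits for every task before reading. This is an analogy rather than RxJava code, and the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class FireAndForgetDemo {

    static long racySum; // plain field, like the question's `sum`

    // Mirrors the question: schedule the additions, then read the result
    // immediately. The value read is usually too small for large n.
    static long fireAndForget(long n) {
        racySum = 0;
        ExecutorService pool = Executors.newCachedThreadPool();
        for (long i = 0; i <= n; i++) {
            final long v = i;
            pool.execute(() -> racySum = racySum + v); // unsynchronized read-modify-write
        }
        long seen = racySum; // read before the tasks are guaranteed to have run
        pool.shutdown();     // lets the remaining tasks finish in the background
        return seen;
    }

    // Fixed version: thread-safe accumulator, and wait for every task first.
    static long waitForAll(long n) throws InterruptedException {
        LongAdder sum = new LongAdder();
        ExecutorService pool = Executors.newCachedThreadPool();
        for (long i = 0; i <= n; i++) {
            final long v = i;
            pool.execute(() -> sum.add(v));
        }
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        return sum.sum();
    }

    public static void main(String[] args) throws InterruptedException {
        long n = 10_000;
        long expected = n * (n + 1) / 2;
        System.out.println("expected:        " + expected);
        System.out.println("fire-and-forget: " + fireAndForget(n)); // varies run to run
        System.out.println("wait for all:    " + waitForAll(n));
    }
}
```

Fixing only one of the two problems is not enough: waiting without a thread-safe accumulator can still lose updates, and a LongAdder read too early is still incomplete.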

In other words, to see the right result you need to synchronize with the Flowable that spawns all the other threads instead of the outer one, which you can achieve with the flatMap operator. I would also replace the long sum with the thread-safe LongAdder sum.

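// RxJava 2: io.reactivex.Flowable, io.reactivex.schedulers.Schedulers
// JDK:      java.util.concurrent.atomic.LongAdder
LongAdder sum = new LongAdder(); // thread-safe accumulator
Flowable.rangeLong(lowerBound, higherBound + 1) // 0 .. higherBound when lowerBound == 0
         // each value gets its own Flowable on an io() worker; flatMap merges them back
         .flatMap(t -> Flowable.just(t)
                 .subscribeOn(Schedulers.io())
         )
         .doOnNext(sum::add)
         .doOnComplete(() -> {
             // runs only after every inner Flowable has completed
             long actualSum = (higherBound * (higherBound + 1)) / 2;
             System.out.println("SUM: " + sum.longValue() + ", ACTUAL: " + actualSum);
             System.out.println("Equals: " + (actualSum == sum.longValue()));
         })
         .blockingSubscribe(); // blocks the caller until the whole chain completes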
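For readers without RxJava at hand, the same "fan out, then wait for everything" shape can be sketched with CompletableFuture. This is an analogy, not the answer's code: supplyAsync on a cached pool plays the role of Flowable.just(t).subscribeOn(Schedulers.io()), and CompletableFuture.allOf(...).join() plays the role of flatMap plus blockingSubscribe. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class JoinAllDemo {

    // One async task per value, then block until every task has completed.
    static long sumAsync(long higherBound) {
        ExecutorService io = Executors.newCachedThreadPool(); // stand-in for io()
        try {
            List<CompletableFuture<Long>> parts = LongStream.rangeClosed(0, higherBound)
                    .mapToObj(v -> CompletableFuture.supplyAsync(() -> v, io))
                    .collect(Collectors.toList());
            // Wait for all tasks, like flatMap + blockingSubscribe.
            CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])).join();
            // Every future is done, so join() no longer blocks; summing on the
            // calling thread avoids shared mutable state entirely.
            return parts.stream().mapToLong(CompletableFuture::join).sum();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        long higherBound = 1000;
        long expected = higherBound * (higherBound + 1) / 2;
        long actual = sumAsync(higherBound);
        System.out.println("SUM: " + actual + ", ACTUAL: " + expected);
        System.out.println("Equals: " + (actual == expected));
    }
}
```

Collecting the results after the join, instead of mutating a shared counter from each task, is one design choice that removes the need for LongAdder; the RxJava version gets a similar guarantee because flatMap delivers values to doOnNext one at a time.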
Jans