2

I want every item emitted from flatMap to run on its own thread
This is a simplified example of a real usage where each item will be a url request.
Adding subscribeOn(Schedulers.io()) on each single still run on a single thread
What's the rule here?


Integer[] array= new Integer[100];
for (int i = 0; i < 100; i++){
    array[i] = i+1;
}

Observable.fromArray(array)
        .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
            @Override
            public SingleSource<Integer> apply(Integer i) throws Throwable {
                Log.i(TAG, "apply " +  i + " " + Thread.currentThread().getName());
                return Single.just(i).subscribeOn(Schedulers.io());
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull Integer i) {
               // Log.i(TAG, "onNext " + Thread.currentThread().getName() + i);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {
            }
        });

Result:

2020-12-16 22:54:47.010 10649-10700/com.example.rxjava I/MYTAG: apply 1 RxCachedThreadScheduler-1
2020-12-16 22:54:47.037 10649-10700/com.example.rxjava I/MYTAG: apply 2 RxCachedThreadScheduler-1
2020-12-16 22:54:47.038 10649-10700/com.example.rxjava I/MYTAG: apply 3 RxCachedThreadScheduler-1
2020-12-16 22:54:47.039 10649-10700/com.example.rxjava I/MYTAG: apply 4 RxCachedThreadScheduler-1
2020-12-16 22:54:47.040 10649-10700/com.example.rxjava I/MYTAG: apply 5 RxCachedThreadScheduler-1
2020-12-16 22:54:47.043 10649-10700/com.example.rxjava I/MYTAG: apply 6 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 7 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 8 RxCachedThreadScheduler-1
Mustafa Poya
  • 2,615
  • 5
  • 22
  • 36
BabyishTank
  • 1,329
  • 3
  • 18
  • 39
  • I don't really remember how RxJava works, but what I seem to see are: * your `flatMapSingle` always runs on the same thread, because the `Observable.fromArray` result is not told to do not so; * you seem not to need `flatMapSingle` at all; * not sure, but subscribing on `Schedulers.newThread()` or even `Schedulers.from(executor)` would do the job. – terrorrussia-keeps-killing Dec 17 '20 at 07:35
  • Does this answer your question? [Uploading multiple image to AWS server using rxJava](https://stackoverflow.com/questions/65327820/uploading-multiple-image-to-aws-server-using-rxjava) – bubbles Dec 17 '20 at 16:24

1 Answers1

4

You were on the right track except the use of just, which takes an existing object thus whatever created and computed that object happened before. In this case, it was the lambda of the flatMapSingle which is called from the same thread.

You have to make the computation itself part of the flow to be run in parallel via fromCallable for example:

Observable.fromArray(array)
.flatMapSingle(i -> {
    return Single.fromCallable(() -> {
        Log.i(TAG, "apply " +  i + " " + Thread.currentThread().getName());
        return i + 1000;
    })
    .subscribeOn(Schedulers.io());
})
.observeOn(AndroidSchedulers.mainThread())
// ...
;
akarnokd
  • 69,132
  • 14
  • 157
  • 192