18

I have a sleep method for simulating a long running process.

private void sleep() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Then I have a method returns an Observable containing a list of 2 strings that is given in the parameters. It calls the sleep before return the strings back.

private Observable<List<String>> getStrings(final String str1, final String str2) {
    return Observable.fromCallable(new Callable<List<String>>() {
        @Override
        public List<String> call() {
            sleep();
            List<String> strings = new ArrayList<>();
            strings.add(str1);
            strings.add(str2);
            return strings;
        }
    });
}

Then I am calling the getStrings three times in Observalb.zip, I expect those three calls to run in parallel, so the total time of execution should be within 2 seconds or maybe 3 seconds the most because the sleep was only 2 seconds. However, it's taking a total of six seconds. How can I make this to run in parallel so it will finish within 2 seconds?

Observable
.zip(getStrings("One", "Two"), getStrings("Three", "Four"), getStrings("Five", "Six"), mergeStringLists())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<String>>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(List<String> strings) {
        //Display the strings
    }
});

The mergeStringLists method

private Func3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() {
    return new Func3<List<String>, List<String>, List<String>, List<String>>() {
        @Override
        public List<String> call(List<String> strings, List<String> strings2, List<String> strings3) {
            Log.d(TAG, "...");

            for (String s : strings2) {
                strings.add(s);
            }

            for (String s : strings3) {
                strings.add(s);
            }

            return strings;
        }
    };
}
s-hunter
  • 24,172
  • 16
  • 88
  • 130

3 Answers3

21

That's happening because subscribing to your zipped observable happens in the the same, io thread.

Why don't you try this instead:

Observable
    .zip(
        getStrings("One", "Two")
            .subscribeOn(Schedulers.newThread()),
        getStrings("Three", "Four")
            .subscribeOn(Schedulers.newThread()),
        getStrings("Five", "Six")
            .subscribeOn(Schedulers.newThread()),
        mergeStringLists())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<List<String>>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(List<String> strings) {
            //Display the strings
        }
    });

Let me know if that helped

Bartek Lipinski
  • 30,698
  • 10
  • 94
  • 132
  • 5
    `Schedulers.io()` is by default a thread pool that grows as needed. – Tassos Bassoukos Jul 06 '16 at 22:16
  • @TassosBassoukos do you mean Schedulers.io() will create new threads automatically as needed? – s-hunter Jul 06 '16 at 22:37
  • @Bartek your solution worked, do you know if there is any other solutions than yours for making the zip to run parallel? – s-hunter Jul 06 '16 at 22:40
  • @s-hunter Indeed, [per the docs](http://reactivex.io/RxJava/javadoc/rx/schedulers/Schedulers.html#io()) – Tassos Bassoukos Jul 07 '16 at 00:55
  • @TassosBassoukos, in that case, why is my code running sequentially instead of parallel? – s-hunter Jul 07 '16 at 01:18
  • @s-hunter you're applying `.subscribeOn` to the **zipped** observable. I'm applying `subscribeOn` to the components of the **zipped** observable. In other words: you're creating one new thread for the observable that represents a combination of three component observables, I create one new thread for every single one of the component observables. – Bartek Lipinski Jul 07 '16 at 07:05
  • Yes, I got that, I was wondering if there are other alternatives to your solution – s-hunter Jul 07 '16 at 14:33
  • I would use `subscribeOn(Schedulers.io())` instead of `subscribeOn(Schedulers.newThread())` is much more expensive. Brief explanation. `Schedulers.io()` – This is used to perform non-CPU-intensive operations like making network calls, reading disc/files, database operations, etc., This maintains a pool of threads. `Schedulers.newThread()` – Using this, a new thread will be created each time a task is scheduled. It’s usually suggested not to use scheduler unless there is a very long-running operation. The threads created via newThread() won’t be reused. – Flávio Henrique Aug 17 '22 at 15:36
2

Here I have an example that I did using Zip in asynchronous way, just in case you´re curious

/**
 * Since every observable into the zip is created to                 subscribeOn a diferent thread, it´s means all of them will run in parallel.
 * By default Rx is not async, only if you explicitly use subscribeOn.
 */
@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2).concat(s3))
            .subscribe(result -> showResult("Async in:", start, result));
}

public Observable<String> obAsyncString() {
    return Observable.just("")
            .observeOn(scheduler)
            .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
            .map(val -> "Hello");
}

public Observable<String> obAsyncString1() {
    return Observable.just("")
            .observeOn(scheduler1)
            .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
            .map(val -> " World");
}

public Observable<String> obAsyncString2() {
    return Observable.just("")
            .observeOn(scheduler2)
            .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
            .map(val -> "!");
}

You can see more examples here https://github.com/politrons/reactive

Marcel Bro
  • 4,907
  • 4
  • 43
  • 70
paul
  • 12,873
  • 23
  • 91
  • 153
0

You don't necessarily need to use Schedulers.newThread() since is much more expensive and not recommended (Most of the cases), follow an example bellow with Schedulers.io().

@Test
public void testWithZipInParallelSuccess() {

    // Given
    Single<Integer> source1 = Single.fromCallable(() -> {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(1000);
            System.out.println("Source 1 emitted : " + i + " " +    Thread.currentThread());
        }
        return 1;
    }).subscribeOn(Schedulers.io());

    Single<Integer> source2 = Single.fromCallable(() -> {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(1000);
            System.out.println("Source 2 emitted : " + i + " " +    Thread.currentThread());
        }
        return 2;
    }).subscribeOn(Schedulers.io());
    long start = System.currentTimeMillis();

    // When
    Single.zip(source1, source2, (a, b) -> {
        return a;
    }).blockingGet();
    long end = System.currentTimeMillis();

    // Then
    long totalExecutionTime = end - start;
    System.out.println("Total execution time: " + totalExecutionTime);
    Assertions.assertTrue(totalExecutionTime < 6000);

    /*
    Output:
    Source 1 emitted : 0 Thread[RxCachedThreadScheduler-1,5,main]
    Source 2 emitted : 0 Thread[RxCachedThreadScheduler-2,5,main]
    Source 1 emitted : 1 Thread[RxCachedThreadScheduler-1,5,main]
    Source 2 emitted : 1 Thread[RxCachedThreadScheduler-2,5,main]
    Source 2 emitted : 2 Thread[RxCachedThreadScheduler-2,5,main]
    Source 1 emitted : 2 Thread[RxCachedThreadScheduler-1,5,main]
    Total execution time: 3026
    */

}

Follow more examples: https://gist.github.com/flavio-henrique/4824feedc6da71b397b38dc86bb23618

Some reference: What is the difference between Schedulers.io() and Schedulers.computation()