4

How to run filter, map and flatMap on Observable using multiple threads:

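  import rx.lang.scala.Observable
  import scala.concurrent.duration._
  import scala.language.postfixOps

  // Simulates slow work (e.g. a remote call) before producing the value.
  def withDelay[T](delay: Duration)(t: => T) = {
    Thread.sleep(delay.toMillis)
    t
  }

  Observable
    .interval(500 millisecond)
    .filter(x => {
      withDelay(1 second) { x % 2 == 0 }
    })
    .map(x => {
      withDelay(1 second) { x * x }
    }).subscribe(println(_))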

The goal is to run filtering and transformation operations concurrently using multiple threads.

Chirlo
Stephen L.
  • Did you have a look at this: https://github.com/ReactiveX/RxJava/issues/1673 and this: https://github.com/ReactiveX/RxJavaParallel – david.mihola Nov 17 '15 at 08:11
  • @david.mihola, yes, I checked both of them and was able to execute the `subscribe` block on multiple threads; however, I could not do that for map, flatMap and filter. I may need to call another API or fetch additional data from a database while filtering or transforming, so I want to make sure that this code is executed concurrently. – Stephen L. Nov 17 '15 at 13:34

3 Answers

0

You can use Async.toAsync() on each operation.

It is in the rxjava-async package.

Documentation

FedericoAlvarez
0

This processes each item of the collection on the computation scheduler, so different items can be handled on different threads at the same time (RxJava 3).

var collect = Observable.fromIterable(Arrays.asList("A", "B", "C"))
                      .flatMap(v -> {
                         return Observable.just(v)
                                        .observeOn(Schedulers.computation())
                                        .map(v1 -> {
                                            int time = ThreadLocalRandom.current().nextInt(1000);
                                            Thread.sleep(time);
                                            return String.format("processed-%s", v1);
                                        });
                      })
                      .observeOn(Schedulers.computation())
                      .blockingStream()
                      .collect(Collectors.toList());
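
The question also asks about filter. Here is a minimal sketch, assuming RxJava 3 is on the classpath, that runs both filter and map on several threads through ParallelFlowable. The class name ParallelFilterMap and the helpers slowIsEven and slowSquare are hypothetical stand-ins for slow work such as a remote call or a database lookup.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;

public class ParallelFilterMap {

    // Hypothetical slow predicate: pretends to do blocking work before answering.
    static boolean slowIsEven(long x) throws InterruptedException {
        Thread.sleep(50);
        return x % 2 == 0;
    }

    // Hypothetical slow transformation.
    static long slowSquare(long x) throws InterruptedException {
        Thread.sleep(50);
        return x * x;
    }

    static List<Long> evenSquares(int count) {
        return Flowable.rangeLong(0, count)
                .parallel()                        // split the stream into rails
                .runOn(Schedulers.computation())   // each rail gets its own worker
                .filter(x -> slowIsEven(x))        // runs on the rail's thread
                .map(x -> slowSquare(x))           // runs on the rail's thread
                .sequential()                      // merge rails; arrival order is not guaranteed
                .sorted()                          // restore a deterministic order
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(evenSquares(10)); // prints [0, 4, 16, 36, 64]
    }
}
```

Because the rails are merged in arrival order, the sketch sorts the result; drop sorted() if ordering does not matter.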
freedev
-1

You have to use the observeOn operator. Every operator placed after it in the pipeline runs on the scheduler passed to observeOn, until the next observeOn changes it again.

import org.junit.Test;
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

/**
 * Once observeOn is set in the pipeline, all later steps run on the scheduler it names.
 * Prints something like (thread numbers may vary):
 * First step main
 * Second step RxNewThreadScheduler-2
 * Third step RxNewThreadScheduler-1
 */
@Test
public void testObservableObserverOn() {
    // Subscribe with a TestSubscriber so the test can wait for the async pipeline to finish.
    TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    Observable.just(1)
            .doOnNext(number -> System.out.println("First step " + Thread.currentThread()
                    .getName()))
            .observeOn(Schedulers.newThread())
            .doOnNext(number -> System.out.println("Second step " + Thread.currentThread()
                    .getName()))
            .observeOn(Schedulers.newThread())
            .doOnNext(number -> System.out.println("Third step " + Thread.currentThread()
                    .getName()))
            .subscribe(subscriber);
    subscriber.awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}

More async examples here https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java
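
Note that observeOn moves each stage to another thread, but the items within one stage are still handled one after another. The following is a minimal sketch of that behavior, written against RxJava 3 rather than the RxJava 1 API used in the test above. The class name ObserveOnStages and the two thread-name sets are hypothetical and exist only for illustration.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ObserveOnStages {

    // Names of the threads that executed each stage.
    static final Set<String> filterThreads = ConcurrentHashMap.newKeySet();
    static final Set<String> mapThreads = ConcurrentHashMap.newKeySet();

    static List<Integer> run() {
        filterThreads.clear();
        mapThreads.clear();
        return Observable.range(1, 6)
                .observeOn(Schedulers.newThread())   // filter stage gets its own thread
                .filter(x -> {
                    filterThreads.add(Thread.currentThread().getName());
                    return x % 2 == 0;
                })
                .observeOn(Schedulers.newThread())   // map stage gets a second thread
                .map(x -> {
                    mapThreads.add(Thread.currentThread().getName());
                    return x * x;
                })
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run());                   // prints [4, 16, 36]
        System.out.println(filterThreads.size());    // prints 1
        System.out.println(mapThreads.size());       // prints 1
    }
}
```

Each stage sees exactly one thread, so filter and map overlap with each other but neither processes two items at once. To process several items of the same stage concurrently, the per-item work has to be moved into flatMap or onto parallel rails.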

paul