
I have many network calls that emit different types, e.g. String, Int and so on.

I'm trying to run them in parallel.

In the official RxJava documentation we can read:

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

This example is easy because every value is an Int. But how do I do it when the calls return different types, e.g. String, Boolean, Int?

  • each of these 5 calls is independent
  • the group of 5 calls will live in one method, and this method will be invoked by some other method
  • we can assume the result of the 5 calls will have the type of the first call -> String
discCard
  • I think the question lacks some info: which event triggers the multiple calls? What is the intended result of the multiple calls? I suppose it is to merge them into a single object? Flowable is not the only way to get parallelism in Rx. – colin aygalinc Dec 28 '20 at 14:13
  • I added a lot of additional information – discCard Dec 28 '20 at 14:18

1 Answer


Combining Observables

Operators that work with multiple source Observables to create a single Observable

  • And/Then/When — combine sets of items emitted by two or more Observables by means of Pattern and Plan intermediaries

  • CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function

  • Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable

  • Merge — combine multiple Observables into one by merging their emissions

  • StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable

  • Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables

  • Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

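I think in your case you can use zip or combineLatest. Note that neither operator introduces concurrency by itself: each source needs its own subscribeOn(...) if the calls are supposed to run at the same time.

Read this document: http://reactivex.io/documentation/operators.html#combining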
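As a minimal sketch of the zip approach with differently typed results, assuming RxJava 3 on the classpath: the three fetch methods below are hypothetical stand-ins for the real network calls (the question has five; Single.zip has overloads for up to nine sources), and the combiner returns a String to match the assumption that the overall result has the type of the first call.

```java
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class ZipMixedTypes {

    // Hypothetical network calls; each one blocks for a while and returns its own type.
    static Single<String> fetchName() {
        return Single.fromCallable(() -> { Thread.sleep(200); return "alice"; });
    }

    static Single<Integer> fetchCount() {
        return Single.fromCallable(() -> { Thread.sleep(200); return 42; });
    }

    static Single<Boolean> fetchFlag() {
        return Single.fromCallable(() -> { Thread.sleep(200); return true; });
    }

    // Each source gets its own subscribeOn, so the three calls can run at the
    // same time on io() threads. zip waits for all of them and hands the typed
    // results (String, Integer, Boolean) to the combiner.
    static Single<String> loadAll() {
        return Single.zip(
                fetchName().subscribeOn(Schedulers.io()),
                fetchCount().subscribeOn(Schedulers.io()),
                fetchFlag().subscribeOn(Schedulers.io()),
                (name, count, flag) -> name + ":" + count + ":" + flag);
    }

    public static void main(String[] args) {
        // blockingGet is used here only to keep the demo short.
        System.out.println(loadAll().blockingGet()); // prints alice:42:true
    }
}
```

The caller only sees a Single<String>; whether the individual calls ran in parallel is decided by the subscribeOn on each source, not by zip.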

I think you also need to know about schedulers.

A brief introduction to RxJava schedulers:

  • Schedulers.io() – Used for non-CPU-intensive, blocking work such as network calls, reading from disk/files, and database operations. It maintains a cached pool of threads that grows as needed.

  • Schedulers.newThread() – Creates a new thread each time a task is scheduled. It is usually suggested not to use this scheduler unless there is a very long-running operation. Threads created via newThread() are not reused.

  • Schedulers.computation() – Can be used for CPU-intensive operations such as processing large data sets or bitmaps. The number of threads depends on the number of CPU cores available.

  • Schedulers.single() – Executes all tasks on one shared background thread, in the order they are added. Use it when sequential execution is required.

  • Schedulers.immediate() – Executes the task immediately and synchronously on the current thread. It exists only in RxJava 1; it was removed in RxJava 2, where Schedulers.trampoline() is the suggested replacement.

  • Schedulers.trampoline() – Executes tasks in first-in, first-out order on the current thread. A task scheduled while another one is running is queued and runs after it finishes; no background thread is created.

  • Schedulers.from() – Creates a scheduler from an Executor, so the number of threads is bounded by that executor. With a fixed-size pool, tasks are queued while all threads are busy.
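To make the effect of the scheduler choice concrete, here is a rough timing sketch, again assuming RxJava 3. The slowCall helper and the 300 ms delay are invented for illustration. Without subscribeOn, zip subscribes to its sources one after another on the calling thread, so the blocking calls run back to back; with subscribeOn(Schedulers.io()), or with a scheduler built by Schedulers.from(...), they overlap.

```java
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SchedulerTiming {

    // Hypothetical blocking call that takes about 300 ms.
    static Single<Integer> slowCall(int value) {
        return Single.fromCallable(() -> { Thread.sleep(300); return value; });
    }

    // Zips the two sources, blocks until the result arrives, and returns the elapsed time.
    static long elapsedMillis(Single<Integer> a, Single<Integer> b) {
        long start = System.nanoTime();
        Single.zip(a, b, (x, y) -> x + y).blockingGet();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // No scheduler: both calls run on the main thread, one after the other.
        long sequential = elapsedMillis(slowCall(1), slowCall(2));

        // io(): each call gets its own pooled thread, so the delays overlap.
        long onIo = elapsedMillis(
                slowCall(1).subscribeOn(Schedulers.io()),
                slowCall(2).subscribeOn(Schedulers.io()));

        // from(): same idea, but the thread count is capped by the executor.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Scheduler bounded = Schedulers.from(pool);
        long onPool = elapsedMillis(
                slowCall(1).subscribeOn(bounded),
                slowCall(2).subscribeOn(bounded));
        pool.shutdown();

        System.out.println("no scheduler: " + sequential + " ms");
        System.out.println("io():         " + onIo + " ms");
        System.out.println("from(pool):   " + onPool + " ms");
    }
}
```

The exact numbers vary by machine, but the first measurement should be roughly the sum of the two delays and the other two roughly a single delay. If zip appears to give no speed-up, a missing subscribeOn on the individual sources is the first thing to check.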

saeedata
  • I tried it with zip and combineLatest and the code looks fine, but the running time was exactly the same as, or longer than, with the async style, so the parallelism didn't work. I found an answer on Stack Overflow saying that we cannot use zip or combineLatest: https://stackoverflow.com/questions/44643106/how-to-combine-multiple-rxjava-chains-non-blocking-in-error-case – discCard Dec 28 '20 at 15:23
  • I use it a lot and I think you are using it incorrectly. You should read the comments or the other answer in your link. – saeedata Dec 28 '20 at 15:28
  • Without a scheduler, flatMap effectively concatenates the Observables: each new Observable waits until the previous one has emitted its value, so they are not called at the same time. If it doesn't work for you, I think it's because of the wrong Scheduler that you are using. – saeedata Dec 28 '20 at 15:32
  • Read this link: https://stackoverflow.com/questions/33370339/what-is-the-difference-between-schedulers-io-and-schedulers-computation – saeedata Dec 28 '20 at 15:36