I am trying to understand how Flux.parallel() works. My requirement is as follows: there is a list of strings, stringList, and some task runs on each string in parallel. One of the strings takes longer to process (simulated here with Thread.sleep). I then want to collect the strings into a Flux<String>, but first I need to make sure that the parallel processing of all the strings has finished.

In the following code, the string Mango is skipped. Why is that? If I use blockLast() it is not skipped, but blockLast() does not return a Flux<String>. Also, what is the difference between sequential() and blockLast()?

    List<String> list = Arrays.asList("Mango", "Apple", "Grapes", "Java");
    Flux<String> flust = Flux.fromIterable(list)
        .parallel(10)
        .runOn(Schedulers.parallel(), list.size())
        .map(string -> {
            if (string.equals("Mango")) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //some task
            System.out.println(Thread.currentThread().getName() + " " + string);
            return string;
        })
        .sequential();
    flust.subscribe(System.out::println);

Output:

    parallel-2 Apple
    parallel-4 Java
    parallel-3 Grapes
    Apple
    Grapes
    Java

lkatiforis
Java Programmer

1 Answer


As the Reactor Javadoc for the subscribe method notes:

Keep in mind that since the sequence can be asynchronous, this will immediately return control to the calling thread. This can give the impression the consumer is not invoked when executing in a main thread or a unit test for instance.

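In other words, subscribe() returns immediately, so the main method reaches its end while Mango is still sleeping on a worker thread. The threads of Schedulers.parallel() are daemon threads, so they do not keep the JVM alive: once the main thread exits, the JVM shuts down before the Mango element is consumed.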

To solve this, use blockLast() instead of subscribe() and move the side effect (printing) into doOnNext:

    Flux<String> flust = Flux.fromIterable(list)
        .parallel(10)
        .runOn(Schedulers.parallel(), list.size())
        .map(string -> {
            if (string.equals("Mango")) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //some task
            System.out.println(Thread.currentThread().getName() + " " + string);
            return string;
        })
        .sequential()
        .doOnNext(System.out::println);

    flust.blockLast();

This blocks the main thread until the last element of the Flux has been processed.
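The mechanism can be isolated without Reactor. The following is a minimal sketch using plain JDK daemon threads rather than the Reactor API; the class name DaemonWorkerDemo, the process method, and the 500 ms delay are illustrative assumptions. A CountDownLatch stands in for blockLast(): with no wait, the caller moves on before the slow item finishes; with the wait, every item is processed first.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class: plain threads, not Reactor.
public class DaemonWorkerDemo {

    // Processes each item on its own daemon thread and returns a snapshot
    // of the items that had finished by the time this method returns.
    public static List<String> process(List<String> items, boolean waitForAll) {
        List<String> finished = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(items.size());

        for (String item : items) {
            Thread worker = new Thread(() -> {
                try {
                    if (item.equals("Mango")) {
                        Thread.sleep(500); // simulate the slow task
                    }
                    finished.add(item);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
            worker.setDaemon(true); // daemon threads do not keep the JVM alive
            worker.start();
        }

        if (waitForAll) {
            try {
                done.await(); // analogous to blockLast(): wait for every worker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return List.copyOf(finished);
    }

    public static void main(String[] args) {
        List<String> items = List.of("Mango", "Apple", "Grapes", "Java");
        System.out.println("without waiting: " + process(items, false));
        System.out.println("with waiting:    " + process(items, true));
    }
}
```

The order of the items in each snapshot depends on thread scheduling, so it can differ between runs. The call without waiting returns long before the 500 ms sleep ends, so Mango should be missing from that snapshot, just as it is missing from the output in the question.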

lkatiforis