2

I have some imperative code which processes on 20 Threads in parallel.

IntStream.range(0, 20)
    .forEach(t -> {
        Runnable runnable = () -> {
            int records = RANGE;
            while (records > 0) {
                records = processRecords(t, Thread.currentThread().getId());
            }
        };
        new Thread(runnable, "processor-thread-".concat(String.valueOf(t)))
                .start();
    });

I tried to transform this to reactive but reactive creates only 10 threads instead of 20. What am I doing wrong? Currently imperative processing is much faster than reactive.

ExecutorService executor = Executors.newFixedThreadPool(THREADS);
IntStream.range(0, THREADS)
        .forEach(t -> Multi
                .createBy()
                .repeating()
                .uni(AtomicInteger::new, contacts -> processMessages(t))
                .until(List::isEmpty)
                .onItem()
                .disjoint()
                .runSubscriptionOn(executor)
                .subscribe()
                .with(item -> System.out.println(Thread.currentThread().getName()))
        );
Dev Bhuyan
  • 541
  • 5
  • 19

1 Answers1

0

I would not structure the program like that. I would represent each "processing" as a Uni and then create a pipeline processing them concurrently.

So, first, let's isolate the processing:

public Uni<String> process(int i ) {
   return Uni.createFrom().item(() -> processMessageBlocking(i))
            .subscribeOn(executor);
    }

Then, let's build the pipeline:

Multi.createFrom().range(0, 20)
  .onItem().transformToUni(i -> process(i)).merge(20);

transformToUni(...).merge(concurrency) is going to subscribe to concurrency (here 20) unis concurrently. Each of them will run that subscription (and the processing) in a thread from the executor.

Clement
  • 2,817
  • 1
  • 12
  • 11