I have some imperative
code which processes on 20 Threads in parallel.
IntStream.range(0, 20)
.forEach(t -> {
Runnable runnable = () -> {
int records = RANGE;
while (records > 0) {
records = processRecords(t, Thread.currentThread().getId());
}
};
new Thread(runnable, "processor-thread-".concat(String.valueOf(t)))
.start();
});
I tried to transform this to reactive
but reactive creates only 10 threads instead of 20. What am I doing wrong? Currently imperative processing is much faster than reactive.
ExecutorService executor = Executors.newFixedThreadPool(THREADS);
IntStream.range(0, THREADS)
.forEach(t -> Multi
.createBy()
.repeating()
.uni(AtomicInteger::new, contacts -> processMessages(t))
.until(List::isEmpty)
.onItem()
.disjoint()
.runSubscriptionOn(executor)
.subscribe()
.with(item -> System.out.println(Thread.currentThread().getName()))
);