I have a problem with my asynchronous method. It works correctly, but the number of threads keeps growing every time it runs.
Here is the code:
ExecutorService executorService = Executors.newFixedThreadPool(marketplaces.size());

public void createCache() {
    List<CompletableFuture<List<OrderResponseDto>>> futures = marketplaces.stream().map(
            m -> CompletableFuture.supplyAsync(m::trades, executorService)
                    .applyToEither(timeoutAfter(TIMEOUT_SECONDS, TimeUnit.SECONDS), Function.identity())
                    .exceptionally(error -> {
                        log.warn("Trades failed {}", error);
                        return Collections.emptyList();
                    })
    ).collect(toList());

    Map<TradePlatform, List<OrderResponseDto>> collect = futures.stream()
            .map(CompletableFuture::join)
            .flatMap(List::stream)
            .collect(groupingBy(OrderResponseDto::getMarketplace));

    marketplaceCache.putAll(collect);
}
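
The snippet above calls a `timeoutAfter` helper that is not shown. For anyone trying to reproduce the behavior, here is a minimal, self-contained sketch of what such a helper might look like. It is an assumption, not the code from my project: the class name `TimeoutSketch`, the shared `SCHEDULER`, and the `callWithTimeout` wrapper are all hypothetical and exist only to make the example runnable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

class TimeoutSketch {

    // Assumption: a single daemon scheduler thread shared by every timeout,
    // created once rather than once per call.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-scheduler");
                t.setDaemon(true);
                return t;
            });

    // Hypothetical helper: returns a future that never produces a value and
    // fails with a TimeoutException once the delay has elapsed.
    static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<>();
        SCHEDULER.schedule(
                () -> result.completeExceptionally(
                        new TimeoutException("Timed out after " + timeout + " " + unit)),
                timeout, unit);
        return result;
    }

    // Same shape as the pipeline in createCache(): run the task on the pool,
    // race it against the timeout, and fall back to a default on any failure.
    static <T> T callWithTimeout(Supplier<T> task, ExecutorService pool,
                                 long timeoutMillis, T fallback) {
        return CompletableFuture.supplyAsync(task, pool)
                .applyToEither(
                        TimeoutSketch.<T>timeoutAfter(timeoutMillis, TimeUnit.MILLISECONDS),
                        Function.identity())
                .exceptionally(error -> fallback)
                .join();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Finishes well before the 2 s timeout.
            System.out.println(callWithTimeout(() -> "done", pool, 2000, "fallback"));
            // Takes 1 s but the timeout is 100 ms, so the fallback is returned.
            System.out.println(callWithTimeout(() -> {
                sleep(1000);
                return "done";
            }, pool, 100, "fallback"));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Note that `applyToEither` does not cancel the losing future: a task that times out keeps occupying its pool thread until it actually returns.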
The full project is on GitHub: https://github.com/rublin/KarboMarketplaceExplorer
Direct link to the class
Here is a test that covers this behavior.