I have a problem with my asynchronous method. It works fine, but the thread count keeps increasing.

Here is my code example:

ExecutorService executorService = Executors.newFixedThreadPool(marketplaces.size());

public void createCache() {
    List<CompletableFuture<List<OrderResponseDto>>> futures = marketplaces.stream().map(
            m -> CompletableFuture.supplyAsync(m::trades, executorService)
                    .applyToEither(timeoutAfter(TIMEOUT_SECONDS, TimeUnit.SECONDS), Function.identity())
                    .exceptionally(error -> {
                        log.warn("Trades failed {}", error);
                        return Collections.emptyList();
                    })
    ).collect(toList());

    Map<TradePlatform, List<OrderResponseDto>> collect = futures.stream()
            .map(CompletableFuture::join)
            .flatMap(List::stream)
            .collect(groupingBy(OrderResponseDto::getMarketplace));
    marketplaceCache.putAll(collect);
}

Full project located on GitHub: https://github.com/rublin/KarboMarketplaceExplorer

Direct link to the class

Here is a test that covered this behavior.

  • The threadpool created only once (PostConstruct), but never shut down, yes. Is this the problem? – Ruslan Sheremet Sep 05 '18 at 14:11
  • yes. either you have to shutdown the threadpool so that the threads all stop running, or introduce a threadfactory that uses daemon threads. non-daemon threads will keep running. – Nathan Hughes Sep 05 '18 at 14:19
  • With daemon threads the same behaviour: `executorService = Executors.newFixedThreadPool(marketplaces.size(), r -> { Thread t = Executors.defaultThreadFactory().newThread(r); t.setDaemon(true); return t; });` – Ruslan Sheremet Sep 05 '18 at 14:24
  • There's another thread pool created in `timeoutAfter()`, but never shut down. – david a. Sep 05 '18 at 14:25
  • @NathanHughes So, basically, I have to initialize executorService every time I run the method? And in the end, I have to shut it down? – Ruslan Sheremet Sep 05 '18 at 14:29
  • I'd say that each time `createCache()` is called, a new threadpool with a single thread is created in `timeoutAfter()` call. Likely one for (almost) each CompletableFuture. These thread pools are never closed - so it's likely the threads of these pools stay around. – david a. Sep 05 '18 at 14:29
  • Not necessarily. You can make an executorservice and let it stay in scope for the lifetime of the application, then shut it down on exit https://stackoverflow.com/a/35996422/217324 check out @Async. – Nathan Hughes Sep 05 '18 at 14:33
  • Yes, @davida. is right. When I comment out the line `.applyToEither(timeoutAfter(TIMEOUT_SECONDS, TimeUnit.SECONDS), Function.identity())`, everything is fine – Ruslan Sheremet Sep 05 '18 at 14:33

1 Answer

To recap the findings listed in the comments:

It's a thread leak caused by the timeoutAfter() method (see the source you linked).

This method creates a new thread pool with a single thread each time it's called, but there's nothing to close the pool and hence stop the threads.

Since timeoutAfter() is called once for (likely) every CompletableFuture created in createCache(), the overall number of threads keeps growing with every call.

A fix is to reuse thread pools for the lifetime of the application and make sure they are properly shut down on exit. Other considerations are how to size the thread pools and which type of thread pool to use, especially if the tasks submitted to them may block.
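
As a rough sketch of that fix (not the project's actual code), timeoutAfter() could be backed by a single shared scheduler instead of creating a pool per call. The class name `Timeouts`, the thread name, and the `shutdown()` hook below are hypothetical and only serve the illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class; name and layout are assumptions for this sketch.
final class Timeouts {

    // One scheduler thread shared by every call, created once per application.
    // It is a daemon thread, so it cannot keep the JVM alive on its own.
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-scheduler");
                t.setDaemon(true);
                return t;
            });

    private Timeouts() {
    }

    // Returns a future that fails with TimeoutException once the delay elapses.
    // No new thread pool is created here; the task is queued on the shared scheduler.
    static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<>();
        SCHEDULER.schedule(() -> {
            result.completeExceptionally(
                    new TimeoutException("Timed out after " + timeout + " " + unit));
        }, timeout, unit);
        return result;
    }

    // Stops the scheduler thread and discards pending timeouts; call once on exit.
    static void shutdown() {
        SCHEDULER.shutdownNow();
    }
}
```

With something like this, the `applyToEither(timeoutAfter(...), Function.identity())` call in createCache() could stay as it is, and `Timeouts.shutdown()` would be called once when the application stops (for example from a `@PreDestroy` method, assuming the bean is container-managed).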

Also see Nathan Hughes's comments on the question.

david a.