I'm using Hazelcast Jet to do calculations over a large number of elements (~ 480 million).

I'm starting with an IMap sink containing the initial data. I'm filling the map using

    Collections2.permutations(initialPermutation).parallelStream().forEach(set -> {
        Permutation permutation = new Permutation(set);
        permutations.put(permutation.toString(), permutation);
    });

where Permutation is a simple data class and set is a List<Integer>. When running this parallel stream, I get the following exception:

java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker

and I don't know how to fix it.
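
For context, a Permutation along these lines would be consistent with the code above (an assumed sketch; the actual class isn't shown in the question):

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    // Assumed shape of the Permutation data class: it wraps the
    // List<Integer> and must be serializable to be stored in an IMap.
    public class Permutation implements Serializable {
        private final List<Integer> elements;

        public Permutation(List<Integer> elements) {
            this.elements = new ArrayList<>(elements);
        }

        @Override
        public String toString() {
            return elements.toString();
        }
    }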

1 Answer

Your mistake is using parallelStream() to perform blocking work. The Streams API is meant for computational loads, but what you have is a network load to push through. The Streams implementation, backed by the common ForkJoinPool, does have code that detects when a worker thread has entered the blocked state and replaces it with a fresh thread. However, this happens only up to a threshold, beyond which you get the exception you're seeing.
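
You can observe this mechanism without Hazelcast at all. Here is a minimal repro sketch (my own, not from the question) that blocks common-pool workers through CompletableFuture.get(), which internally blocks via a ForkJoinPool.ManagedBlocker; depending on JDK version and core count, it either spawns hundreds of replacement threads or fails with the same RejectedExecutionException:

    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.stream.IntStream;

    public class BlockedWorkerRepro {
        public static void main(String[] args) {
            Set<String> workers = ConcurrentHashMap.newKeySet();
            IntStream.range(0, 100_000).parallel().forEach(i -> {
                workers.add(Thread.currentThread().getName());
                try {
                    // get() on an incomplete future blocks through a
                    // ManagedBlocker, so the pool spawns a replacement
                    // worker; past its spare-thread limit it throws
                    // RejectedExecutionException instead.
                    new CompletableFuture<Void>().get(10, TimeUnit.MILLISECONDS);
                } catch (InterruptedException | ExecutionException | TimeoutException ignored) {
                }
            });
            System.out.println("distinct worker threads: " + workers.size());
        }
    }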

Instead, to achieve the best throughput into Hazelcast, use batching: split the permutations into chunks and push each chunk to the map with map.setAll(chunk).

Use the code in this answer to transform your Stream<List<Integer>> into a Stream<List<List<Integer>>> of chunks, something like this (chunkify() is defined in the linked answer):

    // assumes a static import of java.util.stream.Collectors.toMap
    chunkify(Collections2.permutations(initialPermutation)).stream().forEach(chunk -> {
        Map<String, Permutation> chunkAsMap = chunk.stream()
                .map(Permutation::new)
                .collect(toMap(Permutation::toString, p -> p));
        permutations.setAll(chunkAsMap);
    });
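
If you'd rather not copy chunkify(), Guava, which you already use for Collections2.permutations, ships Iterables.partition, which chunks any Iterable lazily. A sketch assuming permutations is an IMap<String, Permutation> and a chunk size of 10,000 (an assumed value; tune it for your cluster):

    import com.google.common.collect.Collections2;
    import com.google.common.collect.Iterables;
    import java.util.List;
    import java.util.Map;
    import static java.util.stream.Collectors.toMap;

    // Iterables.partition produces the chunks lazily, so the ~480M
    // permutations are never materialized in memory all at once.
    for (List<List<Integer>> chunk
            : Iterables.partition(Collections2.permutations(initialPermutation), 10_000)) {
        Map<String, Permutation> chunkAsMap = chunk.stream()
                .map(Permutation::new)
                .collect(toMap(Permutation::toString, p -> p));
        permutations.setAll(chunkAsMap);
    }
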
Marko Topolnik