3

Introduction

I'm currently developing a program that uses java.util.Collection.parallelStream(), and I'm wondering whether it's possible to make it use more threads.

Several small maps

I was wondering whether using multiple map calls might allow java.util.Collection.parallelStream() to distribute the tasks better:

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
        .map(gson::toJson)
        .map(Document::parse)
        .map(InsertOneModel::new)
        .toList();

Single big map

That is, would the version above be distributed better than this one:

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
        .map(puzzle -> new InsertOneModel<>(Document.parse(gson.toJson(puzzle))))
        .toList();

Question

Is one of these two solutions more suitable for java.util.Collection.parallelStream(), or is there no significant difference between them?

ThrowsError
    The calls to `map()` are executed sequentially. Using several calls instead of just one won't improve performance. – Olivier Jan 29 '22 at 14:10

4 Answers

4

I looked into the Stream source code. The result of a map operation is just fed into the next operation. So there is almost no difference between one big map() call or several small map() calls.

For the map() operation itself, using a parallel Stream makes no difference to how the chain is evaluated: each input object is processed through to the end of the chain by the same Thread in either case.

Also note: a parallel Stream only splits up the work if the operation chain allows it and there is enough data to process. For a small Collection, or a Collection that allows no random access, a parallel Stream may therefore behave much like a sequential Stream.
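The point about each element staying on one thread can be checked with a small experiment. The sketch below is not taken from the JDK source; the class name MapChainThreads and the helper sameThreadPerElement are made up for illustration. It records which thread runs the first map() stage for each element and compares it with the thread that runs the second stage. The side effects inside the lambdas are there only for observation and are not something to copy into production code.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class MapChainThreads {

    /**
     * Runs two chained map() stages over a parallel stream and reports, for each
     * element, whether both stages were executed by the same thread.
     */
    static Map<Integer, Boolean> sameThreadPerElement(int size) {
        List<Integer> input = IntStream.range(0, size).boxed().collect(Collectors.toList());
        Map<Integer, Thread> firstStageThread = new ConcurrentHashMap<>();
        Map<Integer, Boolean> sameThread = new ConcurrentHashMap<>();

        input.parallelStream()
                .map(i -> {
                    // Remember which thread handled this element in stage one.
                    firstStageThread.put(i, Thread.currentThread());
                    return i;
                })
                .map(i -> {
                    // Compare with the thread handling the same element in stage two.
                    sameThread.put(i, Thread.currentThread() == firstStageThread.get(i));
                    return i;
                })
                .collect(Collectors.toList());
        return sameThread;
    }

    public static void main(String[] args) {
        Map<Integer, Boolean> result = sameThreadPerElement(10_000);
        System.out.println("elements processed: " + result.size());
        System.out.println("both stages on the same thread for every element: "
                + !result.containsValue(false));
    }
}
```

Different elements may well land on different threads; what the experiment looks at is only whether a single element changes threads between the two map() stages.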

FLUXparticle
  • Thanks for your reply, can you add your source regarding the `Stream` code source? – ThrowsError Jan 30 '22 at 12:37
  • @JavaMan I just explored the source code in my IDE. But if you want a web source... Here it is http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/classes/java/util/stream/ReferencePipeline.java – FLUXparticle Jan 31 '22 at 06:41
2

I don't think chaining multiple maps will do any better. If your code is not very complex, I would prefer a single big map.
To understand why, we have to look at the code inside the map function:

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

As you can see, a lot happens behind the scenes: several objects are created and several methods are called. All of this is repeated for each chained map call.

Coming back to parallel streams: they are built on the concept of parallelism.
Streams Documentation

A parallel stream is a stream that splits its elements into multiple chunks, processing each chunk with a different thread. Thus, you can automatically partition the workload of a given operation on all the cores of your multicore processor and keep all of them equally busy.

Parallel streams internally use the common ForkJoinPool, whose parallelism defaults to one less than the number of processors returned by Runtime.getRuntime().availableProcessors(), because the thread that invokes the stream also takes part in the work. You can change the size of this pool with the system property java.util.concurrent.ForkJoinPool.common.parallelism.
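To see what these values are on a given machine, something like the following sketch can be used. The class name CommonPoolInfo is hypothetical; the two calls it wraps are standard JDK methods.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {

    /** Number of processors the JVM reports as available. */
    static int processors() {
        return Runtime.getRuntime().availableProcessors();
    }

    /** Target parallelism of the common pool used by parallel streams. */
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + processors());
        System.out.println("common pool parallelism: " + commonParallelism());
    }
}
```

To try a different pool size, the property can be passed on the command line, for example java -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 CommonPoolInfo (the value 4 is an arbitrary example). The property is read when the common pool is initialized, so it has to be set before the pool is first used.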

parallelStream() calls spliterator() on the collection object, which returns a Spliterator implementation that provides the logic for splitting a task. Every source or collection has its own Spliterator implementation. Using these spliterators, a parallel stream keeps splitting the task for as long as possible; once a subtask becomes small enough, it is executed sequentially, and the partial results from all the subtasks are merged.
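The splitting step can be observed directly by calling trySplit() by hand. This is only a minimal sketch of a single split on an ArrayList, not what the stream framework does internally; SplitDemo and splitOnce are names invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

class SplitDemo {

    /**
     * Splits the list's spliterator once and returns the estimated sizes of
     * the two resulting parts: { split-off prefix, remainder }.
     */
    static long[] splitOnce(List<Integer> list) {
        Spliterator<Integer> rest = list.spliterator();
        // trySplit() hands off part of the elements to a new Spliterator,
        // or returns null if this source cannot be split any further.
        Spliterator<Integer> prefix = rest.trySplit();
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            list.add(i);
        }
        long[] parts = splitOnce(list);
        System.out.println("prefix: " + parts[0] + ", remainder: " + parts[1]);
    }
}
```

Calling trySplit() again on each part would continue the subdivision, which is roughly what happens recursively when a parallel stream forks its subtasks.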

So I would prefer parallelStream when

  • I have a huge amount of data to process at a time
  • I have multiple cores available to process the data
  • The existing implementation has performance issues
  • I don't already have multi-threaded processing running, as combining the two would add complexity.

Performance Implications

  • Overhead: When the dataset is small, converting a sequential stream into a parallel one can result in worse performance. Managing threads, sources and results can cost more than doing the actual work.
  • Splitting: Arrays can be split cheaply and evenly, while a LinkedList has neither of these properties. TreeMap and HashSet split better than LinkedList but not as well as arrays.
  • Merging: The merge step is really cheap for some operations, such as reduction and addition, but merge operations like grouping into sets or maps can be quite expensive.

Conclusion: A large amount of data and many computations done per element indicate that parallelism could be a good option.

TriS
2

The three steps (toJson/parse/new) have to be executed sequentially, so all you're effectively doing is comparing s.map(g.compose(f)) and s.map(f).map(g). By virtue of being a monad, Java Streams are functors, and the 2nd functor law states that, in essence, s.map(g.compose(f)) == s.map(f).map(g), meaning that the two alternative ways of expressing the computation will produce identical results. From a performance standpoint the difference between the two is likely to be minimal.
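The equivalence of the two forms can be illustrated with a small, self-contained sketch. F and G below are hypothetical stand-ins for the real steps (gson::toJson, Document::parse and so on), chosen only so that the example runs without Gson or the MongoDB driver.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class MapComposition {

    // Hypothetical stand-ins for the real per-element steps.
    static final Function<Integer, String> F = i -> "puzzle-" + i;
    static final Function<String, Integer> G = String::length;

    /** Two separate map() stages: s.map(f).map(g). */
    static List<Integer> chained(List<Integer> input) {
        return input.parallelStream()
                .map(F)
                .map(G)
                .collect(Collectors.toList());
    }

    /** One map() stage with the composed function: s.map(g.compose(f)). */
    static List<Integer> composed(List<Integer> input) {
        return input.parallelStream()
                .map(G.compose(F))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 22, 333);
        System.out.println(chained(input));
        System.out.println(composed(input));
        System.out.println("same result: " + chained(input).equals(composed(input)));
    }
}
```

Both methods apply F and then G to every element, so the lists they return are equal for any input.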

However, in general you should be careful when using Collection.parallelStream. It uses the common ForkJoinPool, essentially a fixed pool of threads shared across the entire JVM. The size of the pool is determined by the number of cores on the host. The problem with using the common pool is that other threads in the same process may be using it at the same time as your code. This can cause your code to slow down randomly and inexplicably, for example if another part of the code has temporarily exhausted the common thread pool.

It is preferable to create your own ExecutorService using one of the factory methods on Executors, and then submit your tasks to that.

private static final ExecutorService EX_SVC = Executors.newFixedThreadPool(16);

public static List<InsertOneModel<Document>> process(Stream<Puzzle> puzzles) throws InterruptedException {
    final Collection<Callable<InsertOneModel<Document>>> callables =
            puzzles.map(puzzle ->
                    (Callable<InsertOneModel<Document>>)
                            () -> new InsertOneModel<>(Document.parse(gson.toJson(puzzle)))
            ).collect(Collectors.toList());

    return EX_SVC.invokeAll(callables).stream()
            .map(fut -> {
                try {
                    return fut.get();
                } catch (ExecutionException|InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }).collect(Collectors.toList());
}
jon hanson
1

I doubt that there is much difference in performance, but even if you proved that the second style was quicker, I would still prefer to see and use the first style in code I had to maintain.

The first, multi-map style is easier for others to understand, easier to maintain and easier to debug, for example by adding peek stages at any point in the processing chain.

List<InsertOneModel<Document>> bulkWrites = puzzles.parallelStream()
    .map(gson::toJson)
    // easy to make changes for debug, moving peek up/down
    // .peek(System.out::println)
    .map(Document::parse)
    // easy to filter:
    // .filter(this::somecondition)
    .map(InsertOneModel::new)
    .toList();

If your requirements change, for example because you need to filter the output or capture the intermediate data by splitting it into two collections, the first approach beats the second every time.

DuncG