9

It seems like this question should already have an answer, but I could not find a duplicate.

Anyway, I am wondering what the community thinks about a Stream.map use case like this:

Wrapper wrapper = new Wrapper();
list.stream()
    .map( s -> {
        wrapper.setSource(s);
        return wrapper;
    } )
    .forEach( w -> processWrapper(w) );
    
public static class Source {
    private final String name;
        
    public Source(String name) {
        this.name = name;
    }
        
    public String getName() {
        return name;
    }
}
    
public static class Wrapper {
    private Source source = null;
        
    public void setSource(Source source) {
        this.source = source;
    }
        
    public String getName() {
        return source.getName();
    }
}

public void processWrapper(Wrapper wrapper) {
}

I am not a big fan of this usage of map, but it can potentially help performance when dealing with large streams by avoiding the creation of an unnecessary Wrapper for every Source.

This approach definitely has its limitations, such as being almost useless with parallel streams and with terminal operations like collect.

Update - The question is not about "how to do it" but "can I do it this way?". For example, I may have code that only works with Wrapper and that I want to invoke in forEach, but I want to avoid creating a new Wrapper instance for each Source element.
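
For reference, the per-element allocation this is meant to avoid would look something like the sketch below (same Wrapper and processWrapper as above); this is essentially what the noReuse benchmark further down measures:

list.stream()
    .map( s -> {
        // a fresh wrapper is created for every Source element
        Wrapper w = new Wrapper();
        w.setSource(s);
        return w;
    } )
    .forEach( w -> processWrapper(w) );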

Benchmark Results

The results show roughly an 8-fold improvement with the reusable wrapper:

Benchmark                     (N)  Mode  Cnt    Score     Error  Units
BenchmarkTest.noReuse    10000000  avgt    5  870.253 ± 122.495  ms/op
BenchmarkTest.withReuse  10000000  avgt    5  113.694 ±   2.528  ms/op

Benchmark code:

import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(value = 2, jvmArgs = {"-Xms2G", "-Xmx2G"})
public class BenchmarkTest {

    @Param({"10000000"})
    private int N;

    private List<Source> data;

    public static void main(String[] args) throws Exception {
        Options opt = new OptionsBuilder()
            .include(BenchmarkTest.class.getSimpleName())
            .forks(1)
            .build();
        new Runner(opt).run();
    }

    @Setup
    public void setup() {
        data = createData();
    }

    @Benchmark
    public void noReuse(Blackhole bh) {
        data.stream()
            .map( s -> new Wrapper1( s.getName() ) )
            .forEach( t -> processTarget(bh, t) );
    }

    @Benchmark
    public void withReuse(Blackhole bh) {
        Wrapper2 wrapper = new Wrapper2();
        data.stream()
            .map( s -> { wrapper.setSource(s); return wrapper; } )
            .forEach( w -> processTarget(bh, w) );
    }
    
    public void processTarget(Blackhole bh, Wrapper t) {
        bh.consume(t);
    }
    
    private List<Source> createData() {
        List<Source> data = new ArrayList<>();
        for (int i = 0; i < N; i++) {
            data.add( new Source("Number : " + i) );
        }
        return data;
    }
    
    public static class Source {
        private final String name;

        public Source(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    public interface Wrapper {
        public String getName();
    }
    
    public static class Wrapper1 implements Wrapper {
        private final String name;

        public Wrapper1(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }
    
    public static class Wrapper2 implements Wrapper {
        private Source source = null;

        public void setSource(Source source) {
            this.source = source;
        }

        public String getName() {
            return source.getName();
        }
    }
}

Full benchmark report:

# JMH version: 1.21
# VM version: JDK 1.8.0_191, Java HotSpot(TM) 64-Bit Server VM, 25.191-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre/bin/java
# VM options: -Xms2G -Xmx2G
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: BenchmarkTest.noReuse
# Parameters: (N = 10000000)

# Run progress: 0.00% complete, ETA 00:03:20
# Fork: 1 of 1
# Warmup Iteration   1: 1083.656 ms/op
# Warmup Iteration   2: 846.485 ms/op
# Warmup Iteration   3: 901.164 ms/op
# Warmup Iteration   4: 849.659 ms/op
# Warmup Iteration   5: 903.805 ms/op
Iteration   1: 847.008 ms/op
Iteration   2: 895.800 ms/op
Iteration   3: 892.642 ms/op
Iteration   4: 825.901 ms/op
Iteration   5: 889.914 ms/op


Result "BenchmartTest.noReuse":
  870.253 ±(99.9%) 122.495 ms/op [Average]
  (min, avg, max) = (825.901, 870.253, 895.800), stdev = 31.812
  CI (99.9%): [747.758, 992.748] (assumes normal distribution)


# JMH version: 1.21
# VM version: JDK 1.8.0_191, Java HotSpot(TM) 64-Bit Server VM, 25.191-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre/bin/java
# VM options: -Xms2G -Xmx2G
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: BenchmarkTest.withReuse
# Parameters: (N = 10000000)

# Run progress: 50.00% complete, ETA 00:01:58
# Fork: 1 of 1
# Warmup Iteration   1: 113.780 ms/op
# Warmup Iteration   2: 113.643 ms/op
# Warmup Iteration   3: 114.323 ms/op
# Warmup Iteration   4: 114.258 ms/op
# Warmup Iteration   5: 117.351 ms/op
Iteration   1: 114.526 ms/op
Iteration   2: 113.944 ms/op
Iteration   3: 113.943 ms/op
Iteration   4: 112.930 ms/op
Iteration   5: 113.124 ms/op


Result "BenchmarkTest.withReuse":
  113.694 ±(99.9%) 2.528 ms/op [Average]
  (min, avg, max) = (112.930, 113.694, 114.526), stdev = 0.657
  CI (99.9%): [111.165, 116.222] (assumes normal distribution)


# Run complete. Total time: 00:03:40

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                     (N)  Mode  Cnt    Score     Error  Units
BenchmarkTest.noReuse    10000000  avgt    5  870.253 ± 122.495  ms/op
BenchmarkTest.withReuse  10000000  avgt    5  113.694 ±   2.528  ms/op

  • It's very unclear what you're trying to do here. In particular, you are aware that all you're doing is selecting the last element in the list a whole bunch of times? – chrylis -cautiouslyoptimistic- Jun 05 '19 at 14:31
  • What is the point of the wrapper...It does nothing but add unnecessary complexity – RobOhRob Jun 05 '19 at 14:36
  • @chrylis Not sure what you mean by last element? For example if you have code that only works with `Wrapper` type that will be invoked in `forEach` but you want to avoid creating a new Wrapper for every `Source` element in the stream. – tsolakp Jun 05 '19 at 14:36
  • There is only one wrapper object... you change its `source` n times, and in the end its source is the final element in the stream – RobOhRob Jun 05 '19 at 14:37
  • Stream.map is used to convert one type into another. E.g. you want only the IDs from an entity class, so you do .map(Class::getId). In your example, you don't need the wrapper and can simply do list.forEach( s -> System.out.println(s.getName()) ); – Brother Jun 05 '19 at 14:38
  • There are basically two possible scenarios, those where a purely local temporary wrapper instance could get optimized away by the JVM after applying Escape Analysis and those, where this reusable wrapper would break the operation. – Holger Jun 05 '19 at 15:00
  • @Holger By the first, did you mean something like `Stream.of(new Source("s1"), new Source("s2")) .map(Wrapper::new) // <== this? .forEach(w -> processWrapper(w));` – Naman Jun 08 '19 at 03:59
  • @Naman exactly. A temporary, preferably immutable wrapper. The irony is that the mutable, reused wrapper only works in scenarios where the temporary wrapper would work and be potentially more efficient, at least with a high probability. Such optimizations will fail when the temporary nature can't be proven, hence, where the risk of reusing not working due to unintended longer lifetime is high. E.g. even an operation like `max`, which returns a single result, needs to hold two elements at a time internally. There are not many scenarios where reusing will work. – Holger Jun 08 '19 at 08:39
  • This question smells of "premature optimization". – Stephen C Jun 09 '19 at 04:30
  • @StephenC I was going to use the same term. – Miss Chanandler Bong Jun 14 '19 at 16:04

2 Answers

11

Your approach happens to work because the stream pipeline consists only of stateless operations. In such constellations, a sequential stream evaluation may process one element at a time, so accesses to the wrapper instance do not overlap, as illustrated here. But note that this is not guaranteed behavior.

It definitely doesn’t work with stateful operations like sorted and distinct. It also can’t work with reduction operations, as they always have to hold at least two elements for processing, which includes reduce, min, and max. In the case of collect, it depends on the particular Collector. forEachOrdered wouldn’t work with parallel streams, due to the required buffering.
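
For illustration, a minimal sketch (reusing the Wrapper and list from the question, not code from this answer; java.util.Comparator and java.util.stream.Collectors assumed imported) of how a stateful operation breaks the reuse: sorted buffers every element before emitting anything, and every buffered element is the very same wrapper instance.

Wrapper wrapper = new Wrapper();
List<Wrapper> result = list.stream()
    .map( s -> { wrapper.setSource(s); return wrapper; } )
    .sorted( Comparator.comparing(Wrapper::getName) ) // buffers all elements first
    .collect( Collectors.toList() );
// "result" now holds list.size() references to the single wrapper,
// all reporting the name of whichever Source was set last.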

Note that parallel processing would be problematic even if you used ThreadLocal to create thread-confined wrappers, as there is no guarantee that objects created in one worker thread stay local to that thread. A worker thread may hand over a partial result to another thread before picking up another, unrelated workload.

So this shared mutable wrapper works with a particular set of stateless operations, like map, filter, forEach, findFirst/findAny, and all/any/noneMatch, in a sequential execution of a particular implementation. You don't get the flexibility of the API, as you have to limit yourself: you can't pass the stream to arbitrary code expecting a Stream, nor use arbitrary Collector implementations. You also don't have the encapsulation of the interface, as you are assuming a particular implementation behavior.

In other words, if you want to use such a mutable wrapper, you are better off with a loop implementing the particular operation. You already have the disadvantages of such a manual implementation, so why not implement it and get its advantages as well.
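
For example, a minimal loop sketch (same Wrapper, Source, and processWrapper as in the question) that makes the single-wrapper reuse explicit while staying under your full control:

Wrapper wrapper = new Wrapper();
for (Source s : list) {
    wrapper.setSource(s);    // reuse the one wrapper per iteration
    processWrapper(wrapper); // must not store the wrapper for later
}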


The other aspect to consider is what you gain from reusing such a mutable wrapper. It only works in loop-like usages where a temporary object might get optimized away after Escape Analysis anyway. In such scenarios, reusing objects and thereby extending their lifetime may actually degrade performance.

Of course, Object Scalarization is not a guaranteed behavior. There might be scenarios, like a long stream pipeline exceeding the JVM’s inlining limit, where the objects don’t get elided. But still, temporary objects are not necessarily expensive.

This has been explained in this answer. Temporary objects are cheaply allocated. The main costs of a garbage collection are caused by objects which are still alive. These need to be traversed and these need to be moved when making room for new allocations. The negative impact of temporary objects is that they may shorten the time between garbage collection rounds. But this is a function of allocation rate and available allocation space, so this is truly a problem that can be solved by throwing more RAM at it. More RAM means more time between GC cycles and more dead objects when GC happens, which makes the net costs of the GC smaller.

Still, avoiding excessive allocation of temporary objects is a valid concern; the existence of IntStream, LongStream, and DoubleStream shows that. But these are special, as using primitive types is a viable alternative to using wrapper objects, without the disadvantages of reusing a mutable wrapper. It's also different in that it applies to problems where the primitive type and the wrapper type are semantically equivalent. In contrast, you want to solve a problem where the operation requires the wrapper type. The same applies to the primitive streams: when you need the objects for your problem, there is no way around boxing, which will create distinct objects for distinct values rather than sharing a mutable object.

So if you likewise have a problem for which a semantically equivalent, wrapper-avoiding alternative without substantial problems exists, like using Comparator.comparingInt instead of Comparator.comparing where feasible, you may still prefer it. But only then.
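
For instance, a sketch of that kind of substitution (people and Person are hypothetical, with Person having an int getAge(); they are not from the question):

people.sort( Comparator.comparingInt(Person::getAge) ); // compares primitive ints, no boxing
// versus
people.sort( Comparator.comparing(Person::getAge) );    // boxes every age into an Integer key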


In short, most of the time, the savings of such object reuse, if any, will not justify the disadvantages. In special cases, where it’s beneficial and matters, you may be better off with a loop or any other construct under your full control, instead of using a Stream.

– Holger
  • Very good writeup, particularly on the GC handling and on the Stream operations that can produce unexpected results with a single wrapper. The reason I was thinking of using Stream is to pass data between abstraction layers. For example, the source object can be a JDBC ResultSet, which the DAO would wrap with a DTO and pass to the REST controller layer to be serialized directly onto the HTTP servlet response as JSON. – tsolakp Jun 08 '19 at 14:51
  • Last sentence deserves emphasis. If performance is so important that creation of small wrapper objects is significant, don't use streams. (At least not in Java 11/12.) – Stephen C Jun 09 '19 at 04:30
  • @tsolakp that sounds like a typical loop-like operation where a temporary object could get elided (or implementing a real loop be feasible). On the other hand, it sounds like an operation that is entirely dominated by the I/O performance, so temporary objects, even if not eliminated, do not matter at all. – Holger Jun 10 '19 at 13:42
  • @Holger. Agree, but I wanted to move the `loop` logic into another abstraction layer. Maybe `Iterable` would have been a much better option than `Stream`? – tsolakp Jun 10 '19 at 17:39
  • @StephenC any reason specific to the addition of *At least not in Java 11/12*? Asking to learn if there is a difference from Java 11 onwards specifically in this area. Introduction of a new GC, probably? – Naman Jun 11 '19 at 04:25
  • AFAIK, no there isn't. But we don't know what will be in Java 13, 14, 15, ... that might hypothetically make streams **faster** than classic loops. – Stephen C Jun 11 '19 at 04:38
  • @tsolakp an `Iterable` has no control over the code performing the loop. A better abstraction would be a method accepting a `Consumer`, with the contract that the consumer must process the received object and is not allowed to store a reference to it. Then, the method may traverse the `ResultSet` and update the wrapper on each row, before calling the consumer’s `accept` method. This API would also allow measuring the difference between creating a new object per row and reusing a wrapper, with the smallest code change. It’s similar to Spring’s `RowMapper`, called per row, but a level higher. (A sketch follows after this comment thread.) – Holger Jun 11 '19 at 09:06
  • @Naman I’d say, there are already scenarios where a stream performs on par or even better than loops, but the statement about preferring the loop is precisely for scenarios where this still is not the case. And well, there’s the plan to add true value types, which would make immutable wrapper objects the preferred choice here, but still, there could be a performance difference between loops and stream in either direction. – Holger Jun 11 '19 at 09:15
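
A minimal sketch of the Consumer-based API described in the comment above, under stated assumptions: forEachRow, RowWrapper, and setCurrentRow are hypothetical names, and the consumer must process the wrapper immediately without retaining a reference to it.

// assumed imports: java.sql.ResultSet, java.sql.SQLException, java.util.function.Consumer
void forEachRow(ResultSet resultSet, Consumer<RowWrapper> consumer) throws SQLException {
    RowWrapper wrapper = new RowWrapper();      // one reused wrapper for the whole traversal
    while (resultSet.next()) {
        wrapper.setCurrentRow(resultSet);       // hypothetical: update the wrapper from the current row
        consumer.accept(wrapper);               // wrapper is only valid during this call
    }
}
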
9

You can write some handy helper functions, and also have a thread-safe version that works with parallel streams.

// requires java.util.function.Function, Supplier and BiConsumer
static <T, U> Function<T, U> threadSafeReusableWrapper(Supplier<U> newWrapperInstanceFn, BiConsumer<U, T> wrapFn) {
    // one wrapper per thread, created lazily by the supplied factory
    final ThreadLocal<U> wrapperStorage = ThreadLocal.withInitial(newWrapperInstanceFn);
    return item -> {
        U wrapper = wrapperStorage.get();
        wrapFn.accept(wrapper, item);
        return wrapper;
    };
}

static <T, U> Function<T, U> reusableWrapper(U wrapper, BiConsumer<U, T> wrapFn) {
    // the one given wrapper is updated and returned for every element
    return item -> {
        wrapFn.accept(wrapper, item);
        return wrapper;
    };
}

list.stream()
    .map( reusableWrapper(new Wrapper(), Wrapper::setSource) )
    .forEach( w -> processWrapper(w) );

list.stream()
    .parallel()
    .map( threadSafeReusableWrapper(Wrapper::new, Wrapper::setSource) )
    .forEach( w -> processWrapper(w) );

However, I do not think it is worth it. These wrappers are short-lived, so they are unlikely to leave the young generation and will be garbage collected very cheaply. Still, the idea is worth checking with the micro-benchmarking library JMH.

– Alexander Pavlov
  • Thanks. I am thinking along the same lines too. It might not be worth doing this hack in order to avoid object creation that can potentially be handled efficiently by the GC. – tsolakp Jun 08 '19 at 03:17
  • There is a potential problem with garbage collection of `newWrapperInstanceFn` in `threadSafeReusableWrapper`. The reference to the value of a `ThreadLocal` is only reliably cleared when `ThreadLocal.remove` is called, or the thread is terminated. Until that happens `newWrapperInstanceFn` might linger. See this [answer](https://stackoverflow.com/a/27525780/452775) for more information. – Lii Jun 17 '19 at 08:00