Let's assume I've got a Stream<T> and want only its distinct elements, in sorted order.

The naïve approach would be to do just the following:

Stream.of(...)
    .sorted()
    .distinct()

or, maybe the other way around:

Stream.of(...)
    .distinct()
    .sorted()

Since the implementation of neither operation is easy to follow in the JDK's source code, I was wondering about possible memory consumption and performance implications.

Or would it be even more efficient to write my own filter, like the following?

Stream.of(...)
    .sorted()
    .filter(noAdjacentDuplicatesFilter())

public static Predicate<Object> noAdjacentDuplicatesFilter() {
    final Object[] previousValue = {new Object()};

    return value -> {
        final boolean takeValue = !Objects.equals(previousValue[0], value);
        previousValue[0] = value;
        return takeValue;
    };
}
OneCricketeer
Michaela Elschner
  • In the best case, the underlying implementation recognizes when `distinct()` and `sorted()` follow each other and fuses them into a single operation. Keep in mind that Streams are lazy: nothing happens until you chain a terminal operation, and by that time the implementation knows what you have chained. – Holger May 05 '17 at 13:49
  • @Holger I understand that; I'd just be interested if that actually happens and if this behaviour is guaranteed. – Michaela Elschner May 05 '17 at 14:01
  • Well, I guess it also depends on the nature of the data: very few distinct values occurring many times, or lots of different values with a few duplicates... In the second case sorted() then distinct() is better; in the first case distinct() then sorted() could be faster, especially for scattered data. My two cents. – Laurent Grégoire Sep 20 '21 at 14:07

2 Answers

When you chain a distinct() operation directly after sorted(), the implementation will exploit the sorted nature of the data and avoid building an internal HashSet. The following program demonstrates this:

import java.util.stream.Stream;

public class DistinctAndSort {
    static int COMPARE, EQUALS, HASHCODE;
    static class Tracker implements Comparable<Tracker> {
        static int SERIAL;
        int id;
        Tracker() {
            id=SERIAL++/2;
        }
        public int compareTo(Tracker o) {
            COMPARE++;
            return Integer.compare(id, o.id);
        }
        public int hashCode() {
            HASHCODE++;
            return id;
        }
        public boolean equals(Object obj) {
            EQUALS++;
            return super.equals(obj);
        }
    }
    public static void main(String[] args) {
        System.out.println("adjacent sorted() and distinct()");
        Stream.generate(Tracker::new).limit(100)
              .sorted().distinct()
              .forEachOrdered(o -> {});
        System.out.printf("compareTo: %d, EQUALS: %d, HASHCODE: %d%n",
                          COMPARE, EQUALS, HASHCODE);
        COMPARE=EQUALS=HASHCODE=0;
        System.out.println("now with intermediate operation");
        Stream.generate(Tracker::new).limit(100)
            .sorted().map(x -> x).distinct()
            .forEachOrdered(o -> {});
        System.out.printf("compareTo: %d, EQUALS: %d, HASHCODE: %d%n",
                          COMPARE, EQUALS, HASHCODE);
    }
}

which will print

adjacent sorted() and distinct()
compareTo: 99, EQUALS: 99, HASHCODE: 0
now with intermediate operation
compareTo: 99, EQUALS: 100, HASHCODE: 200

An intermediate operation, even one as simple as map(x -> x), can’t be recognized by the Stream implementation, so it must assume that the elements might not be sorted with respect to the mapping function’s result.
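One way to observe this from the outside is to ask the pipeline's spliterator whether it still reports the SORTED characteristic. The sketch below is only an illustration: the class and helper names are made up, and what it prints reflects how the OpenJDK implementation propagates its internal flags, which the specification does not promise.

```java
import java.util.Spliterator;
import java.util.stream.Stream;

public class SortedFlagSketch {

    // Hypothetical helper: reports whether the pipeline still advertises
    // the SORTED characteristic. Consumes the stream it is given.
    static boolean reportsSorted(Stream<?> stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.SORTED);
    }

    public static void main(String[] args) {
        // Directly after sorted(): the characteristic is visible (on OpenJDK).
        System.out.println(reportsSorted(Stream.of(3, 1, 2).sorted()));
        // After an intervening map(): the characteristic is cleared (on OpenJDK).
        System.out.println(reportsSorted(Stream.of(3, 1, 2).sorted().map(x -> x)));
    }
}
```

On OpenJDK this is expected to report the characteristic for the first pipeline and not for the second, which matches the HASHCODE counts above.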

There is no guarantee that this kind of optimization happens. However, it is reasonable to assume that the developers of the Stream implementation will not remove it and may even add more optimizations, so rolling your own implementation will prevent your code from benefiting from future optimizations.

Further, what you have created is a “stateful predicate”, which is strongly discouraged and, of course, will break when used with a parallel stream.
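For contrast, the built-in sorted() and distinct() operations are themselves stateful, but the Stream API handles them correctly in parallel pipelines. A minimal sketch, with hypothetical class and method names:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelDistinctSketch {

    // Hypothetical helper: uses only the built-in operations, so no
    // user-supplied lambda carries state between elements.
    static List<Integer> sortedDistinct(List<Integer> input) {
        return input.parallelStream()
                    .sorted()
                    .distinct()
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Every value from 0 to 999 appears twice, in shuffled order.
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            input.add(i);
            input.add(i);
        }
        Collections.shuffle(input);

        List<Integer> expected =
            IntStream.range(0, 1000).boxed().collect(Collectors.toList());
        System.out.println(sortedDistinct(input).equals(expected)); // prints true
    }
}
```

Because the stream is ordered after sorted(), distinct() and the collector preserve that order even when the work is split across threads.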

If you don’t trust the Stream API to perform this operation efficiently enough, you might be better off implementing this particular operation without the Stream API.
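One possible shape for such a non-Stream implementation is a TreeSet, which keeps its elements sorted and unique. This is only a sketch under assumptions: the class and method names are hypothetical, the elements must be non-null and Comparable, and duplicates are detected via compareTo rather than equals/hashCode.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

public class SortedDistinctSketch {

    // Hypothetical helper: the TreeSet sorts the elements and drops
    // duplicates (as judged by compareTo); the result is copied to a list.
    static <T extends Comparable<? super T>> List<T> sortedDistinct(Collection<? extends T> input) {
        return new ArrayList<T>(new TreeSet<T>(input));
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(3, 1, 2, 3, 1);
        System.out.println(sortedDistinct(values)); // prints [1, 2, 3]
    }
}
```

Whether this beats the Stream pipeline depends on the data, so it would need to be measured rather than assumed.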

Holger
  • Actually I needed the stateful predicate for filtering results elsewhere in my code. It's also encouraged here: http://stackoverflow.com/questions/27870136/java-lambda-stream-distinct-on-arbitrary-key/27872086#27872086 – Michaela Elschner May 05 '17 at 15:23
  • It’s not really encouraged; there is simply no cleaner alternative for this kind of task. Maybe that answer should emphasize the drawbacks better, though. Note that it uses a `ConcurrentHashMap` to ensure that it doesn’t break entirely when used concurrently. It will still not maintain the encounter order when used with a parallel stream; since that specific task has an addition as its terminal operation, it might not matter there, but it may break badly in other scenarios. – Holger May 05 '17 at 15:40

Disclaimer: I know performance testing is hard, especially on the JVM, where it needs warm-ups and a controlled environment with no other processes running.

When I test it, I get the results below, so it seems your implementation benefits from parallel execution (running on an i7 with 4 cores + hyperthreading).

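So ".distinct().sorted()" seems to be slower, as predicted/explained by Holger. Each round prints three elapsed times in milliseconds, in this order: sorted().filter(noAdjacentDuplicatesFilter()), sorted().distinct(), distinct().sorted().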

Round 1 (Warm up?)
3938
2449
5747
Round 2
2834
2620
3984
Round 3 Parallel
831
4343
6346
Round 4 Parallel
825
3309
6339

Using this code:

package test.test;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SortDistinctTest {

    public static void main(String[] args) {
        IntStream range = IntStream.range(0, 6_000_000);
        List<Integer> collect = range.boxed().collect(Collectors.toList());
        Collections.shuffle(collect);

        long start = System.currentTimeMillis();

        System.out.println("Round 1 (Warm up?)");
        collect.stream().sorted().filter(noAdjacentDuplicatesFilter()).collect(Collectors.counting());
        long fst = System.currentTimeMillis();
        System.out.println(fst - start);

        collect.stream().sorted().distinct().collect(Collectors.counting());
        long snd = System.currentTimeMillis();
        System.out.println(snd - fst);

        collect.stream().distinct().sorted().collect(Collectors.counting());
        long end = System.currentTimeMillis();
        System.out.println(end - snd);

        System.out.println("Round 2");
        collect.stream().sorted().filter(noAdjacentDuplicatesFilter()).collect(Collectors.counting());
        fst = System.currentTimeMillis();
        System.out.println(fst - end);

        collect.stream().sorted().distinct().collect(Collectors.counting());
        snd = System.currentTimeMillis();
        System.out.println(snd - fst);

        collect.stream().distinct().sorted().collect(Collectors.counting());
        end = System.currentTimeMillis();
        System.out.println(end - snd);

        System.out.println("Round 3 Parallel");
        collect.stream().parallel().sorted().filter(noAdjacentDuplicatesFilter()).collect(Collectors.counting());
        fst = System.currentTimeMillis();
        System.out.println(fst - end);

        collect.stream().parallel().sorted().distinct().collect(Collectors.counting());
        snd = System.currentTimeMillis();
        System.out.println(snd - fst);

        collect.stream().parallel().distinct().sorted().collect(Collectors.counting());
        end = System.currentTimeMillis();
        System.out.println(end - snd);

        System.out.println("Round 4 Parallel");
        collect.stream().parallel().sorted().filter(noAdjacentDuplicatesFilter()).collect(Collectors.counting());
        fst = System.currentTimeMillis();
        System.out.println(fst - end);

        collect.stream().parallel().sorted().distinct().collect(Collectors.counting());
        snd = System.currentTimeMillis();
        System.out.println(snd - fst);

        collect.stream().parallel().distinct().sorted().collect(Collectors.counting());
        end = System.currentTimeMillis();
        System.out.println(end - snd);

    }

    public static Predicate<Object> noAdjacentDuplicatesFilter() {
        final Object[] previousValue = { new Object() };

        return value -> {
            final boolean takeValue = !Objects.equals(previousValue[0], value);
            previousValue[0] = value;
            return takeValue;
        };

    }

}
Viktor Mellgren
  • The only thing I predicted is that `noAdjacentDuplicatesFilter()` will produce incorrect results with a parallel stream. The performance is irrelevant then. – Holger May 05 '17 at 15:13
  • Well, if you put it that way. But I guess it is better to produce incorrect results fast than to produce incorrect results slowly ;) Fail-fast. – Viktor Mellgren May 05 '17 at 22:43
  • “Fail fast” means failing fast in a recognizable way, like throwing an exception. It’s the absolute opposite of producing incorrect results. And it is meant as a response to an incorrect request, not as a suggestion to produce failures where producing the correct result is possible. – Holger May 08 '17 at 07:31
  • Hence the wink emoji – Viktor Mellgren May 08 '17 at 09:13
  • Wasn’t sure about it, maybe because I have encountered people really preferring speed over correctness before… – Holger May 08 '17 at 09:19
  • Well I work in finance so for me correctness > speed any day :D – Viktor Mellgren May 08 '17 at 09:22