
I want to have something similar to Collectors.maxBy(), a collector that gets the top elements in a collection (maxBy only gets one).

I have a stream of Possibility objects that could be scored with an Integer score(Possibility) method.

First I tried:

List<Possibility> possibilities = getPossibilityStream()
    .parallel()
    .collect(Collectors.toList());

if(!possibilities.isEmpty()) {
    int bestScore = possibilities.stream()
        .mapToInt(p -> score(p))
        .max()
        .getAsInt();
    possibilities = possibilities.stream()
        .filter(p -> score(p)==bestScore)
        .collect(Collectors.toList());
}

But this scans the collection three times: once to build it, a second time to find the top score, and a third time to filter it, which is not optimal. Moreover, the number of possibilities could be huge (>10^12).

The best way would be to get the top possibilities directly in the first collect, but there seems to be no built-in collector for that.

So I implemented my own Collector:

public class BestCollector<E> implements Collector<E, List<E>, List<E>> {

    private final Comparator<E> comparator;

    private final Class<? extends List> listImpl ;

    public BestCollector(Comparator<E> comparator, Class<? extends List> listImpl) {
        this.comparator = comparator;
        this.listImpl = listImpl;
    }

    public BestCollector(Comparator<E> comparator) {
        this.comparator= comparator;
        listImpl = ArrayList.class;
    }

    @Override
    public Supplier<List<E>> supplier() {
        return () -> {
            try {
                return listImpl.newInstance();
            } catch (InstantiationException | IllegalAccessException ex) {
                throw new RuntimeException(ex);
            }
        };
    }

    @Override
    public BiConsumer<List<E>, E> accumulator() {
        return (list, e) -> {
            if (list.isEmpty()) {
                list.add(e);
            } else {
                final int comparison = comparator.compare(list.get(0), e);
                if (comparison == 0) {
                    list.add(e);
                } else if (comparison < 0) {
                    list.clear();
                    list.add(e);
                }
            }
        };
    }

    @Override
    public BinaryOperator<List<E>> combiner() {
        return (l1, l2) -> {
            final int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            } else if (comparison < 0) {
                return l2;
            } else {
                return l1;
            }
        };
    }

    @Override
    public Function<List<E>, List<E>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT, Characteristics.UNORDERED);
    }
}

And then:

List<Possibility> possibilities = getPossibilityStream()
    .parallel()
    .collect(new BestCollector<Possibility>((p1, p2) -> score(p1).compareTo(score(p2))));

That does the job in sequential mode (without the .parallel()), but in parallel mode exceptions occasionally occur in two spots:

  • A java.lang.IndexOutOfBoundsException Index: 0, Size: 0 in the line:

    final int comparison = comparator.compare(list.get(0), e);
    

of the accumulator() method

I understand it happens when a list.clear() is called between list.isEmpty() and list.get(0).

  • A java.lang.NullPointerException in the score(Possibility) method because the possibility is null. Again the same line is involved:

    final int comparison = comparator.compare(list.get(0), e);
    

I don't understand how list.get(0) could return null...

In parallel mode, list.get(0) sometimes raises an IndexOutOfBoundsException and sometimes returns null.

I understand that my code is not thread safe so I tried several solutions:

  • Add synchronized in all methods of BestCollector: public synchronized …
  • Use a thread-safe collection instead of ArrayList: java.util.concurrent.CopyOnWriteArrayList
  • Add synchronized and use CopyOnWriteArrayList at the same time
  • Remove Characteristics.CONCURRENT from the Set<Characteristics> returned by the characteristics() method

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
    }
    

But I don't know whether Characteristics.CONCURRENT indicates that my code is thread safe or that my code will be used in concurrent processing.

None of these solutions actually solves the problem.


In fact, when I remove CONCURRENT from the characteristics, there is sometimes a java.lang.IndexOutOfBoundsException: Index: 0, Size: 0, but in this line:

final int comparison = comparator.compare(l1.get(0), l2.get(0));

of the combiner() method.

However, the exceptions raised by the accumulator() method no longer seem to occur.


@Holger's answer is right.

The complete solution is to change both combiner() and characteristics() methods:

@Override
public BinaryOperator<List<E>> combiner() {
    return (l1, l2) -> {
        if (l1.isEmpty()) {
            return l2;
        } else if (l2.isEmpty()) {
            return l1;
        } else {
            final int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            } else if (comparison < 0) {
                return l2;
            } else {
                return l1;
            }
        }
    };
}

@Override
public Set<Characteristics> characteristics() {
    return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}
rolve
kwisatz
  • I don't see anything wrong with your Collector implementation (which is basically the same implementation as the accepted answer to this question: http://stackoverflow.com/questions/29334404/how-to-force-max-to-return-all-maximum-values-in-a-java-stream/29334774) – Alexis C. Apr 28 '15 at 12:38
  • Interestingly, removing the `CONCURRENT` characteristic makes it work for me, so I guess you'd have to look in this direction. – Alexis C. Apr 28 '15 at 12:44

1 Answer


Your code has only one significant error: if your collector is not thread safe, it shouldn’t report Characteristics.CONCURRENT, because that characteristic is precisely the claim that it is thread safe.

The important point you have to understand is that for non-CONCURRENT collectors, the framework will perform the necessary steps to use it in a thread-safe but still efficient manner:

  • for each worker thread, a new container will be acquired via the supplier()
  • each worker will use the accumulator() function together with its own local container
  • the combiner() will be used once two worker threads have finished their work
  • the finisher() will be used when all worker threads have finished their work and all containers have been combined

So all you have to do is to ensure that your supplier truly returns a new instance on each invocation and that all functions are non-interfering and side-effect free (regarding anything else but the container they receive as arguments) and, of course, not report Characteristics.CONCURRENT when your collector isn’t a concurrent collector.

You don’t need the synchronized keyword nor concurrent collections here.
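To make the steps above concrete, here is a minimal sketch that imitates them by hand for two chunks. It runs sequentially and is only an illustration, not the framework's actual code; the class name NonConcurrentStepsSketch and the method collectInTwoChunks are made up for this example. In a real parallel stream, the two accumulation loops would run on different worker threads, each with its own container.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class NonConcurrentStepsSketch {

    // Hypothetical helper: imitates the steps for a non-CONCURRENT collector with two "workers".
    static <T, A, R> R collectInTwoChunks(Collector<T, A, R> collector, List<T> left, List<T> right) {
        // One new container per worker, obtained from the supplier.
        A leftContainer = collector.supplier().get();
        A rightContainer = collector.supplier().get();

        // Each worker accumulates only into its own container.
        for (T element : left) {
            collector.accumulator().accept(leftContainer, element);
        }
        for (T element : right) {
            collector.accumulator().accept(rightContainer, element);
        }

        // The partial results are merged, then the finisher runs once on the final container.
        A merged = collector.combiner().apply(leftContainer, rightContainer);
        return collector.finisher().apply(merged);
    }

    public static void main(String[] args) {
        // The right chunk is empty, so the combiner receives an empty container.
        List<Integer> result = collectInTwoChunks(
            Collectors.toList(), Arrays.asList(1, 2, 3), Collections.<Integer>emptyList());
        System.out.println(result); // prints [1, 2, 3]
    }
}
```

No container is ever touched by more than one "worker" here, which is why no locking is needed as long as the collector's functions only modify the container passed to them.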


By the way, a Comparator of the form (p1, p2) -> score(p1).compareTo(score(p2)) can be implemented using Comparator.comparing(p -> score(p)) or if the score value is an int: Comparator.comparingInt(p -> score(p)).
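As a small sketch of that equivalence: the three comparators below should order elements identically. The String element type and the length-based score method are stand-ins assumed for this example, since the question's Possibility class and score method are not shown.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class ComparatorSketch {

    // Hypothetical stand-in for the question's score(Possibility) method.
    static Integer score(String possibility) {
        return possibility.length();
    }

    // The hand-written form from the question.
    static final Comparator<String> VERBOSE = (p1, p2) -> score(p1).compareTo(score(p2));

    // Same ordering via a key extractor returning a Comparable.
    static final Comparator<String> COMPARING = Comparator.comparing(p -> score(p));

    // Same ordering via an int key; the Integer result is unboxed to int.
    static final Comparator<String> COMPARING_INT = Comparator.comparingInt(p -> score(p));

    public static void main(String[] args) {
        List<String> values = Arrays.asList("ccc", "a", "bb");
        values.sort(COMPARING_INT);
        System.out.println(values); // prints [a, bb, ccc]
    }
}
```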


Finally, your combiner function does not check whether one of the lists is empty. This perfectly explains an IndexOutOfBoundsException within the combiner, while the IndexOutOfBoundsException within the accumulator is the result of your collector reporting Characteristics.CONCURRENT.
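The combiner failure can be reproduced without any threads. The sketch below calls a copy of the unchecked combiner directly with one empty container, which is presumably what happens when a worker's chunk contributes no elements. The class and method names are invented for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

class EmptyContainerSketch {

    // Copy of the question's combiner logic, without any emptiness check.
    static <E> BinaryOperator<List<E>> uncheckedCombiner(Comparator<? super E> comparator) {
        return (l1, l2) -> {
            // get(0) throws IndexOutOfBoundsException if either list is empty.
            int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            }
            return comparison < 0 ? l2 : l1;
        };
    }

    // Returns true if combining an empty container with a filled one throws.
    static boolean failsOnEmptyContainer() {
        Comparator<Integer> natural = Comparator.naturalOrder();
        BinaryOperator<List<Integer>> combiner = uncheckedCombiner(natural);
        List<Integer> empty = new ArrayList<>();
        List<Integer> filled = new ArrayList<>(Arrays.asList(7));
        try {
            combiner.apply(empty, filled);
            return false;
        } catch (IndexOutOfBoundsException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsOnEmptyContainer()); // prints true
    }
}
```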


It’s also important to understand that adding a synchronized keyword to an accumulator() or combiner() method does not guard the function constructed via lambda expression. It will guard the method which constructs the function instance, but not the function’s code itself. In contrast to an inner class, there is no way to add a synchronized keyword to the actual function’s implementation method.
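A small sketch of that difference, using Thread.holdsLock to observe whether the monitor is held while the function body runs. The class and method names are invented for this illustration.

```java
import java.util.function.BooleanSupplier;

class SynchronizedLambdaSketch {

    // 'synchronized' on the method: the lock is held only while the lambda object is created.
    synchronized BooleanSupplier lockOnFactoryMethod() {
        return () -> Thread.holdsLock(this);
    }

    // A synchronized block inside the lambda body guards the function's own code.
    BooleanSupplier lockInsideBody() {
        return () -> {
            synchronized (this) {
                return Thread.holdsLock(this);
            }
        };
    }

    public static void main(String[] args) {
        SynchronizedLambdaSketch sketch = new SynchronizedLambdaSketch();
        // The factory method has already returned, so its lock is released by now.
        System.out.println(sketch.lockOnFactoryMethod().getAsBoolean()); // prints false
        System.out.println(sketch.lockInsideBody().getAsBoolean());      // prints true
    }
}
```

If locking were really required, a synchronized block inside the lambda body would be the way to express it, although for a non-CONCURRENT collector it is unnecessary.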

Jens Bannmann
Holger
  • Yes, I saw that `synchronized` doesn't behave the way it usually does... I also tried adding a synchronized block, `synchronized(list) {...`, in the `accumulator()` method while keeping `CONCURRENT` in the characteristics set and not checking for `isEmpty()` in the `combiner()` method. That works too, but your solution is faster in terms of execution time. – kwisatz Apr 28 '15 at 14:25