I want something similar to Collectors.maxBy(): a collector that gets the top elements in a collection (maxBy only gets one).
I have a stream of Possibility objects that can be scored with an Integer score(Possibility) method.
First I tried:
List<Possibility> possibilities = getPossibilityStream()
    .parallel()
    .collect(Collectors.toList());
if (!possibilities.isEmpty()) {
    int bestScore = possibilities.stream()
        .mapToInt(p -> score(p))
        .max()
        .getAsInt();
    possibilities = possibilities.stream()
        .filter(p -> score(p) == bestScore)
        .collect(Collectors.toList());
}
But doing that, I scan the collection three times: once to build it, a second time to get the top score, and a third time to filter it. That is not optimal. Moreover, the number of possibilities could be huge (>10^12).
The best way would be to get the top possibilities directly in the first collect, but there seems to be no built-in collector for that.
So I implemented my own Collector:
public class BestCollector<E> implements Collector<E, List<E>, List<E>> {

    private final Comparator<E> comparator;
    private final Class<? extends List> listImpl;

    public BestCollector(Comparator<E> comparator, Class<? extends List> listImpl) {
        this.comparator = comparator;
        this.listImpl = listImpl;
    }

    public BestCollector(Comparator<E> comparator) {
        this.comparator = comparator;
        listImpl = ArrayList.class;
    }

    @Override
    public Supplier<List<E>> supplier() {
        return () -> {
            try {
                return listImpl.newInstance();
            } catch (InstantiationException | IllegalAccessException ex) {
                throw new RuntimeException(ex);
            }
        };
    }

    @Override
    public BiConsumer<List<E>, E> accumulator() {
        return (list, e) -> {
            if (list.isEmpty()) {
                list.add(e);
            } else {
                final int comparison = comparator.compare(list.get(0), e);
                if (comparison == 0) {
                    list.add(e);
                } else if (comparison < 0) {
                    list.clear();
                    list.add(e);
                }
            }
        };
    }

    @Override
    public BinaryOperator<List<E>> combiner() {
        return (l1, l2) -> {
            final int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            } else if (comparison < 0) {
                return l2;
            } else {
                return l1;
            }
        };
    }

    @Override
    public Function<List<E>, List<E>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT, Characteristics.UNORDERED);
    }
}
And then:
List<Possibility> possibilities = getPossibilityStream()
    .parallel()
    .collect(new BestCollector<Possibility>((p1, p2) -> score(p1).compareTo(score(p2))));
That does the job in sequential mode (without the .parallel()), but in parallel mode exceptions occasionally occur in two spots:
- A java.lang.IndexOutOfBoundsException Index: 0, Size: 0 in the line: final int comparison = comparator.compare(list.get(0), e); of the accumulator() method. I understand it happens when list.clear() is called between list.isEmpty() and list.get(0).
- A java.lang.NullPointerException in the score(Possibility) method because the possibility is null. Again the same line is involved: final int comparison = comparator.compare(list.get(0), e); I don't understand how list.get(0) could return null...
In parallel mode, sometimes list.get(0) raises an IndexOutOfBoundsException and sometimes returns null.
I understand that my code is not thread safe so I tried several solutions:
- Add synchronized to all methods of BestCollector: public synchronized …
- Use a thread-safe collection instead of ArrayList: java.util.concurrent.CopyOnWriteArrayList
- Add synchronized and use CopyOnWriteArrayList at the same time
- Remove Characteristics.CONCURRENT from the Set<Characteristics> of the characteristics() method:
  @Override
  public Set<Characteristics> characteristics() {
      return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
  }
But I don't know whether Characteristics.CONCURRENT is there to indicate that my code is thread-safe or that my code will be used in concurrent processing.
But none of these solutions actually solves the problem.
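On the CONCURRENT question: as far as I can tell from the Collector.Characteristics Javadoc, CONCURRENT is a promise made by the collector itself. It states that the accumulator may safely be called from several threads on the same result container. A small check of the flags declared by two built-in collectors illustrates the difference (the class name CharacteristicsCheck and the helper isConcurrent are made up for this sketch):

```java
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class CharacteristicsCheck {

    // Hypothetical helper: true if the collector declares the CONCURRENT flag.
    static boolean isConcurrent(Collector<?, ?, ?> collector) {
        return collector.characteristics().contains(Collector.Characteristics.CONCURRENT);
    }

    public static void main(String[] args) {
        // toList() accumulates into a plain list, which is not safe to share
        // between threads, so it does not declare CONCURRENT.
        System.out.println(isConcurrent(Collectors.toList()));

        // toConcurrentMap() accumulates into a concurrent map that several
        // threads may update at once, so it declares CONCURRENT.
        System.out.println(isConcurrent(
                Collectors.toConcurrentMap((String s) -> s, (String s) -> s)));
    }
}
```

If that reading is correct, declaring CONCURRENT on a collector backed by an ArrayList lets a parallel stream share one list between threads, which would be consistent with the exceptions seen in accumulator().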
In fact, when I remove CONCURRENT from the characteristics, there is sometimes a java.lang.IndexOutOfBoundsException: Index: 0, Size: 0, but in the line:
final int comparison = comparator.compare(l1.get(0), l2.get(0));
of the combiner() method.
However, the exceptions raised by the accumulator() method no longer seem to occur.
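A possible explanation, as far as I can tell: without CONCURRENT, each chunk of the parallel stream gets its own list from supplier(), and a chunk that received no elements hands an empty list to combiner(). The failure can be reproduced without any threads by calling the same combiner logic directly. The class name EmptyPartialDemo, the helper names, and the String element type are only for this illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

final class EmptyPartialDemo {

    // Same logic as the combiner() above: it assumes both lists are non-empty.
    static <E> BinaryOperator<List<E>> uncheckedCombiner(Comparator<E> comparator) {
        return (l1, l2) -> {
            final int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            }
            return comparison < 0 ? l2 : l1;
        };
    }

    // Returns true if merging an empty partial result throws.
    static boolean failsOnEmptyPartial() {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        BinaryOperator<List<String>> combiner = uncheckedCombiner(byLength);
        List<String> empty = new ArrayList<>();
        List<String> nonEmpty = new ArrayList<>(Arrays.asList("abc"));
        try {
            combiner.apply(empty, nonEmpty); // l1.get(0) on an empty list
            return false;
        } catch (IndexOutOfBoundsException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsOnEmptyPartial());
    }
}
```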
@Holger's answer is right.
The complete solution is to change both the combiner() and characteristics() methods:
@Override
public BinaryOperator<List<E>> combiner() {
    return (l1, l2) -> {
        if (l1.isEmpty()) {
            return l2;
        } else if (l2.isEmpty()) {
            return l1;
        } else {
            final int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            } else if (comparison < 0) {
                return l2;
            } else {
                return l1;
            }
        }
    };
}

@Override
public Set<Characteristics> characteristics() {
    return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}
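For comparison, the same fix can be sketched more compactly with Collector.of instead of a dedicated class. This is only a sketch under my own naming: BestOf, best and longest are hypothetical names, and the String example stands in for Possibility and score.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collector;

final class BestOf {

    // Collects all elements that are maximal according to the comparator.
    static <E> Collector<E, List<E>, List<E>> best(Comparator<? super E> comparator) {
        return Collector.of(
                ArrayList::new,
                (list, e) -> {
                    if (list.isEmpty()) {
                        list.add(e);
                    } else {
                        int comparison = comparator.compare(list.get(0), e);
                        if (comparison == 0) {
                            list.add(e);       // ties with the current best
                        } else if (comparison < 0) {
                            list.clear();      // e beats the current best
                            list.add(e);
                        }
                    }
                },
                (l1, l2) -> {
                    // A partial result may be empty, so check before get(0).
                    if (l1.isEmpty()) {
                        return l2;
                    }
                    if (l2.isEmpty()) {
                        return l1;
                    }
                    int comparison = comparator.compare(l1.get(0), l2.get(0));
                    if (comparison == 0) {
                        l1.addAll(l2);
                        return l1;
                    }
                    return comparison < 0 ? l2 : l1;
                },
                // No CONCURRENT: each thread accumulates into its own list.
                Collector.Characteristics.IDENTITY_FINISH,
                Collector.Characteristics.UNORDERED);
    }

    // Usage example: the longest strings of a list, collected in parallel.
    static List<String> longest(List<String> words) {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        return words.parallelStream().collect(best(byLength));
    }

    public static void main(String[] args) {
        // Prints the three strings of length 3; their order is not guaranteed.
        System.out.println(longest(Arrays.asList("a", "bbb", "cc", "ddd", "eee")));
    }
}
```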