I have a problem that I'm trying to solve with something I'm pretty sure I'm not supposed to do, but I don't see an alternative. I'm given a List of Strings and need to split it into chunks of a given size. Each chunk then has to be passed to some method for further processing. As the list might be huge, the processing should be done asynchronously.
My approach is to create a custom Collector that takes the Stream of Strings and turns it into a Stream of chunks (a Stream<List<T>>), so that further operations can be chained after it:
final Stream<List<Long>> chunks = list
.stream()
.parallel()
.collect(MyCollector.toChunks(CHUNK_SIZE))
.flatMap(p -> doStuff(p))
.collect(MyCollector.toChunks(CHUNK_SIZE))
.map(...)
...
The code for the Collector:
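import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

public final class MyCollector<T, A extends List<List<T>>, R extends Stream<List<T>>> implements Collector<T, A, R> {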
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicInteger current = new AtomicInteger(-1);
private final int chunkSize;
private MyCollector(final int chunkSize){
this.chunkSize = chunkSize;
}
@Override
public Supplier<A> supplier() {
return () -> (A)new ArrayList<List<T>>();
}
@Override
public BiConsumer<A, T> accumulator() {
return (A candidate, T acc) -> {
if (index.getAndIncrement() % chunkSize == 0){
candidate.add(new ArrayList<>(chunkSize));
current.incrementAndGet();
}
candidate.get(current.get()).add(acc);
};
}
@Override
public BinaryOperator<A> combiner() {
return (a1, a2) -> {
a1.addAll(a2);
return a1;
};
}
@Override
public Function<A, R> finisher() {
return (a) -> (R)a.stream();
}
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED));
}
public static <T> MyCollector<T, List<List<T>>, Stream<List<T>>> toChunks(final int chunkSize){
return new MyCollector<>(chunkSize);
}
}
This seems to work in most cases, but sometimes I get an NPE. I'm sure the accumulator is not thread-safe, as two threads might interfere with each other when adding new Lists to the main List. I don't mind a chunk having a few elements too many or too few, though.
I've tried this instead of the current supplier function:
return () -> (A)new ArrayList<List<T>>(){{add(new ArrayList<T>());}};
This was meant to make sure a List is always present, but it doesn't work at all and results in empty lists.
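One possible explanation for the empty lists, at least in a sequential run: because current still starts at -1, the first chunk goes into the list created by the supplier, while the accumulator keeps creating one list ahead of current, so the last list always stays empty. The sketch below replays the accumulator logic from the collector above in a plain loop; PrefilledSupplierTrace and trace are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PrefilledSupplierTrace {
    // Replays the question's accumulator sequentially, starting from the
    // pre-filled container that the double-brace supplier would return.
    static List<List<String>> trace(List<String> input, int chunkSize) {
        AtomicInteger index = new AtomicInteger(0);
        AtomicInteger current = new AtomicInteger(-1);
        List<List<String>> container = new ArrayList<>();
        container.add(new ArrayList<>()); // the list added by the supplier
        for (String s : input) {
            if (index.getAndIncrement() % chunkSize == 0) {
                // a new list is appended, but current only moves to the previous one
                container.add(new ArrayList<>(chunkSize));
                current.incrementAndGet();
            }
            container.get(current.get()).add(s);
        }
        return container;
    }

    public static void main(String[] args) {
        System.out.println(trace(Arrays.asList("a", "b", "c", "d"), 2));
    }
}
```

With a chunk size of 2 this prints [[a, b], [c, d], []]: every element lands in a list, but the last list is always left empty.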
Issues:
- I'm pretty sure a custom Spliterator would be a good solution. It would not work for sequential scenarios, however. Also, can I be sure the Spliterator is called?
- I'm aware I shouldn't have any state at all, but I'm not sure how to change that.
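For the Spliterator idea, here is a minimal sketch, assuming chunks only need to be built in encounter order. ChunkingSpliterator and chunked are hypothetical names. Each tryAdvance call pulls up to chunkSize elements from the wrapped source. Because the stream created by StreamSupport.stream is driven entirely by this spliterator, the terminal operation does end up calling its tryAdvance, for sequential and parallel streams alike. For parallel streams, Spliterators.AbstractSpliterator splits by buffering batches of already-built chunks, so the chunking itself stays sequential while downstream operations can run in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Hypothetical helper: groups the elements of a source spliterator into lists of chunkSize. */
public final class ChunkingSpliterator<T> extends Spliterators.AbstractSpliterator<List<T>> {
    private final Spliterator<T> source;
    private final int chunkSize;

    private ChunkingSpliterator(Spliterator<T> source, int chunkSize) {
        // Size is unknown up front; keep the source's ORDERED flag, chunks are never null.
        super(Long.MAX_VALUE, (source.characteristics() & Spliterator.ORDERED) | Spliterator.NONNULL);
        this.source = source;
        this.chunkSize = chunkSize;
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        List<T> chunk = new ArrayList<>(chunkSize);
        // Pull up to chunkSize elements; source.tryAdvance adds each one to the chunk.
        while (chunk.size() < chunkSize && source.tryAdvance(chunk::add)) {
            // nothing else to do per element
        }
        if (chunk.isEmpty()) {
            return false; // source exhausted
        }
        action.accept(chunk);
        return true;
    }

    /** Wraps any stream (sequential or parallel) into a stream of chunks. */
    public static <T> Stream<List<T>> chunked(Stream<T> stream, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        boolean parallel = stream.isParallel();
        Spliterator<T> source = stream.spliterator();
        return StreamSupport.stream(new ChunkingSpliterator<>(source, chunkSize), parallel)
                .onClose(stream::close);
    }

    public static void main(String[] args) {
        List<List<Integer>> chunks = chunked(Stream.of(1, 2, 3, 4, 5), 2).collect(Collectors.toList());
        System.out.println(chunks); // [[1, 2], [3, 4], [5]]
    }
}
```

Chaining then means wrapping rather than calling a method on the stream, e.g. chunked(chunked(list.stream(), n).flatMap(p -> doStuff(p)), n).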
Questions:
- Is this approach completely wrong or somehow fixable?
- If I use a Spliterator, can I be sure it's called, or is that decided by the underlying implementation?
- I'm pretty sure the casts to (A) and (R) in the supplier and finisher are not necessary, but IntelliJ complains without them. Is there something I'm missing?
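Regarding the state and the casts, a sketch of a stateless variant, assuming that short chunks at partition boundaries are acceptable (as stated above). The casts are needed in the original because A and R are type parameters that could be any subtype, so new ArrayList<List<T>>() is not known to be an A. Declaring the concrete types List<List<T>> and Stream<List<T>> instead removes them. The chunking state lives in the per-partition container rather than in AtomicInteger fields, and the collector is not CONCURRENT. Chunks and toChunks are hypothetical names; Collector.of is the standard factory in java.util.stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class Chunks {
    private Chunks() {
    }

    public static <T> Collector<T, List<List<T>>, Stream<List<T>>> toChunks(int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        return Collector.of(
                ArrayList::new, // one fresh container per partition, no shared state
                (List<List<T>> chunks, T element) -> {
                    // start a new chunk when there is none yet or the last one is full
                    if (chunks.isEmpty() || chunks.get(chunks.size() - 1).size() == chunkSize) {
                        chunks.add(new ArrayList<>(chunkSize));
                    }
                    chunks.get(chunks.size() - 1).add(element);
                },
                (left, right) -> { // merge two partial results in encounter order
                    left.addAll(right);
                    return left;
                },
                List::stream); // no characteristics: not CONCURRENT, order is respected
    }

    public static void main(String[] args) {
        List<List<String>> chunks = Stream.of("a", "b", "c", "d", "e")
                .collect(toChunks(2))
                .collect(Collectors.toList());
        System.out.println(chunks); // [[a, b], [c, d], [e]]
    }
}
```

In a parallel run, each partition fills its own container and the combiner concatenates them in encounter order, so the only deviation from the sequential result is the occasional short chunk at a partition boundary.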
EDIT:
- I've added more of the client code, as the suggestions using IntStream.range won't work when chained.
- I realize I could do it differently, as suggested in a comment, but this is also partly about style and about knowing whether it's possible.
- I set the CONCURRENT characteristic because I assume the Stream API would otherwise fall back to sequential processing. As stated before, the solution is not thread-safe.
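On the CONCURRENT assumption: according to the Collector documentation, a collector without CONCURRENT is still evaluated in parallel on a parallel stream. The input is partitioned, each partition gets its own container from the supplier, and the partial results are merged with the combiner. The hypothetical demo below counts how often the supplier is called for a plain, non-concurrent collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class NonConcurrentParallelDemo {
    /** Returns {number of collected elements, number of containers created}. */
    static int[] run(int n) {
        AtomicInteger containers = new AtomicInteger();
        // Plain collector without CONCURRENT: each partition asks the supplier for its own list.
        Collector<Integer, List<Integer>, List<Integer>> toList = Collector.of(
                () -> {
                    containers.incrementAndGet();
                    return new ArrayList<>();
                },
                List::add,
                (left, right) -> {
                    left.addAll(right);
                    return left;
                });
        List<Integer> result = IntStream.range(0, n).boxed().parallel().collect(toList);
        return new int[] {result.size(), containers.get()};
    }

    public static void main(String[] args) {
        int[] stats = run(100_000);
        System.out.println(stats[0] + " elements, " + stats[1] + " containers");
    }
}
```

On a multi-core machine the container count is typically greater than one, which indicates the reduction ran in parallel without CONCURRENT; the exact number depends on the hardware and the common pool configuration.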
Any help would be greatly appreciated.
Best, D