I am trying to write a Java 8 stream collector that mirrors the functionality of RxJava's buffer operator.
I have working code for this:
// This gathers the numbers 1 to 13 and combines them into groups of
// three, preserving the order even if it's a parallel stream.
final List<List<String>> triads = IntStream.range(1, 14)
        .parallel()
        .boxed()
        .map(Object::toString)
        .collect(ArrayList::new, accumulator, combiner);
System.out.println(triads.toString());
The accumulator here is this:
final BiConsumer<List<List<String>>, String> accumulator = (acc, a) -> {
    StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.append("Accumulator|");
    stringBuilder.append("Before: ").append(acc.toString());
    int accumulatorSize = acc.size();
    if (accumulatorSize == 0) {
        List<String> newList = new ArrayList<>();
        newList.add(a);
        acc.add(newList);
    } else {
        List<String> lastList = acc.get(accumulatorSize - 1);
        if (lastList.size() != 3) {
            lastList.add(a);
        } else {
            List<String> newList = new ArrayList<>();
            newList.add(a);
            acc.add(newList);
        }
    }
    stringBuilder.append("|After: ").append(acc.toString());
    stringBuilder.append("|a: ").append(a);
    System.out.println(stringBuilder.toString());
};
And the combiner:
// Utility method to make first list of size 3
// by shifting elements from second to first list
final BiConsumer<List<String>, List<String>> fixSize = (l1, l2) -> {
    while (l1.size() != 3 && l2.size() > 0) {
        l1.add(l2.remove(0));
    }
};
final BiConsumer<List<List<String>>, List<List<String>>> combiner = (l1, l2) -> {
    StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.append("Combiner|");
    stringBuilder.append("Before, l1: ").append(l1).append(", l2: ").append(l2);
    if (l1.isEmpty()) {
        // l1 is empty
        l1.addAll(l2);
    } else {
        // l1 is not empty
        List<String> lastL1List = l1.get(l1.size() - 1);
        if (lastL1List.size() == 3) {
            l1.addAll(l2);
        } else {
            if (l2.isEmpty()) {
                // do nothing
            } else {
                List<List<String>> fixSizeList = new ArrayList<>(1 + l2.size());
                fixSizeList.add(lastL1List);
                fixSizeList.addAll(l2);
                for (int i = 0; i < fixSizeList.size() - 1; i++) {
                    List<String> x = fixSizeList.get(i), y = fixSizeList.get(i + 1);
                    fixSize.accept(x, y);
                }
                l2.stream().filter(l -> !l.isEmpty()).forEach(l1::add);
                // every list now has size three, except possibly the last
            }
        }
    }
    stringBuilder.append("|After, l1: ").append(l1).append(", l2: ").append(l2);
    System.out.println(stringBuilder.toString());
};
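To make the shifting step easier to follow, here is a small standalone sketch of what fixSize does to a pair of adjacent lists. The class name FixSizeDemo and the static-method form are only for illustration; the logic is the same as in the fixSize lambda.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FixSizeDemo {
    // Same logic as the fixSize lambda, written as a static method for this demo:
    // move elements from the front of l2 to the end of l1 until l1 holds three
    // elements or l2 runs out.
    static void fixSize(List<String> l1, List<String> l2) {
        while (l1.size() != 3 && !l2.isEmpty()) {
            l1.add(l2.remove(0));
        }
    }

    public static void main(String[] args) {
        List<String> first = new ArrayList<>(Arrays.asList("10", "11"));
        List<String> second = new ArrayList<>(Arrays.asList("12", "13"));
        fixSize(first, second);
        System.out.println(first + " " + second); // prints [10, 11, 12] [13]
    }
}
```

The combiner applies this to each adjacent pair in turn, so the shortfall of one list is filled from the next and any remainder ends up in the last list.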
This produces the following output:
Accumulator|Before: []|After: [[12]]|a: 12
Accumulator|Before: []|After: [[2]]|a: 2
Accumulator|Before: []|After: [[11]]|a: 11
Accumulator|Before: []|After: [[6]]|a: 6
Accumulator|Before: []|After: [[4]]|a: 4
Accumulator|Before: []|After: [[1]]|a: 1
Accumulator|Before: []|After: [[13]]|a: 13
Accumulator|Before: []|After: [[8]]|a: 8
Accumulator|Before: []|After: [[3]]|a: 3
Accumulator|Before: []|After: [[5]]|a: 5
Accumulator|Before: []|After: [[10]]|a: 10
Accumulator|Before: []|After: [[7]]|a: 7
Accumulator|Before: []|After: [[9]]|a: 9
Combiner|Before, l1: [[5]], l2: [[6]]|After, l1: [[5, 6]], l2: [[]]
Combiner|Before, l1: [[12]], l2: [[13]]|After, l1: [[12, 13]], l2: [[]]
Combiner|Before, l1: [[2]], l2: [[3]]|After, l1: [[2, 3]], l2: [[]]
Combiner|Before, l1: [[8]], l2: [[9]]|After, l1: [[8, 9]], l2: [[]]
Combiner|Before, l1: [[10]], l2: [[11]]|After, l1: [[10, 11]], l2: [[]]
Combiner|Before, l1: [[4]], l2: [[5, 6]]|After, l1: [[4, 5, 6]], l2: [[]]
Combiner|Before, l1: [[1]], l2: [[2, 3]]|After, l1: [[1, 2, 3]], l2: [[]]
Combiner|Before, l1: [[7]], l2: [[8, 9]]|After, l1: [[7, 8, 9]], l2: [[]]
Combiner|Before, l1: [[10, 11]], l2: [[12, 13]]|After, l1: [[10, 11, 12], [13]], l2: [[13]]
Combiner|Before, l1: [[1, 2, 3]], l2: [[4, 5, 6]]|After, l1: [[1, 2, 3], [4, 5, 6]], l2: [[4, 5, 6]]
Combiner|Before, l1: [[7, 8, 9]], l2: [[10, 11, 12], [13]]|After, l1: [[7, 8, 9], [10, 11, 12], [13]], l2: [[10, 11, 12], [13]]
Combiner|Before, l1: [[1, 2, 3], [4, 5, 6]], l2: [[7, 8, 9], [10, 11, 12], [13]]|After, l1: [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12], [13]], l2: [[7, 8, 9], [10, 11, 12], [13]]
[[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12], [13]]
I might have completely butchered the concept of streams here, but is there a way to simplify, optimize, or rewrite this?
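One direction I can think of is sketched below, assuming it is acceptable to hold all elements in a flat list and only split them into groups at the end. It uses Collector.of with a finisher; the class name BufferSketch and the helper buffer(int) are hypothetical names of mine, not part of any library. I have not measured whether it is actually faster than the version above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class BufferSketch {
    // Hypothetical helper: collects into one flat list (order is preserved by
    // the stream framework for ordered streams), then cuts it into chunks.
    static <T> Collector<T, List<T>, List<List<T>>> buffer(int size) {
        return Collector.<T, List<T>, List<List<T>>>of(
                ArrayList::new,                     // supplier: flat container
                List::add,                          // accumulator: append element
                (left, right) -> {                  // combiner: concatenate in order
                    left.addAll(right);
                    return left;
                },
                flat -> {                           // finisher: split into chunks
                    List<List<T>> chunks = new ArrayList<>();
                    for (int i = 0; i < flat.size(); i += size) {
                        int end = Math.min(i + size, flat.size());
                        chunks.add(new ArrayList<>(flat.subList(i, end)));
                    }
                    return chunks;
                });
    }

    public static void main(String[] args) {
        List<List<String>> triads = IntStream.range(1, 14)
                .parallel()
                .boxed()
                .map(Object::toString)
                .collect(buffer(3));
        // prints [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12], [13]]
        System.out.println(triads);
    }
}
```

The trade-off is that the grouping happens in a single sequential pass in the finisher rather than during combining, so the combiner no longer needs to shift elements between partial groups.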
Here is the complete program for simplicity (sadly, Stack Overflow doesn't allow posting code as-is without enough description):