You need to extract groups of 3 elements from the stream while preserving their order.
This can be done with the Stream API by writing a custom collector, i.e. a class that implements the Collector interface.
When the GroupCollector is instantiated, the size of the group has to be provided (this makes the collector more flexible and avoids hard-coding the value of 3 inside the class).
Deque<List<T>> is used as the mutable container because the Deque interface provides convenient access to the last element.
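To illustrate what "convenient access to the last element" means here, below is a small sketch of the three Deque methods the collector relies on: getLast(), pollLast() and peekLast(). The class name DequeTailSketch and the helper sample() are made up for this demonstration and are not part of the solution.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class DequeTailSketch {

    // Hypothetical helper: a deque holding one full group and one partial group.
    static Deque<List<Integer>> sample() {
        Deque<List<Integer>> groups = new ArrayDeque<>();
        groups.addLast(new ArrayList<>(Arrays.asList(4, 5, 6)));
        groups.addLast(new ArrayList<>(Arrays.asList(61, 3)));
        return groups;
    }

    public static void main(String[] args) {
        Deque<List<Integer>> groups = sample();

        // getLast() returns the tail without removing it, so the group can be filled in place.
        groups.getLast().add(9);
        System.out.println(groups);            // [[4, 5, 6], [61, 3, 9]]

        // pollLast() removes the tail and returns it.
        List<Integer> tail = groups.pollLast();
        System.out.println(tail);              // [61, 3, 9]
        System.out.println(groups.size());     // 1

        // peekLast() returns null on an empty deque; getLast() would throw NoSuchElementException.
        groups.clear();
        System.out.println(groups.peekLast()); // null
    }
}
```

With a plain List the same operations would need index arithmetic such as list.get(list.size() - 1) plus explicit emptiness checks.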
The combiner() method defines how partial results produced by different threads are merged. For an ordered stream, collect() guarantees that the encounter order is preserved even when the stream is parallel: partial results from different threads are joined in the order in which their chunks appear in the source. Therefore this solution can be parallelized.
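The ordering guarantee can be observed with a built-in collector. The sketch below (the class name ParallelOrderSketch is made up) collects the same ordered range sequentially and in parallel with Collectors.toList(), which, like GroupCollector, does not declare the UNORDERED characteristic, and compares the results.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelOrderSketch {

    // Collects the numbers 1..1000 into a list, optionally using a parallel stream.
    static List<Integer> collect(boolean parallel) {
        IntStream range = IntStream.rangeClosed(1, 1_000);
        if (parallel) {
            range = range.parallel();
        }
        return range.boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The parallel result has the same elements in the same order.
        System.out.println(collect(true).equals(collect(false))); // true
    }
}
```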
The logic of combining the two deques produced by different threads has to address the following concerns:
- Make sure that all groups (except possibly the very last one) have exactly 3 elements. Therefore we can't simply add all the contents of the second deque to the first deque. Instead, every group of the second deque has to be processed one by one.
- Lists that have already been created should be reused.
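To see why a plain addAll() is not enough, here is a small sketch comparing it with a regrouping merge. Note that regroup() is a deliberately simplified variant that re-adds elements one by one and does not reuse the lists of the second deque, so it only demonstrates the first concern. All names in it (CombineSketch, dequeOf, naive, regroup) are made up for the illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class CombineSketch {

    // Hypothetical helper: builds a deque of mutable groups.
    @SafeVarargs
    static Deque<List<Integer>> dequeOf(List<Integer>... groups) {
        Deque<List<Integer>> deque = new ArrayDeque<>();
        for (List<Integer> group : groups) {
            deque.addLast(new ArrayList<>(group));
        }
        return deque;
    }

    // Naive merge: appends the groups of 'right' as they are.
    static Deque<List<Integer>> naive(Deque<List<Integer>> left, Deque<List<Integer>> right) {
        left.addAll(right);
        return left;
    }

    // Simplified regrouping merge: tops up the last group of 'left' before opening a new one.
    static Deque<List<Integer>> regroup(Deque<List<Integer>> left, Deque<List<Integer>> right, int groupSize) {
        for (List<Integer> group : right) {
            for (Integer item : group) {
                if (left.isEmpty() || left.getLast().size() == groupSize) {
                    left.addLast(new ArrayList<>());
                }
                left.getLast().add(item);
            }
        }
        return left;
    }

    public static void main(String[] args) {
        // The partial group [4] ends up stranded in the middle.
        System.out.println(naive(
            dequeOf(Arrays.asList(1, 2, 3), Arrays.asList(4)),
            dequeOf(Arrays.asList(5, 6, 7), Arrays.asList(8))));   // [[1, 2, 3], [4], [5, 6, 7], [8]]

        // Only the last group is allowed to be incomplete.
        System.out.println(regroup(
            dequeOf(Arrays.asList(1, 2, 3), Arrays.asList(4)),
            dequeOf(Arrays.asList(5, 6, 7), Arrays.asList(8)), 3)); // [[1, 2, 3], [4, 5, 6], [7, 8]]
    }
}
```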
The finisher() function discards the last list in the deque if its size is less than groupSize (a requirement stated by the OP in a comment).
As an example, I've used the sequence of numbers from the question.
public static void main(String[] args) {
    Stream<BigDecimal> source =
        IntStream.of(4, 5, 6, 61, 3, 9, 3, 1, 7, 2, 6)
            .mapToObj(BigDecimal::valueOf);

    System.out.println(createGroups(source)
        .flatMap(List::stream)
        .collect(Collectors.toList())); // collecting to list for demonstration purposes
}
Method createGroups()
public static Stream<List<BigDecimal>> createGroups(Stream<BigDecimal> source) {
    return source
        .collect(new GroupCollector<BigDecimal>(3))
        .stream()
        .filter(list -> averageIsLessThen(list, 30));
}
Collector
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class GroupCollector<T> implements Collector<T, Deque<List<T>>, Deque<List<T>>> {
    private final int groupSize;

    public GroupCollector(int groupSize) {
        this.groupSize = groupSize;
    }

    @Override
    public Supplier<Deque<List<T>>> supplier() {
        return ArrayDeque::new;
    }

    @Override
    public BiConsumer<Deque<List<T>>, T> accumulator() {
        return (deque, next) -> {
            // open a new group if there is none yet or the last one is full
            if (deque.isEmpty() || deque.getLast().size() == groupSize) {
                List<T> group = new ArrayList<>();
                group.add(next);
                deque.addLast(group);
            } else {
                deque.getLast().add(next);
            }
        };
    }

    @Override
    public BinaryOperator<Deque<List<T>>> combiner() {
        return (deque1, deque2) -> {
            if (deque1.isEmpty()) {
                return deque2;
            } else if (deque1.getLast().size() == groupSize) {
                // the last group of deque1 is full, so deque2 can be appended as is
                deque1.addAll(deque2);
                return deque1;
            }
            // the last group of deque1 has fewer than groupSize elements
            List<T> curGroup = deque1.pollLast();
            for (List<T> nextGroup : deque2) {
                // move elements from nextGroup into curGroup until curGroup is full
                Iterator<T> iter = nextGroup.iterator();
                while (iter.hasNext() && curGroup.size() < groupSize) {
                    curGroup.add(iter.next());
                    iter.remove();
                }
                deque1.add(curGroup);
                curGroup = nextGroup; // reuse the list, which now holds only the leftovers
            }
            if (!curGroup.isEmpty()) {
                deque1.add(curGroup);
            }
            return deque1;
        };
    }

    @Override
    public Function<Deque<List<T>>, Deque<List<T>>> finisher() {
        return deque -> {
            // drop the trailing group if it is incomplete
            if (deque.peekLast() != null && deque.peekLast().size() < groupSize) {
                deque.pollLast();
            }
            return deque;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}
The auxiliary method below validates a group of elements based on its average value (in case you are wondering what RoundingMode is meant for, read this answer).
private static boolean averageIsLessThen(List<BigDecimal> list, double target) {
    BigDecimal average = list.stream()
        .reduce(BigDecimal.ZERO, BigDecimal::add)
        .divide(BigDecimal.valueOf(list.size()), RoundingMode.HALF_UP);

    return average.compareTo(BigDecimal.valueOf(target)) < 0;
}
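As a quick illustration of why the RoundingMode argument is there, the sketch below divides the sum of the group 61, 3, 9 (that is, 73) by 3, with and without a rounding mode. The class and method names are made up for the demonstration. Note also that divide(BigDecimal, RoundingMode) keeps the scale of the dividend, so for whole-number inputs like the ones in this example the average is rounded to a whole number.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class RoundingSketch {

    // Division with an explicit rounding mode, as in averageIsLessThen().
    static BigDecimal rounded(long dividend, long divisor) {
        return BigDecimal.valueOf(dividend)
            .divide(BigDecimal.valueOf(divisor), RoundingMode.HALF_UP);
    }

    // Returns true if divide() without a rounding mode throws ArithmeticException.
    static boolean exactDivisionFails(long dividend, long divisor) {
        try {
            BigDecimal.valueOf(dividend).divide(BigDecimal.valueOf(divisor));
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // 73 / 3 has a non-terminating decimal expansion, so the exact division throws.
        System.out.println(exactDivisionFails(73, 3)); // true
        // 15 / 3 is exact, so no exception is thrown.
        System.out.println(exactDivisionFails(15, 3)); // false
        // With a rounding mode the quotient is rounded at the dividend's scale (0 here).
        System.out.println(rounded(73, 3));            // 24
    }
}
```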
Output (expected result: { 4, 5, 6, 61, 3, 9, 3, 1, 7 }, provided by the OP)
[4, 5, 6, 61, 3, 9, 3, 1, 7]