I found a working answer after trying two different solutions.
My criteria are:
- I don't want to collect the entire 'lines' stream before seeing 'orders'
- I don't want to use a Map
- I use the fact that the 'lines' stream is sorted by order (it comes sorted from the database)
My first idea was that this is a flatMap problem: a stateful function passed to flatMap can collect all the 'lines' that belong to the same 'order' and emit them as one group. BINGO!
But the flatMap function has a problem: the lines of the last 'order' never turn into a group, because the stream ends before the function gets a chance to send that final group. This can be worked around by appending an end-of-stream (EOS) line with Stream.concat.
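For comparison, the flatMap workaround could look roughly like the sketch below. The EOS marker value, the class name and the key format ("100-3" has key "100") are assumptions for illustration. Because the grouping function keeps state between calls, this only works on a sequential stream. Without the Stream.concat of EOS, the group [133-1] would never be emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapGrouping {
    // Hypothetical end-of-stream marker; it must never occur as a real line.
    static final String EOS = "<EOS>";

    // Hypothetical key extractor: "100-3" -> "100".
    static String key(String line) {
        return line.substring(0, line.indexOf('-'));
    }

    // Stateful function for flatMap: buffers the lines of the current order and
    // emits the buffer as one group when the key changes or EOS arrives.
    static final class Grouper implements Function<String, Stream<List<String>>> {
        private List<String> current = new ArrayList<>();

        @Override
        public Stream<List<String>> apply(String line) {
            if (EOS.equals(line)) {
                // Flush the last group; without EOS it would never be emitted.
                if (current.isEmpty()) {
                    return Stream.empty();
                }
                return Stream.of(current);
            }
            if (current.isEmpty() || key(current.get(0)).equals(key(line))) {
                current.add(line);
                return Stream.empty();
            }
            // Key changed: emit the finished group and start a new one.
            List<String> finished = current;
            current = new ArrayList<>();
            current.add(line);
            return Stream.of(finished);
        }
    }

    // Only valid for sequential streams, because Grouper keeps state.
    static List<List<String>> group(Stream<String> lines) {
        return Stream.concat(lines, Stream.of(EOS))
                .flatMap(new Grouper())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(group(Stream.of("100-1", "100-3", "100-4", "120-1", "120-2", "133-1")));
    }
}
```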
But but but: it is difficult to explain to other developers. The missing last element is better solved by changing how the stream reads the next element, so that it can ask whether a next element exists at all.
The solution must therefore change the Stream itself, so that the last element is handled as well.
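The "ask whether a next element exists" idea can be sketched as a look-ahead iterator: it reads one element past the current group and keeps it for the next group, and when the source reports no more elements it simply closes the current group. This is a simplified alternative sketch, not the GroupStream class; the method name groupingIterator and the use of a key function instead of a Comparator are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;

public class LookAheadGrouping {
    // Groups consecutive source elements with equal keys. Assumes the source is
    // sorted (or at least clustered) by key, as the database result is.
    static <T> Iterator<List<T>> groupingIterator(Iterator<T> source, Function<? super T, ?> key) {
        return new Iterator<List<T>>() {
            // Element already read from the source that starts the next group
            private T lookAhead;
            private boolean hasLookAhead;

            @Override
            public boolean hasNext() {
                // A group exists if an element is waiting or the source has more
                return hasLookAhead || source.hasNext();
            }

            @Override
            public List<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T first = hasLookAhead ? lookAhead : source.next();
                hasLookAhead = false;
                Object groupKey = key.apply(first);
                List<T> group = new ArrayList<>();
                group.add(first);
                // Read until the key changes or the source is exhausted;
                // the exhausted case is what closes the last group.
                while (source.hasNext()) {
                    T candidate = source.next();
                    if (!Objects.equals(groupKey, key.apply(candidate))) {
                        lookAhead = candidate;
                        hasLookAhead = true;
                        break;
                    }
                    group.add(candidate);
                }
                return group;
            }
        };
    }

    public static void main(String[] args) {
        Iterator<List<String>> groups = groupingIterator(
                Arrays.asList("100-1", "100-3", "100-4", "120-1", "120-2", "133-1").iterator(),
                line -> line.substring(0, line.indexOf('-')));
        groups.forEachRemaining(group -> System.out.println("Grouped " + group));
    }
}
```

Keeping a boolean flag next to the look-ahead element (instead of testing it for null) means the sketch also tolerates null elements.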
My final solution is to make my own Stream and manage the Iterator myself.
The test looks like this:
    public void grouping() {
        Stream<String> cartesian = Stream.of("100-1", "100-3", "100-4",
                "120-1", "120-2",
                "133-1"); // ... millions of elements

        Comparator<String> comp = new Comparator<String>() {
            @Override
            public int compare(String o1, String o2) {
                return extractKey(o1).compareTo(extractKey(o2));
            }

            private String extractKey(String element) {
                return element.substring(0, element.indexOf('-'));
            }
        };

        final Stream<List<String>> grouped = GroupStream
                .of(cartesian)
                .chunkToStreamOfListByComparator(comp);

        grouped.forEach(row -> System.out.println("Grouped " + row));
    }
Output:
- Grouped [100-1, 100-3, 100-4]
- Grouped [120-1, 120-2]
- Grouped [133-1]
I made a class called GroupStream. It reads a source stream and creates a new stream in which consecutive elements with the same key are grouped into a list.
The hard parts are:
- making the iterator's hasNext() work
- making the iterator's next() work
- understanding how to create a new stream.
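Creating the new stream boils down to wrapping an Iterator in a Spliterator and handing it to StreamSupport.stream. The minimal sketch below uses a hypothetical counting iterator. It relies on Spliterators.spliteratorUnknownSize because the number of elements is not known up front; the Spliterators.spliterator(Iterator, long, int) overload reports the given size as exact (it adds SIZED and SUBSIZED), so it should only be used when the size really is known.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorToStream {
    // Wraps an Iterator in a lazy, sequential Stream of unknown size.
    static <T> Stream<T> stream(Iterator<T> iterator) {
        Spliterator<T> spliterator = Spliterators.spliteratorUnknownSize(
                iterator, Spliterator.ORDERED | Spliterator.NONNULL);
        return StreamSupport.stream(spliterator, false); // false = not parallel
    }

    public static void main(String[] args) {
        // Hypothetical iterator that produces 1, 2, 3 on demand.
        Iterator<Integer> counter = new Iterator<Integer>() {
            private int value = 1;

            @Override
            public boolean hasNext() {
                return value <= 3;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return value++;
            }
        };
        stream(counter).map(i -> "item " + i).forEach(System.out::println);
        System.out.println(stream(Arrays.asList("a", "b", "c").iterator()).count());
    }
}
```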
The Java class looks like this:

    package dk.otc.demo;

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    /**
     * Groups consecutive elements of a sorted stream into lists.
     *
     * @param <ELEMENT> The instance type in Stream.
     */
    public class GroupStream<ELEMENT> {

        private final Iterator<ELEMENT> sourceIterator;

        /**
         * @param source a stream of elements; elements that share the same key
         *               information are grouped into one list.
         *
         * A precondition is that the stream is sorted by that key and contains no nulls.
         */
        public GroupStream(Stream<ELEMENT> source) {
            sourceIterator = source.iterator();
        }

        /**
         * @param comparator defines how to compare ELEMENTs, i.e. when to keep adding
         *                   a source ELEMENT to the current group and when to ship the
         *                   current group and start building a new one.
         * @return a stream with the grouped elements
         */
        public Stream<List<ELEMENT>> chunkToStreamOfListByComparator(Comparator<? super ELEMENT> comparator) {
            final Iterator<List<ELEMENT>> chunkIterator = new Iterator<List<ELEMENT>>() {
                /**
                 * Caches the answer so that repeated {@link #hasNext()} calls return a consistent value
                 */
                Boolean consistentHasNext = null;
                List<ELEMENT> chunkUnderConstruction = initChunk();
                // First element of the next group, already read from the source
                ELEMENT firstInNext = null;

                @Override
                public boolean hasNext() {
                    if (consistentHasNext != null)
                        return consistentHasNext;
                    boolean more = sourceIterator.hasNext();
                    if (!more && chunkUnderConstruction.isEmpty()) {
                        return false;
                    }
                    // Fill the current chunk until the key changes or the source ends
                    boolean same = more;
                    while (same && more) {
                        ELEMENT value = sourceIterator.next();
                        same = same(value);
                        if (same) {
                            add(value);
                        } else {
                            firstInNext = value;
                        }
                        more = sourceIterator.hasNext();
                    }
                    consistentHasNext = (!chunkUnderConstruction.isEmpty()) || firstInNext != null;
                    return consistentHasNext;
                }

                @Override
                public List<ELEMENT> next() {
                    // Fill the chunk if the caller did not call hasNext() first
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    try {
                        consistentHasNext = null;
                        return chunkUnderConstruction;
                    } finally {
                        // Start the next chunk with the element that ended this one
                        chunkUnderConstruction = initChunk();
                        if (firstInNext != null) {
                            add(firstInNext);
                            firstInNext = null;
                        }
                    }
                }

                private List<ELEMENT> initChunk() {
                    return new ArrayList<>();
                }

                private void add(ELEMENT value) {
                    chunkUnderConstruction.add(value);
                }

                // True if element belongs to the chunk under construction
                boolean same(ELEMENT element) {
                    final boolean res;
                    if (chunkUnderConstruction.isEmpty()) {
                        res = true;
                    } else {
                        res = comparator.compare(chunkUnderConstruction.get(0), element) == 0;
                    }
                    return res;
                }
            };
            // The number of groups is not known in advance, so no size is reported
            final Spliterator<List<ELEMENT>> split = Spliterators.spliteratorUnknownSize(chunkIterator,
                    Spliterator.ORDERED       // groups arrive in source order (FIFO)
                    | Spliterator.NONNULL     // no null group is ever produced
                    | Spliterator.IMMUTABLE   // the source is not modified while the stream runs
            );
            return StreamSupport.stream(split, false);
        }

        public static <ELEMENT> GroupStream<ELEMENT> of(Stream<ELEMENT> source) {
            return new GroupStream<>(source);
        }
    }