I have a very large Stream of versioned documents, ordered by document id and version.

E.g. Av1, Av2, Bv1, Cv1, Cv2

I have to convert this into another Stream whose records are aggregated by document id.

A[v1, v2], B[v1], C[v1, v2]

Can this be done without using Collectors.groupingBy()? I don't want to use groupingBy() because it loads all items in the stream into memory before grouping them. In theory, there is no need to hold the whole stream in memory, because it is ordered.

mechnicov
sgp15
  • Can you shed some more light on the statement *because it will load all items in the stream into memory before grouping them*? – Naman Apr 12 '19 at 08:49
  • Stream pipelines are lazy by default. They load data on demand, which is useful for writing code with a low memory footprint, so one could process one item at a time without keeping all of them in memory. But with operations like groupingBy, all items in the stream have to be consumed in order to build the result. Makes sense? – sgp15 Apr 12 '19 at 09:21
  • I think you're looking for what `StreamEx.groupRuns` does: http://amaembo.github.io/streamex/javadoc/one/util/streamex/StreamEx.html#groupRuns-java.util.function.BiPredicate- – millimoose Apr 12 '19 at 10:17
  • When you use `groupingBy(Function,Collector)` with a downstream collector aggregating to a result that does not reference individual elements, the elements are *not* held in memory. The key point is what you want to do with the resulting stream. Apparently, you assume that the subsequent operation does not need everything held in memory. Then, just do it right inside the first `collect` operation. – Holger Apr 12 '19 at 12:18
  • @millimoose your suggestion worked perfectly for me. Do you want to post it as an answer? – sgp15 Apr 15 '19 at 03:51
  • Done, hope it checks out, it's been a hot minute since I worked with Java and streams are after my time, I'm extrapolating from the similar-seeming ReactiveX mainly. – millimoose Apr 15 '19 at 13:35

3 Answers

Here's a solution I came up with:

    Stream<Document> stream = Stream.of(
            new Document("A", "v1"),
            new Document("A", "v2"),
            new Document("B", "v1"),
            new Document("C", "v1"),
            new Document("C", "v2")
    );

    Iterator<Document> iterator = stream.iterator();
    Stream<GroupedDocument> result = Stream.generate(new Supplier<GroupedDocument>() {

        // First document of the next group, read ahead while closing the previous group
        Document pending = null;

        @Override
        public GroupedDocument get() {
            if (pending == null) {
                if (!iterator.hasNext()) {
                    return null; // source exhausted: every later call yields null
                }
                pending = iterator.next();
            }
            Document first = pending;
            pending = null;

            GroupedDocument gd = new GroupedDocument(first.getId());
            gd.getVersions().add(first.getVersion());

            // Consume documents until the id changes or the source runs out
            while (iterator.hasNext()) {
                Document doc = iterator.next();
                if (!doc.getId().equals(first.getId())) {
                    pending = doc; // belongs to the next group
                    break;
                }
                gd.getVersions().add(doc.getVersion());
            }
            return gd;
        }
    });

Here are the Document and GroupedDocument classes:

class Document {
    private String id;
    private String version;

    public Document(String id, String version) {
        this.id = id;
        this.version = version;
    }

    public String getId() {
        return id;
    }

    public String getVersion() {
        return version;
    }
}

class GroupedDocument {
    private String id;
    private List<String> versions;

    public GroupedDocument(String id) {
        this.id = id;
        versions = new ArrayList<>();
    }

    public String getId() {
        return id;
    }

    public List<String> getVersions() {
        return versions;
    }

    @Override
    public String toString() {
        return "GroupedDocument{" +
                "id='" + id + '\'' +
                ", versions=" + versions +
                '}';
    }
}

Note that the resulting stream is an infinite stream. After all the groups there will be an infinite number of nulls. You can take all the elements that are not null by using takeWhile in Java 9, or see this post.
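
The cut-off described above could look roughly like the following sketch. `TakeWhileDemo`, `nullPadded` and `untilNull` are hypothetical names used only for illustration; `nullPadded` stands in for the generated stream from this answer. One caveat: `Stream.generate` produces an unordered stream, and `takeWhile` is only specified to take the prefix on ordered streams, so this relies on the stream being evaluated sequentially.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class TakeWhileDemo {
    // Hypothetical stand-in for the generated stream above: yields each
    // element of the source, then null forever once the source is exhausted.
    static <T> Stream<T> nullPadded(List<T> source) {
        Iterator<T> it = source.iterator();
        return Stream.generate(() -> it.hasNext() ? it.next() : null);
    }

    // Cuts the infinite stream off at the first null (requires Java 9+).
    static <T> Stream<T> untilNull(Stream<T> stream) {
        return stream.takeWhile(Objects::nonNull);
    }

    public static void main(String[] args) {
        List<String> groups =
                untilNull(nullPadded(List.of("A[v1, v2]", "B[v1]", "C[v1, v2]")))
                        .collect(Collectors.toList());
        System.out.println(groups);
    }
}
```

Without the `takeWhile` step, a terminal operation such as `collect` would never return, because the generated stream never ends.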

Sweeper
  • Thanks so much for your answer. I am using the suggestion made by @millimoose. I send my stream to Spring MVC, which marshals it, so it's more effort to stop consuming the stream when nulls start showing up, using something like takeWhile. – sgp15 Apr 15 '19 at 03:54

Would a Map<String, Stream<String>> help you with what you need?

A - v1, v2
B - v1
C - v1, v2

String[] docs = { "Av1", "Av2", "Bv1", "Cv1", "Cv2" };
Map<String, Stream<String>> map = Stream.of(docs)
        .map(s -> s.substring(0, 1))                      // extract the ids
        .distinct()                                       // leave only A, B, C
        .collect(Collectors.toMap(
                id -> id,                                 // A, B, C as keys
                id -> Stream.of(docs)                     // value is a filtered stream of docs
                        .filter(s -> id.equals(s.substring(0, 1)))
                        .map(s -> s.substring(1))));      // trim the leading A, B, C
c0der

You can use groupRuns in the StreamEx library for this:

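import static java.lang.System.out;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;

import java.util.List;
import one.util.streamex.StreamEx;

class Document {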
    public String id;
    public int version;
    public Document(String id, int version) {
        this.id = id;
        this.version = version;
    }
    public String toString() {
        return "Document{"+id+version+ "}";
    }
}

public class MyClass {
    private static List<Document> docs = asList(
        new Document("A", 1),
        new Document("A", 2),
        new Document("B", 1),
        new Document("C", 1),
        new Document("C", 2)
    );

    public static void main(String args[]) {
        StreamEx<List<Document>> groups = StreamEx.of(docs).groupRuns((l, r) -> l.id.equals(r.id));
        for (List<Document> grp: groups.collect(toList())) {
            out.println(grp);
        }
    }
}

which outputs:

[Document{A1}, Document{A2}]
[Document{B1}]
[Document{C1}, Document{C2}]

I can't verify this doesn't consume the entire stream, but I cannot imagine why it would need to given what groupRuns is meant to do.
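
On the laziness question: grouping adjacent runs only needs the current run plus one look-ahead element. The following is a minimal plain-Java sketch of that idea, not StreamEx's actual implementation. `RunGrouping` and its `groupRuns` helper are hypothetical names introduced here for illustration, and the documents are simplified to strings whose first character is the id.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiPredicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class RunGrouping {
    // Hypothetical helper, not part of StreamEx: groups adjacent elements for
    // which sameRun holds, pulling from the source only as groups are requested.
    static <T> Stream<List<T>> groupRuns(Stream<T> source, BiPredicate<T, T> sameRun) {
        Iterator<T> it = source.iterator();
        Iterator<List<T>> groups = new Iterator<List<T>>() {
            private T pending;          // first element of the next run, read ahead
            private boolean hasPending;

            @Override
            public boolean hasNext() {
                return hasPending || it.hasNext();
            }

            @Override
            public List<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T last = hasPending ? pending : it.next();
                hasPending = false;
                pending = null;
                List<T> run = new ArrayList<>();
                run.add(last);
                while (it.hasNext()) {
                    T candidate = it.next();
                    if (!sameRun.test(last, candidate)) {
                        pending = candidate;   // starts the next run
                        hasPending = true;
                        break;
                    }
                    run.add(candidate);
                    last = candidate;
                }
                return run;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(groups, Spliterator.ORDERED | Spliterator.NONNULL),
                false).onClose(source::close);
    }

    public static void main(String[] args) {
        List<String> pulled = new ArrayList<>();   // records what was read from the source
        Stream<String> docs = Stream.of("Av1", "Av2", "Bv1", "Cv1", "Cv2").peek(pulled::add);
        List<String> first = groupRuns(docs, (l, r) -> l.charAt(0) == r.charAt(0))
                .findFirst().get();
        System.out.println(first + " after pulling " + pulled);
    }
}
```

In `main`, the `peek` step records which source elements were actually read. Asking only for the first group should read the two A documents plus the single look-ahead element that ends the run, rather than the whole source.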

millimoose