
I have a stream of order lines:

Stream<String> lines=Stream.of("100-1","100-2","100-3",
                               "120-1",
                               "333-1","333-2"); // .. millions of lines

Now I want to chunk / map / group these into a stream like this:

Stream<List<String>> orders=Stream.of(
               Arrays.asList("100-1","100-2","100-3"),
               Arrays.asList("120-1"),
               Arrays.asList("333-1","333-2")); // .. thousands of orders

Then I can process each element of the 'orders' stream as one complete order, that is, read each element as the unit of work for that order.

How can I stream from lines --> orders?

  • I don't want to collect the entire 'lines' stream before seeing 'orders'
  • I don't want to see a Map
  • I don't need to handle order "100" showing up again 100,000 elements later, because that does not happen

--

  • I have read about and tried many grouping approaches, and read some very interesting articles on "foo", this and that.
  • I need to get it right.
  • The sample shows each OrderLine as a string such as "100-2" to keep it simple. In reality they are objects with an order number field, an order line field, etc., with getters.
Stefan Zobel
anders
  • so you want to group by prefix 100-, 120- ? – QuickSilver Jun 03 '20 at 18:53
  • *I don't want to collect the entire 'lines' stream before seeing 'orders'* sounds like you need a database so you can select orders. Streams are used for collecting and manipulating information in preparation for processing or review. – WJS Jun 03 '20 at 19:21
  • [Grouping Java8 stream without collecting it](https://stackoverflow.com/q/39013437/2711488) • [Java split stream by predicate into stream of streams](https://stackoverflow.com/q/49524105/2711488) – Holger Jun 04 '20 at 07:25

3 Answers

1

How about something like this? Each map value is a list. The key is the part of the line to the left of the dash (-).

Map<String, List<String>> orders = Stream
       .of("100-1", "100-2", "100-3", "120-1", "333-1",
               "333-2")
       .collect(Collectors.groupingBy(order -> order
               .substring(0, order.indexOf("-"))));

orders.entrySet().forEach(System.out::println);

Prints

100=[100-1, 100-2, 100-3]
333=[333-1, 333-2]
120=[120-1]

So you could do this.

List<String> order100 = orders.get("100");
System.out.println(order100);

Prints

[100-1, 100-2, 100-3]

If you don't want to deal with a map but just want a list of lists of orders, you can get that from the map's values.

List<List<String>> lists = 
         orders.values().stream().collect(Collectors.toList());
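
The printed map above lists 333 before 120 because `groupingBy` uses a `HashMap` by default. If the groups should come out in the order the order numbers first appear, one option is the three-argument `Collectors.groupingBy` with a `LinkedHashMap` supplier. This is only a minimal sketch: the class and method names are made up for illustration, and it still collects the whole stream before anything is available.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, not part of the answer above.
class OrderedGrouping {

    // Groups order lines by the text before the dash, keeping first-seen key order.
    static List<List<String>> groupInEncounterOrder(Stream<String> lines) {
        Map<String, List<String>> byOrder = lines.collect(Collectors.groupingBy(
                line -> line.substring(0, line.indexOf('-')),
                LinkedHashMap::new,          // preserves insertion order of the keys
                Collectors.toList()));
        return new ArrayList<>(byOrder.values());
    }

    public static void main(String[] args) {
        System.out.println(groupInEncounterOrder(
                Stream.of("100-1", "100-2", "100-3", "120-1", "333-1", "333-2")));
        // prints [[100-1, 100-2, 100-3], [120-1], [333-1, 333-2]]
    }
}
```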
WJS
  • The problem with producing a Map is that the entire 'lines' stream is processed to the end before the 'orders' stream emits anything. That makes the solution memory heavy, and NOT streaming. – anders Jun 03 '20 at 22:14
  • If you are going to keep them in data structures, it may indeed be memory heavy. Processing the data as it is read from a file, or a similar approach, could help. A more conventional approach with loops that intermittently writes the lists out to separate files, named to reflect their contents, might also work. – WJS Jun 03 '20 at 22:47
0

You can create a map out of your input and then stream only the map values:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordSpliter {

    public static void main(String[] args) {
        Stream<String> lines = Stream.of("100-1", "100-2", "100-3",
                "120-1",
                "333-1", "333-2");

        // Group each line under its order number (the text before the dash),
        // then stream only the grouped lists. Plain JDK, so scala.Tuple2 is not needed.
        Stream<List<String>> orders = lines
                .collect(Collectors.groupingBy((String str) -> str.substring(0, str.indexOf("-"))))
                .values().stream();

        orders.forEach(System.out::println);
    }
}

QuickSilver
0

I found a working answer after trying two different solutions. The criteria are:

  1. I don't want to collect the entire 'lines' stream before seeing 'orders'
  2. I don't want to see a Map
  3. I rely on the fact that the 'lines' stream is sorted by order (it comes from the database)

The first answer is that this is a flatMap problem. It is possible to write a flatMap function that groups all 'lines' belonging to the same 'order'.

But the flatMap function has a problem: the last group is never emitted, because the stream ends before the flatMap function gets a chance to send it. This can be worked around by appending an EOS (end-of-stream) line using Stream.concat.
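
A rough sketch of that flatMap workaround, to make the idea concrete. The names `FlatMapGrouping`, `EOS`, `key` and `grouper` are invented for this illustration, and the marker value is an assumption that must never occur as a real line. The mapping function keeps state between calls, so it is only safe on a sequential stream and must not be reused.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical illustration of the stateful flatMap approach.
class FlatMapGrouping {

    // Assumed end-of-stream marker; it must never appear as a real line.
    static final String EOS = "\u0000EOS";

    static String key(String line) {
        return line.substring(0, line.indexOf('-'));
    }

    // Stateful function: sequential streams only, one instance per stream.
    static Function<String, Stream<List<String>>> grouper() {
        List<String> current = new ArrayList<>();
        return line -> {
            boolean end = line.equals(EOS);
            // A key change (or the EOS marker) closes the group built so far.
            if (!current.isEmpty() && (end || !key(current.get(0)).equals(key(line)))) {
                List<String> finished = new ArrayList<>(current);
                current.clear();
                if (!end) current.add(line);   // the new line starts the next group
                return Stream.of(finished);
            }
            if (!end) current.add(line);
            return Stream.empty();             // group still open, emit nothing yet
        };
    }

    static Stream<List<String>> group(Stream<String> lines) {
        // Without the appended EOS line the last group would never be emitted.
        return Stream.concat(lines, Stream.of(EOS)).flatMap(grouper());
    }

    public static void main(String[] args) {
        group(Stream.of("100-1", "100-2", "100-3", "120-1", "333-1", "333-2"))
                .forEach(System.out::println);
    }
}
```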

However, that is difficult to explain to other developers. The missing last group is handled much better by changing how the stream reads the next element, so that it can ask whether a next element exists.

The solution must change the Stream so that the last element gets handled as well.

My final solution is to build my own Stream and manage the Iterator myself.

The test looks like this:

  public void grouping() {

    Stream<String> cartesian = Stream.of("100-1", "100-3", "100-4",
        "120-1", "120-2",
        "133-1"); // ... millions of elements

    Comparator<String> comp = new Comparator<String>() {
      @Override
      public int compare(String o1, String o2) {
        return extractKey(o1).compareTo(extractKey(o2));
      }

      private String extractKey(String element) {
        String str = element;
        return str.substring(0, str.indexOf('-'));
      }

    };
    final Stream<List<String>> grouped= GroupStream
        .of(cartesian)
        .chunkToStreamOfListByComparator(comp);


    grouped.forEach(row -> System.out.println("Grouped " + row));

  }

Output:

  • Grouped [100-1, 100-3, 100-4]
  • Grouped [120-1, 120-2]
  • Grouped [133-1]

I have made a class called GroupStream. This class makes a new stream (where elements are grouped in a list). A source stream is read inside the class.

The heavy parts are:

  • making the iterator's hasNext() work
  • making the iterator's next() work
  • understanding how to create a new stream.
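
The last point can be shown in isolation. A minimal sketch, assuming nothing beyond the JDK: any Iterator can be turned into a sequential Stream with `Spliterators.spliteratorUnknownSize` and `StreamSupport.stream`. The class name `IteratorToStream` and the method `streamOf` are made up for this example.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper showing only the Iterator -> Stream step.
class IteratorToStream {

    static <T> Stream<T> streamOf(Iterator<T> iterator) {
        // The number of elements is not known up front, hence "unknown size".
        Spliterator<T> split =
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        return StreamSupport.stream(split, false); // false = sequential stream
    }

    public static void main(String[] args) {
        Iterator<String> it = Arrays.asList("a", "b", "c").iterator();
        streamOf(it).forEach(System.out::println);
    }
}
```

The stream pulls from the iterator lazily: hasNext() and next() are only called as the terminal operation asks for elements.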

The Java class looks like this:

package dk.otc.demo;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * @param <ELEMENT> The instance type in Stream.
 */
public class GroupStream<ELEMENT> {

  private final Iterator<ELEMENT> sourceIterator;

  /**
   * @param source a stream whose elements share a value (for example an order
   *               number) by which they are grouped into lists.
   *
   *               A precondition is that the stream is sorted by that value
   */
  public GroupStream(Stream<ELEMENT> source) {
    sourceIterator = source.iterator();
  }

  /**
   * @param comparator function that compares two ELEMENTs to decide whether a
   *                   source ELEMENT is added to the current group (result 0) or
   *                   whether the current group is shipped and a new group is
   *                   started (any other result).
   * @return a stream with the grouped elements
   */
  public Stream<List<ELEMENT>> chunkToStreamOfListByComparator(Comparator<? super ELEMENT> comparator) {

    final Iterator<List<ELEMENT>> chunkIterator = new Iterator<List<ELEMENT>>() {

      /**
       * Makes the iterator {@link #hasNext()} return a consistent value
       */
      Boolean consistentHasNext = null;
      List<ELEMENT> chunkUnderConstruction = initChunk();
      ELEMENT firstInNext = null;

      @Override
      public boolean hasNext() {
        if (consistentHasNext != null)
          return consistentHasNext;
        boolean more = sourceIterator.hasNext();
        if (!more && chunkUnderConstruction.isEmpty()) {
          return false;
        }
        boolean same = more;
        while (same && more) {
          ELEMENT value = sourceIterator.next();
          same = same(value);
          if (same) {
            add(value);
          } else {
            firstInNext = value;
          }
          more = sourceIterator.hasNext();
        }
        consistentHasNext = (!chunkUnderConstruction.isEmpty()) || firstInNext != null;
        return consistentHasNext;
      }

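      @Override
      public List<ELEMENT> next() {
        // Calling hasNext() here completes the current chunk even if the caller skipped it.
        if (!hasNext()) {
          throw new java.util.NoSuchElementException();
        }
        consistentHasNext = null;
        final List<ELEMENT> finished = chunkUnderConstruction;
        chunkUnderConstruction = initChunk();
        if (firstInNext != null) {
          add(firstInNext);
          firstInNext = null;
        }
        return finished;
      }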

      private List<ELEMENT> initChunk() {
        return new ArrayList<>();
      }

      private void add(ELEMENT value) {
        chunkUnderConstruction.add(value);
      }

      boolean same(ELEMENT element) {
        final boolean res;
        if (chunkUnderConstruction.isEmpty()) {
          res = true;
        } else {
          res = comparator.compare(chunkUnderConstruction.get(0), element) == 0;
        }
        return res;
      }
    };
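    // The number of chunks is not known up front, so use spliteratorUnknownSize.
    // Spliterators.spliterator(iterator, -1, ...) would report SIZED with a size of -1.
    final Spliterator<List<ELEMENT>> split = Spliterators.spliteratorUnknownSize(chunkIterator,
        (Spliterator.ORDERED      // A collection like List (Not Map). FIFO is ordered
            | Spliterator.NONNULL // No null will arrive
            | Spliterator.IMMUTABLE // Stream is not mutated during execution of stream
        ));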
    return StreamSupport.stream(split, false);
  }

  public static <ELEMENT> GroupStream<ELEMENT> of(Stream<ELEMENT> source) {
    return new GroupStream<>(source);
  }
}
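
For the real objects mentioned in the question (an order number field with a getter), the same idea can also be sketched more compactly with a key function and `Spliterators.AbstractSpliterator`. This is an alternative sketch, not the GroupStream class above: `KeyChunker`, `chunkBy` and the `OrderLine` stand-in are hypothetical names, and like GroupStream it assumes a sequential source that is already sorted by the key.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class KeyChunker {

    // Hypothetical stand-in for the real order line objects.
    static final class OrderLine {
        private final int orderNumber;
        private final int lineNumber;

        OrderLine(int orderNumber, int lineNumber) {
            this.orderNumber = orderNumber;
            this.lineNumber = lineNumber;
        }

        int getOrderNumber() { return orderNumber; }
        int getLineNumber() { return lineNumber; }

        @Override
        public String toString() { return orderNumber + "-" + lineNumber; }
    }

    // Lazily groups consecutive elements that share the same key.
    static <T, K> Stream<List<T>> chunkBy(Stream<T> source,
                                          Function<? super T, ? extends K> keyOf) {
        Iterator<T> it = source.iterator();
        Spliterator<List<T>> split = new Spliterators.AbstractSpliterator<List<T>>(
                Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
            private T pending;          // first element of the next chunk, if already read
            private boolean hasPending;

            @Override
            public boolean tryAdvance(Consumer<? super List<T>> action) {
                if (!hasPending) {
                    if (!it.hasNext()) return false;   // source exhausted, nothing buffered
                    pending = it.next();
                }
                List<T> chunk = new ArrayList<>();
                chunk.add(pending);
                K key = keyOf.apply(pending);
                hasPending = false;
                pending = null;
                while (it.hasNext()) {
                    T next = it.next();
                    if (Objects.equals(key, keyOf.apply(next))) {
                        chunk.add(next);
                    } else {
                        pending = next;                // belongs to the following chunk
                        hasPending = true;
                        break;
                    }
                }
                action.accept(chunk);                  // the last chunk is emitted here too
                return true;
            }
        };
        return StreamSupport.stream(split, false).onClose(source::close);
    }

    public static void main(String[] args) {
        chunkBy(Stream.of(new OrderLine(100, 1), new OrderLine(100, 2), new OrderLine(120, 1)),
                OrderLine::getOrderNumber)
                .forEach(order -> System.out.println("Grouped " + order));
    }
}
```

Because tryAdvance reads ahead by at most one element, only the current order and the first line of the next one are held in memory at any time.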
anders