I'm playing around with Java streams and came up with a solution to a problem. I'd like to share it and check whether my approach is correct.

I've downloaded a dataset from https://catalog.data.gov/dataset/consumer-complaint-database which has 700k+ records of customer complaints. The information that I'm using is the following:

CompanyName ProductName

My objective is to get a result with:

The 10 companies with the most occurrences in the dataset

The 10 products with the most occurrences in the dataset

And get something like

Map<String, Map<String,Integer>>

Here the key of the outer map is the company name, the key of the inner map is the product name, and the value is the number of complaints about that product at that company.

The solution that I've come up with is the following:

@Test
public void joinGroupingsTest() throws URISyntaxException, IOException {
    String path = CsvReaderTest.class.getResource("/complains.csv").toURI().toString();
    complains = CsvReader.readFileStreamComplain(path.substring(path.indexOf('/')+1));

    Map<String, List<Complain>> byCompany = complains.parallelStream()
            .collect(Collectors.groupingBy(Complain::getCompany))
            .entrySet().stream()
            .sorted((f1, f2) -> Long.compare(f2.getValue().size(), f1.getValue().size()))
            .limit(10)
            .collect(Collectors.toMap(Entry::getKey, Entry::getValue));


    Map<String, List<Complain>> byProduct = complains.parallelStream()
            .collect(Collectors.groupingBy(Complain::getProduct))
            .entrySet().stream()
            .sorted((f1, f2) -> Long.compare(f2.getValue().size(), f1.getValue().size()))
            .limit(10)
            .collect(Collectors.toMap(Entry::getKey, Entry::getValue));

    Map<String, List<Complain>> map = complains.parallelStream()
            .filter((x) -> byCompany.get(x.getCompany()) != null
                && byProduct.get(x.getProduct()) != null)
            .collect(Collectors.groupingBy(Complain::getCompany));

    Map<String, Map<String, Long>> map2 = map.entrySet().parallelStream()
            .collect(Collectors.toMap(
                    e -> e.getKey(),
                    e -> e.getValue().stream()
                            .collect(Collectors.groupingBy(Complain::getProduct, Collectors.counting()))
            ));


    System.out.println(map2);


}

As you can see, I have a couple of steps to achieve this:

1) I get the 10 companies with the most occurrences and the complaints (records) associated with them

2) I get the 10 products with the most occurrences and the complaints (records) associated with them

3) I build a map keyed by company name, restricted to the top 10 companies calculated before, holding only the complaints about products that are also in the top 10 products

4) I do the transformation needed to get the map that I want.

Other than forking steps 1 and 2 into two different threads, is there anything else I should consider to improve the performance, or to use streams in a better way?

Thanks!

Stefan Zobel
mlocu
  • What if the 10 products with more complaints in the dataset are not sold by any of the 10 companies that have more complaints? I think this is an error in your design of the solution. Besides, you are traversing the whole dataset 3 times, and this doesn't look very optimal, IMHO – fps May 18 '17 at 02:45
  • There is no harm in using streams for aggregation; however, using `parallelStream` may add overhead. I use `stream` often in my APIs. Here is a related thread: http://stackoverflow.com/questions/20375176/should-i-always-use-a-parallel-stream-when-possible – Teena George May 18 '17 at 07:28
  • There is no reason for the last two stream operations to be separated, just pass the `groupingBy` collector of the last as second argument to the `groupingBy` collector of the previous. – Holger May 18 '17 at 09:39
  • @Holger you are right, that's a fix for sure, I didn't know that I could do that – mlocu May 18 '17 at 14:50

2 Answers

In the first two operations, you are collecting the groups into Lists just to sort by their size. This is an obvious waste of resources, as you could simply count the group elements while grouping, then sort by the count. Further, since the first two operations are identical apart from the grouping function, it’s worth removing the code duplication by creating a method for the task.

The other two stream operations can be done in one, by performing the collect operation for the groups right when the groups are collected.

public void joinGroupingsTest() throws URISyntaxException, IOException {
    String path = CsvReaderTest.class.getResource("/complains.csv").toURI().toString();
    complains = CsvReader.readFileStreamComplain(path.substring(path.indexOf('/')+1));

    Set<String> byCompany = getTopTen(complains, Complain::getCompany);
    Set<String> byProduct = getTopTen(complains, Complain::getProduct);

    Map<String, Map<String, Long>> map = complains.stream()
            .filter(x -> byCompany.contains(x.getCompany())
                      && byProduct.contains(x.getProduct()))
            .collect(Collectors.groupingBy(Complain::getCompany,
                Collectors.groupingBy(Complain::getProduct, Collectors.counting())));
    System.out.println(map);
}

static <T,V> Set<V> getTopTen(Collection<T> source, Function<T,V> criteria) {
    return source.stream()
            .collect(Collectors.groupingBy(criteria, Collectors.counting()))
            .entrySet().stream()
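            // Sort descending by count so that limit(10) keeps the most frequent keys
            .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))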
            .limit(10)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
}

Note that the intersection of the two criteria is likely to contain fewer than ten elements; it may even be empty. You might want to rethink the condition.
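To make the empty-intersection case concrete, here is a minimal sketch on in-memory data. The `Complain` class below is a hypothetical stand-in for the one in the question (only the two getters are assumed), `topN` and `crossCount` are illustrative names, and the sample records are made up; the limit is a parameter so that a top-1 example is enough to show the effect.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

class TopNIntersectionDemo {

    // Hypothetical stand-in for the question's Complain class.
    static final class Complain {
        private final String company;
        private final String product;

        Complain(String company, String product) {
            this.company = company;
            this.product = product;
        }

        String getCompany() { return company; }
        String getProduct() { return product; }
    }

    // Counts occurrences per key and returns the n most frequent keys.
    static <T, V> Set<V> topN(Collection<T> source, Function<T, V> criteria, int n) {
        return source.stream()
                .collect(Collectors.groupingBy(criteria, Collectors.counting()))
                .entrySet().stream()
                .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    // Company -> product -> count, restricted to top-n companies AND top-n products.
    static Map<String, Map<String, Long>> crossCount(Collection<Complain> complains, int n) {
        Set<String> companies = topN(complains, Complain::getCompany, n);
        Set<String> products = topN(complains, Complain::getProduct, n);
        return complains.stream()
                .filter(c -> companies.contains(c.getCompany())
                          && products.contains(c.getProduct()))
                .collect(Collectors.groupingBy(Complain::getCompany,
                        Collectors.groupingBy(Complain::getProduct, Collectors.counting())));
    }

    // Made-up data: company A has the most complaints, product W has the most
    // complaints, but no complaint about W was filed against A.
    static List<Complain> sample() {
        return Arrays.asList(
                new Complain("A", "X"), new Complain("A", "Y"), new Complain("A", "Z"),
                new Complain("B", "W"), new Complain("B", "W"),
                new Complain("C", "W"), new Complain("C", "W"));
    }

    public static void main(String[] args) {
        List<Complain> data = sample();
        System.out.println(topN(data, Complain::getCompany, 1)); // [A]
        System.out.println(topN(data, Complain::getProduct, 1)); // [W]
        System.out.println(crossCount(data, 1));                 // {}
    }
}
```

With real data the result is unlikely to be completely empty, but the same effect can silently drop companies or products that one would expect to see in the output.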

Further, you should always recheck whether the amount of data really is large enough to benefit from parallel processing. Also note that the getTopTen operation consists of two stream operations. Switching the first to parallel doesn’t change the nature of the second.
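The two stream operations can be made visible by splitting the helper, as in the following sketch. The `parallel` flag and the class and method names are assumptions added for illustration; only the first stream touches every source element, while the second runs over the much smaller map of counts and stays sequential either way.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

class ParallelTopN {

    static <T, V> Set<V> topN(Collection<T> source, Function<T, V> criteria,
                              int n, boolean parallel) {
        // Stream 1: one pass over all source elements. This is the only
        // place where switching to a parallel stream can have an effect.
        Map<V, Long> counts = (parallel ? source.parallelStream() : source.stream())
                .collect(Collectors.groupingBy(criteria, Collectors.counting()));

        // Stream 2: a separate, sequential stream over the groups only
        // (one entry per distinct key, not one per record).
        return counts.entrySet().stream()
                .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "a", "b", "a", "b", "a");
        // Both calls return the same set; only the counting pass differs.
        System.out.println(topN(words, Function.identity(), 2, false));
        System.out.println(topN(words, Function.identity(), 2, true));
    }
}
```

Whether the parallel variant is actually faster has to be measured on the real dataset.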

Holger
  • I had the feeling that I was doing too many operations for this. – mlocu May 18 '17 at 14:09
  • Thank you very much for the answer. I will read more about streams to learn to think in this kind of approach. Do you know a good book dedicated 100% to this? – mlocu May 18 '17 at 14:10

Using Java streams is a good approach if performance is not a concern. Streams and parallel streams are comparatively slow, and they can make debugging harder when an exception is thrown inside a stream operation. The benefit of streams is that a few lines of code are enough to solve complex aggregation problems or to restructure data. The following link explains how streams can be slower than the legacy approach.

https://blog.codefx.org/java/stream-performance/

Satish Kumar
  • Please have a look at http://blog.takipi.com/benchmark-how-java-8-lambdas-and-streams-can-make-your-code-5-times-slower/ – Christian May 18 '17 at 08:14