    _logger.info("data size : " + saleData.size());

    saleData.parallelStream().forEach(data -> {
        SaleAggrData saleAggrData = new SaleAggrData() {
            {
                setCatId(data.getCatId());
                setRevenue(RoundUpUtil.roundUpDouble(data.getRevenue()));
                setMargin(RoundUpUtil.roundUpDouble(data.getMargin()));
                setUnits(data.getUnits());
                setMarginRate(ComputeUtil.marginRate(data.getRevenue(), data.getMargin()));
                setOtd(ComputeUtil.OTD(data.getRevenue(), data.getUnits()));
                setSaleDate(data.getSaleDate());
                setDiscountDepth(ComputeUtil.discountDepth(data.getRegularPrice(), data.getRevenue()));
                setTransactions(data.getTransactions());
                setUpt(ComputeUtil.UPT(data.getUnits(), data.getTransactions()));
            }
        };
        salesAggrData.addSaleAggrData(saleAggrData);
    });

The issue with this code is that when I get a response from the DB and iterate over it using a parallel stream, the data size is different every time, whereas with a sequential stream it works fine.

I can't use a sequential stream because the data is huge and processing it takes too long.

Any leads would be helpful.

Eran
Ravat Tailor
  • Be advised: you are using an anonymous subclass of the `SaleAggrData` class, which is unnecessary. This has a performance impact, because an extra class must be loaded at runtime. You are better off using `SaleAggrData sad = new SaleAggrData(); sad.setCatId(...); sad.setRevenue(...); ...`. [See performance impact measurements here](https://stackoverflow.com/questions/924285/efficiency-of-java-double-brace-initialization). – MC Emperor Feb 21 '18 at 08:29
  • But the data size is logged before the stream executes... What are you talking about? – Jean-Baptiste Yunès Feb 22 '18 at 10:24
  • Is `salesAggrData` a thread safe collection? – Jean-Baptiste Yunès Feb 22 '18 at 10:25
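To make MC Emperor's point concrete, here is a small sketch comparing double-brace initialization with plain setter calls. `DoubleBraceDemo` and its two-field `SaleAggrData` are hypothetical stand-ins for the question's class. The double-brace version yields an instance of a compiler-generated anonymous subclass, while the plain version yields an instance of `SaleAggrData` itself:

```java
public class DoubleBraceDemo {

    // Hypothetical, trimmed-down stand-in for the question's SaleAggrData bean.
    public static class SaleAggrData {
        private int catId;
        private double revenue;

        public int getCatId() { return catId; }
        public void setCatId(int catId) { this.catId = catId; }
        public double getRevenue() { return revenue; }
        public void setRevenue(double revenue) { this.revenue = revenue; }
    }

    // Double-brace initialization: the outer braces declare an anonymous subclass,
    // the inner braces are that subclass's instance initializer.
    public static SaleAggrData doubleBrace(int catId, double revenue) {
        return new SaleAggrData() {
            {
                setCatId(catId);
                setRevenue(revenue);
            }
        };
    }

    // Plain construction: no extra class is generated.
    public static SaleAggrData plain(int catId, double revenue) {
        SaleAggrData sad = new SaleAggrData();
        sad.setCatId(catId);
        sad.setRevenue(revenue);
        return sad;
    }

    public static void main(String[] args) {
        SaleAggrData a = doubleBrace(1, 10.0);
        SaleAggrData b = plain(1, 10.0);
        System.out.println(a.getClass().getName());           // an anonymous class name such as DoubleBraceDemo$1
        System.out.println(a.getClass().isAnonymousClass());  // true
        System.out.println(b.getClass() == SaleAggrData.class); // true
    }
}
```

Both objects hold the same values; only the double-brace one drags an extra class along (and, when created inside an instance method, a hidden reference to the enclosing object).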

1 Answer

You are adding elements in parallel to `salesAggrData`, which I assume is some `Collection`. If it's not a thread-safe `Collection`, it's no wonder you get inconsistent results.
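If you want to keep `forEach`, one alternative is to make the target collection thread-safe. A minimal sketch, assuming the target can simply be a `List`; the question's `salesAggrData.addSaleAggrData(...)` wrapper is not shown, so `ThreadSafeSinkDemo`, `addInParallel` and the integer elements are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class ThreadSafeSinkDemo {

    // Adds every element of a parallel stream to a shared list.
    // Collections.synchronizedList guards each add() with a lock, so no element is lost.
    // A plain ArrayList here would be unsafe: concurrent add() calls can silently
    // drop elements or throw ArrayIndexOutOfBoundsException.
    public static List<Integer> addInParallel(int n) {
        List<Integer> sink = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).parallel().boxed().forEach(sink::add);
        return sink;
    }

    public static void main(String[] args) {
        List<Integer> result = addInParallel(100_000);
        System.out.println(result.size()); // 100000 (element order is not guaranteed)
    }
}
```

Every `add` now contends for a single lock, which eats into the parallel speed-up; collecting the stream's results avoids shared mutable state altogether.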

Instead of `forEach`, why not use `map()` and then collect the results into a `Collection`?

List<SaleAggrData> salesAggrData =
    saleData.parallelStream()
            .map(data -> {
                    SaleAggrData saleAggrData = new SaleAggrData() {
                        {
                            setCatId(data.getCatId());
                            setRevenue(RoundUpUtil.roundUpDouble(data.getRevenue()));
                            setMargin(RoundUpUtil.roundUpDouble(data.getMargin()));
                            setUnits(data.getUnits());
                            setMarginRate(ComputeUtil.marginRate(data.getRevenue(), data.getMargin()));
                            setOtd(ComputeUtil.OTD(data.getRevenue(), data.getUnits()));
                            setSaleDate(data.getSaleDate());
                            setDiscountDepth(ComputeUtil.discountDepth(data.getRegularPrice(), data.getRevenue()));
                            setTransactions(data.getTransactions());
                            setUpt(ComputeUtil.UPT(data.getUnits(), data.getTransactions()));
                        }
                    };
                    return saleAggrData;
            })
            .collect(Collectors.toList());

BTW, I'd probably replace that anonymous class instance creation with a call to a constructor of a named class to create the `SaleAggrData` instances.
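A sketch of the named-class approach, assuming `SaleAggrData` can be given a constructor that takes the source row. `NamedConstructorDemo`, `SaleData`, its three fields and the `SaleAggrData(SaleData)` constructor are all hypothetical and heavily trimmed; the real mapping would call `RoundUpUtil` and `ComputeUtil` as in the question:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class NamedConstructorDemo {

    // Hypothetical, trimmed-down version of the source row returned by the DB.
    public static class SaleData {
        private final int catId;
        private final double revenue;
        private final int units;

        public SaleData(int catId, double revenue, int units) {
            this.catId = catId;
            this.revenue = revenue;
            this.units = units;
        }

        public int getCatId() { return catId; }
        public double getRevenue() { return revenue; }
        public int getUnits() { return units; }
    }

    // Hypothetical SaleAggrData whose constructor does the mapping.
    // In the real code, the RoundUpUtil/ComputeUtil calls would go here.
    public static class SaleAggrData {
        private final int catId;
        private final double revenue;
        private final int units;

        public SaleAggrData(SaleData data) {
            this.catId = data.getCatId();
            this.revenue = data.getRevenue();
            this.units = data.getUnits();
        }

        public int getCatId() { return catId; }
        public double getRevenue() { return revenue; }
        public int getUnits() { return units; }
    }

    // A constructor reference replaces the anonymous-class lambda body.
    public static List<SaleAggrData> aggregate(List<SaleData> saleData) {
        return saleData.parallelStream()
                .map(SaleAggrData::new)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<SaleData> rows = Arrays.asList(
                new SaleData(1, 100.0, 3),
                new SaleData(2, 250.5, 7));
        System.out.println(aggregate(rows).size()); // 2
    }
}
```

Because the source `List` is ordered, `Collectors.toList()` keeps the results in encounter order even when the stream runs in parallel.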

Eran
  • And I'd probably use a sequential stream anyway: a parallel stream will probably make things slower, and most of the time is likely spent executing the query and loading the results, not transforming them. – JB Nizet Feb 21 '18 at 07:36
  • @JBNizet Perhaps you are correct, though it wouldn't hurt to test the performance of a parallel stream against a sequential one. – Eran Feb 21 '18 at 07:39
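A rough way to run that comparison is sketched below. `SeqVsParallelTiming` and `transform` are hypothetical stand-ins for the per-row work, and single `System.nanoTime()` measurements are only indicative; a benchmark harness such as JMH gives more trustworthy numbers. This measures only the transformation step, not the database query that JB Nizet suspects dominates:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SeqVsParallelTiming {

    // Hypothetical stand-in for the per-row transformation (the real one builds SaleAggrData).
    static double transform(int i) {
        return Math.sqrt(i) * 1.1;
    }

    // Runs the same map/collect pipeline either sequentially or in parallel.
    public static List<Double> run(List<Integer> input, boolean parallel) {
        return (parallel ? input.parallelStream() : input.stream())
                .map(SeqVsParallelTiming::transform)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> input = IntStream.range(0, 1_000_000).boxed().collect(Collectors.toList());

        // Warm-up so JIT compilation does not dominate the measured runs.
        for (int i = 0; i < 5; i++) {
            run(input, false);
            run(input, true);
        }

        long t0 = System.nanoTime();
        List<Double> seq = run(input, false);
        long t1 = System.nanoTime();
        List<Double> par = run(input, true);
        long t2 = System.nanoTime();

        System.out.printf("sequential: %d ms%n", (t1 - t0) / 1_000_000);
        System.out.printf("parallel:   %d ms%n", (t2 - t1) / 1_000_000);
        // Both pipelines must produce identical, identically ordered results.
        System.out.println("same result: " + seq.equals(par));
    }
}
```

The timings vary with core count, data size and per-row cost, so only numbers measured on the real data and hardware can settle the question.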