
I don't get the same results each time I run a parallel stream that reads and processes a file.

I have pizza data and want to count several attributes using map and global variables; I am required to use global variables only. But every time I run my code I get different results, even though the input file is not modified at all.

package Assignment;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GlobalVariables {
    static int vegPizzas = 0;
    static int N_V_Pizzas = 0;
    static int Size_regular = 0;
    static int Size_medium = 0;
    static int Size_large = 0;
    static int Cheese_Burst = 0;
    static int Cheese_regular = 0;
    static int cheap_cheese = 0;

    static Stream<String> reader;

    static int rows = 0;

    public static void main(String[] args) {
        try {
            reader = Files.lines(Paths.get("data/SampleData.csv")).parallel();
        } catch (IOException e) {
            e.printStackTrace();
            return; // without the input file there is nothing to process
        }

        reader.map(x -> x.split(","))
            .filter(x -> !(x[0].equals("NULL") || x[1].equals("NULL") || x[2].equals("NULL") || x[3].equals("NULL")))
            .map(x -> updateCounts(x)).collect(Collectors.toList());

        printResults();
        System.out.println(rows); // count of rows left after the NULL values are filtered out
    }

    private static void printResults() {
        System.out.println("veg pizzas: " + vegPizzas);
        System.out.println("non veg pizzas: " + N_V_Pizzas);
        System.out.println("regular: " + Size_regular + " medium: " + Size_medium + " large: " + Size_large);
        System.out.println("cheese burst pizzas: " + Cheese_Burst);
        System.out.println("regular cheese burst: " + Cheese_regular);
        System.out.println("cheaper cheese burst: " + cheap_cheese);
    }

    private static Object updateCounts(String[] x) {
        rows++;
        int flag_regular = 0;
        if(x[9].equals("Y")) {
            vegPizzas++;
        }else if(x[9].equals("N")) {
            N_V_Pizzas++;
        }

        if(x[6].equals("R")) {
            Size_regular++;
            flag_regular = 1;
        }
        else if(x[6].equals("M")) {
            Size_medium++;
        }
        else if(x[6].equals("L")) {
            Size_large++;
        }

        if(x[5].equals("Y")) {
            Cheese_Burst++;
            if(flag_regular == 1) {
                Cheese_regular++;
            }
            if(Integer.parseInt(x[7]) < 500) {
                cheap_cheese++;
            }
        }

        return x;
    }

}

Expected (true) results compared with three runs; the last row is the rows count printed after NULL filtering:

                          Expected   Run 1   Run 2   Run 3
veg pizzas                    5303    5296    5294    5303
non-veg pizzas                1786    1785    1786    1786
regular                       1779    1779    1779    1779
medium                        2660    2660    2659    2660
large                         2650    2649    2648    2650
cheese burst pizzas           3499    3498    3497    3499
regular cheese burst           900     900     900     900
cheaper cheese burst           598     598     598     598
rows after filtering           n/a    7060    7055    7086

I went through this link, but I could not relate my problem to the one posted there. I did notice that with a sequential stream I get the expected results. Any lead on this would be helpful.

Stefan Zobel
HackChamp
  • I got the answer: I had not made the method thread-safe, as described in the link mentioned at the end of the question above. Now I consistently get the expected results. I am new to this community, so can someone tell me whether I should remove this question or leave it? If I have to remove it, how do I do so? Thank you – HackChamp Dec 27 '18 at 12:24

1 Answer


Your updateCounts(String[] x) method, which is called by the map step of the Stream pipeline, is not thread-safe, and it updates static variables.

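So when multiple threads call it concurrently, you should expect different results in each run (i.e. the final values of the static variables will differ from run to run). An increment such as vegPizzas++ is a separate read, add and write, so two threads can read the same old value and one of the two increments is lost. That is why every wrong count in the question is lower than the expected one.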
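To see the lost updates in isolation, here is a minimal sketch (the class and method names are hypothetical, not from the question). It increments a plain static int, like the question's counters, and an AtomicInteger from the same parallel stream. On a multi-core machine the plain counter usually ends up below the expected total and varies between runs; the AtomicInteger does not.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical demo class: not part of the question's code.
class LostUpdateDemo {
    // Plain static counter, like vegPizzas: count++ is a separate read, add and write.
    static int plainCount = 0;
    // AtomicInteger performs the read-modify-write as a single atomic operation.
    static final AtomicInteger atomicCount = new AtomicInteger();

    // Increments both counters n times from a parallel stream; returns {plain, atomic}.
    static int[] run(int n) {
        plainCount = 0;
        atomicCount.set(0);
        IntStream.range(0, n).parallel().forEach(i -> {
            plainCount++;                  // racy: concurrent increments can be lost
            atomicCount.incrementAndGet(); // thread-safe
        });
        return new int[] {plainCount, atomicCount.get()};
    }

    public static void main(String[] args) {
        int[] counts = run(1_000_000);
        // The plain count is usually below 1000000 on a multi-core machine
        // and changes from run to run; the atomic count is always 1000000.
        System.out.println("plain: " + counts[0] + ", atomic: " + counts[1]);
    }
}
```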

The Function passed to map shouldn't have side effects, especially not when used in a parallel Stream.
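For a single counter column, a side-effect-free version can lean on a built-in collector instead of shared state. The sketch below uses Collectors.groupingBy with Collectors.counting (a standard JDK technique, not something the answer prescribes); the class name, method name and sample rows are hypothetical, and the column indexes simply follow the question's code ([5] cheese burst, [6] size, [7] price, [9] veg).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: counts rows per value of one CSV column without shared state.
class SideEffectFreeCount {
    static Map<String, Long> countByColumn(List<String> lines, int column) {
        return lines.parallelStream()
                .map(line -> line.split(","))
                // Same NULL filter as the question's pipeline.
                .filter(f -> !(f[0].equals("NULL") || f[1].equals("NULL")
                        || f[2].equals("NULL") || f[3].equals("NULL")))
                // The collector builds partial maps for chunks of the input and merges them,
                // so the result is the same for sequential and parallel streams.
                .collect(Collectors.groupingBy(f -> f[column], Collectors.counting()));
    }

    public static void main(String[] args) {
        // Made-up rows in the column layout the question's code assumes.
        List<String> lines = Arrays.asList(
                "1,a,b,c,d,Y,R,450,e,Y",
                "2,a,b,c,d,N,M,600,e,N",
                "3,a,b,c,d,Y,L,700,e,Y",
                "NULL,a,b,c,d,Y,R,300,e,Y");
        System.out.println(countByColumn(lines, 9)); // counts per veg flag
        System.out.println(countByColumn(lines, 6)); // counts per size
    }
}
```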

A better way to do this computation with Streams:

  • Create a PizzaStatistics class having all the original static counter variables as instance variables.

  • updateCounts (which should be renamed) would return a new PizzaStatistics instance where the relevant counters are set to 1. It won't update any static variables.

  • The Stream pipeline would use the terminal operation reduce to produce a single PizzaStatistics instance that contains the totals.
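The three steps above can be sketched as follows. This is a minimal illustration, not a definitive implementation: only the name PizzaStatistics comes from the answer; the field names, fromRow (the renamed updateCounts), plus, EMPTY, compute and the sample rows are assumptions, and the column indexes follow the question's code.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the suggested PizzaStatistics class; instances are immutable, so threads share nothing.
final class PizzaStatistics {
    // Identity for reduce: all counters zero.
    static final PizzaStatistics EMPTY = new PizzaStatistics(0, 0, 0, 0, 0, 0, 0, 0, 0);

    final int rows, veg, nonVeg, regular, medium, large, cheeseBurst, regularCheeseBurst, cheapCheeseBurst;

    PizzaStatistics(int rows, int veg, int nonVeg, int regular, int medium, int large,
                    int cheeseBurst, int regularCheeseBurst, int cheapCheeseBurst) {
        this.rows = rows;
        this.veg = veg;
        this.nonVeg = nonVeg;
        this.regular = regular;
        this.medium = medium;
        this.large = large;
        this.cheeseBurst = cheeseBurst;
        this.regularCheeseBurst = regularCheeseBurst;
        this.cheapCheeseBurst = cheapCheeseBurst;
    }

    // Replaces updateCounts: one row becomes a PizzaStatistics with the matching counters set to 1.
    static PizzaStatistics fromRow(String[] x) {
        boolean isRegular = x[6].equals("R");
        boolean isCheeseBurst = x[5].equals("Y");
        return new PizzaStatistics(
                1,
                x[9].equals("Y") ? 1 : 0,
                x[9].equals("N") ? 1 : 0,
                isRegular ? 1 : 0,
                x[6].equals("M") ? 1 : 0,
                x[6].equals("L") ? 1 : 0,
                isCheeseBurst ? 1 : 0,
                isCheeseBurst && isRegular ? 1 : 0,
                // price is only parsed for cheese burst rows, as in the original code
                isCheeseBurst && Integer.parseInt(x[7]) < 500 ? 1 : 0);
    }

    // Adds two partial results field by field; associative, with EMPTY as the identity.
    PizzaStatistics plus(PizzaStatistics o) {
        return new PizzaStatistics(rows + o.rows, veg + o.veg, nonVeg + o.nonVeg,
                regular + o.regular, medium + o.medium, large + o.large,
                cheeseBurst + o.cheeseBurst, regularCheeseBurst + o.regularCheeseBurst,
                cheapCheeseBurst + o.cheapCheeseBurst);
    }

    static PizzaStatistics compute(List<String> lines) {
        return lines.parallelStream()
                .map(line -> line.split(","))
                .filter(x -> !(x[0].equals("NULL") || x[1].equals("NULL")
                        || x[2].equals("NULL") || x[3].equals("NULL")))
                .map(PizzaStatistics::fromRow)
                .reduce(EMPTY, PizzaStatistics::plus);
    }

    public static void main(String[] args) {
        // Made-up sample rows; the last one is dropped by the NULL filter.
        List<String> lines = Arrays.asList(
                "1,a,b,c,d,Y,R,450,e,Y",
                "2,a,b,c,d,N,M,600,e,N",
                "3,a,b,c,d,Y,L,700,e,Y",
                "NULL,a,b,c,d,Y,R,300,e,Y");
        PizzaStatistics s = compute(lines);
        // prints "rows: 3, veg: 2, non-veg: 1, cheese burst: 2, cheaper cheese burst: 1"
        System.out.println("rows: " + s.rows + ", veg: " + s.veg + ", non-veg: " + s.nonVeg
                + ", cheese burst: " + s.cheeseBurst + ", cheaper cheese burst: " + s.cheapCheeseBurst);
    }
}
```

For the real file, the same map/filter/reduce chain can be applied to Files.lines(Paths.get("data/SampleData.csv")) opened in a try-with-resources block. Because plus is associative and EMPTY is its identity, the totals do not depend on how the parallel stream splits the input. Creating one object per row is simple but allocation-heavy; a mutable collect(...) with an explicit combiner is a common alternative.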

Eran
  • Isn't there a way to synchronize only the variables rather than the methods? Then, while one thread updates one variable, another thread could update a different variable in the same method. One approach I can think of is to do each variable update in its own thread-safe method, without making the updateCounts(String[] x) method itself thread-safe. – HackChamp Dec 27 '18 at 12:28
  • @HackChamp you could make the `updateCounts` method synchronized, but it would still be a bad use of Streams. – Eran Dec 27 '18 at 12:29
  • I have edited my earlier comment. Is that (the approach described in my earlier comment) a better way of doing things? – HackChamp Dec 27 '18 at 12:34
  • @HackChamp you can change `updateCounts` to call a different synchronized static method for each variable it has to update. This way `updateCounts` itself won't have to be synchronized. – Eran Dec 27 '18 at 12:37
  • @HackChamp it's still not good practice to use the Stream pipeline this way, since `map` shouldn't have side effects. – Eran Dec 27 '18 at 12:38
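One caveat on the per-variable synchronized helpers suggested in the comments: every static synchronized method of a class locks the same Class object, so two threads still cannot update two different counters at the same time. If the goal is independent per-variable thread safety, as HackChamp asked, one option (swapped in here; it is not from the thread) is an AtomicInteger per counter. A minimal sketch, assuming the question's column layout; the class name, the three-counter subset and the sample rows are hypothetical. It still relies on side effects, so the reduce approach from the answer remains the cleaner design.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of the question's class with one AtomicInteger per counter
// (only three counters shown; the others follow the same pattern).
class AtomicCounters {
    static final AtomicInteger rows = new AtomicInteger();
    static final AtomicInteger vegPizzas = new AtomicInteger();
    static final AtomicInteger nonVegPizzas = new AtomicInteger();

    // Each incrementAndGet is atomic on its own counter, so no lock is shared between counters.
    static void updateCounts(String[] x) {
        rows.incrementAndGet();
        if (x[9].equals("Y")) {
            vegPizzas.incrementAndGet();
        } else if (x[9].equals("N")) {
            nonVegPizzas.incrementAndGet();
        }
    }

    static void process(List<String> lines) {
        // Reset so repeated calls start from zero.
        rows.set(0);
        vegPizzas.set(0);
        nonVegPizzas.set(0);
        lines.parallelStream()
                .map(line -> line.split(","))
                .filter(x -> !(x[0].equals("NULL") || x[1].equals("NULL")
                        || x[2].equals("NULL") || x[3].equals("NULL")))
                // forEach is documented as performing an action for its side effects, and the
                // action must synchronize any shared state itself (done here by the atomics).
                .forEach(AtomicCounters::updateCounts);
    }

    public static void main(String[] args) {
        process(Arrays.asList(
                "1,a,b,c,d,Y,R,450,e,Y",
                "2,a,b,c,d,N,M,600,e,N",
                "3,a,b,c,d,Y,L,700,e,Y"));
        // prints "rows: 3, veg: 2, non-veg: 1"
        System.out.println("rows: " + rows + ", veg: " + vegPizzas + ", non-veg: " + nonVegPizzas);
    }
}
```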