2

I wrote a small Spark application which should measure the time that Spark needs to run an action on a partitioned RDD (combineByKey function to sum a value).

My problem is, that the first iteration seems to work correctly (calculated duration ~25 ms), but the next ones take much less time (~5 ms). It seems to me, that Spark persists the data without any request to do so!? Can I avoid that programmatically?

I have to know the duration that Spark needs to calculate a new RDD (without any caching / persisting of earlier iterations) --> I think the duration should always be about 20-25 ms!

To ensure the recalculation I moved the SparkContext generation into the for-loops, but this didn't bring any changes...

Thanks for your advices!

Here my code which seems to persist any data:

public static void main(String[] args) {

    switchOffLogging();

    // jetzt

    try {
        // Setup: Read out parameters & initialize SparkContext
        String path = args[0];
        SparkConf conf = new SparkConf(true);
        JavaSparkContext sc;

        // Create output file & writer
        System.out.println("\npar.\tCount\tinput.p\tcons.p\tTime");

        // The RDDs used for the benchmark
        JavaRDD<String> input = null;
        JavaPairRDD<Integer, String> pairRDD = null;
        JavaPairRDD<Integer, String> partitionedRDD = null;
        JavaPairRDD<Integer, Float> consumptionRDD = null;

        // Do the tasks iterative (10 times the same benchmark for testing)
        for (int i = 0; i < 10; i++) {
            boolean partitioning = true;
            int partitionsCount = 8;

            sc = new JavaSparkContext(conf);
            setS3credentials(sc, path);

            input = sc.textFile(path);
            pairRDD = mapToPair(input);

            partitionedRDD = partition(pairRDD, partitioning, partitionsCount);

            // Measure the duration
            long duration = System.currentTimeMillis();
            // Do the relevant function
            consumptionRDD = partitionedRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
            duration = System.currentTimeMillis() - duration;

            // So some action to invoke the calculation
            System.out.println(consumptionRDD.collect().size());

            // Print the results
            System.out.println("\n" + partitioning + "\t" + partitionsCount + "\t" + input.partitions().size() + "\t" + consumptionRDD.partitions().size() + "\t" + duration + " ms");

            input = null;
            pairRDD = null;
            partitionedRDD = null;
            consumptionRDD = null;

            sc.close();
            sc.stop();

        }
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(e.getMessage());
    }
}

Some helper functions (should not be the problem):

private static void switchOffLogging() {
    Logger.getLogger("org").setLevel(Level.OFF);
    Logger.getLogger("akka").setLevel(Level.OFF);
}

private static void setS3credentials(JavaSparkContext sc, String path) {
    if (path.startsWith("s3n://")) {
        Configuration hadoopConf = sc.hadoopConfiguration();
        hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3n.awsAccessKeyId", "mycredentials");
        hadoopConf.set("fs.s3n.awsSecretAccessKey", "mycredentials");
    }
}

// Initial element
private static Function<String, Float> createCombiner = new Function<String, Float>() {
    public Float call(String dataSet) throws Exception {
        String[] data = dataSet.split(",");
        float value = Float.valueOf(data[2]);
        return value;
    }
};

// merging function for a new dataset
private static Function2<Float, String, Float> mergeValue = new Function2<Float, String, Float>() {
    public Float call(Float sumYet, String dataSet) throws Exception {
        String[] data = dataSet.split(",");
        float value = Float.valueOf(data[2]);
        sumYet += value;
        return sumYet;
    }
};

// function to sum the consumption
private static Function2<Float, Float, Float> mergeCombiners = new Function2<Float, Float, Float>() {
    public Float call(Float a, Float b) throws Exception {
        a += b;
        return a;
    }
};

private static JavaPairRDD<Integer, String> partition(JavaPairRDD<Integer, String> pairRDD, boolean partitioning, int partitionsCount) {
    if (partitioning) {
        return pairRDD.partitionBy(new HashPartitioner(partitionsCount));
    } else {
        return pairRDD;
    }
}

private static JavaPairRDD<Integer, String> mapToPair(JavaRDD<String> input) {
    return input.mapToPair(new PairFunction<String, Integer, String>() {
        public Tuple2<Integer, String> call(String debsDataSet) throws Exception {
            String[] data = debsDataSet.split(",");
            int houseId = Integer.valueOf(data[6]);
            return new Tuple2<Integer, String>(houseId, debsDataSet);
        }
    });
}

And finally the output of the Spark console:

part.   Count   input.p cons.p  Time
true    8       6       8       20 ms
true    8       6       8       23 ms
true    8       6       8       7 ms        // Too less!!!
true    8       6       8       21 ms
true    8       6       8       13 ms
true    8       6       8       6 ms        // Too less!!!
true    8       6       8       5 ms        // Too less!!!
true    8       6       8       6 ms        // Too less!!!
true    8       6       8       4 ms        // Too less!!!
true    8       6       8       7 ms        // Too less!!!
Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
D. Müller
  • 3,336
  • 4
  • 36
  • 84

2 Answers2

1

I found a solution for me now: I wrote a separate class which calls the spark-submit command on a new process. This can be done in a loop, so every benchmark is started in a new thread and sparkContext is also separated per process. So garbage collection is done and everything works fine!

String submitCommand = "/root/spark/bin/spark-submit " + submitParams + " --   class partitioning.PartitionExample /root/partitioning.jar " + javaFlags;
Process p = Runtime.getRuntime().exec(submitCommand);

BufferedReader reader;
String line;

System.out.println(p.waitFor());
reader = new BufferedReader(new InputStreamReader(p.getInputStream()));         
while ((line = reader.readLine())!= null) {
  System.out.println(line);
}
D. Müller
  • 3,336
  • 4
  • 36
  • 84
0

If the shuffle output is small enough, then the Spark shuffle files will write to the OS buffer cache as fsync is not explicitly called...this means that, as long as there is room, your data will remain in memory.

If a cold performance test is truly necessary then you can try something like this attempt to flush the disk, but that is going to slow down the in-between each test. Could you just spin the context up and down? That might solve your need.

Justin Pihony
  • 66,056
  • 18
  • 147
  • 180
  • Tried the linux commands from [link](http://superuser.com/a/319287), which says to run 1) "sudo sync" and 2) "echo 3 > sudo /proc/sys/vm/drop_caches" without success... I tried also the SparkConf.set method **SparkConf conf = new SparkConf(true).set("spark.files.useFetchCache", "false");** but also no effect... – D. Müller Jun 07 '15 at 12:17