
I have a Java program that works with a large dataset. The dataset is stored in HDFS as CSV.

The program works fine but it is very slow.

What the program does:

  1. load the CSV file
  2. split each line into a String[]
  3. filter the String array
  4. map to MyObject
  5. save MyObject to Cassandra

Here is my main method:

// imports needed for this snippet (Spark core + DataStax Spark Cassandra Connector Java API)
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public static void main(String[] args) {

        // configure spark
        SparkConf sparkConf = new SparkConf().setAppName("Write to cassandra app")
                .setMaster("local[*]")
                .set("spark.executor.memory", "4g");

        if (args.length > 1)
            sparkConf.set("spark.cassandra.connection.host", args[1]);

        // start a spark context
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // read text file to RDD
        JavaRDD<String> lines = sc.textFile(args[0]);

        JavaRDD<MyObject> myObjectJavaRDD = lines
                .map(line -> line.split(","))
                .filter(someFilter)
                .map(MyObject::new);

        javaFunctions(myObjectJavaRDD).writerBuilder("ks", "table", mapToRow(MyObject.class)).saveToCassandra();
    }

How can I improve performance?

Thank you for your answers.


1 Answer


Your code doesn't have shuffle issues (except when you have to write out to HDFS), and the default partitioning is defined by the input format: on Hadoop, the splits come from the HDFS blocks, and filter or map don't change the partitioning. If you can filter first (note that the predicate then receives the raw line instead of the String[]), you could see some improvement:

        JavaRDD<MyObject> myObjectJavaRDD = lines
                .filter(someFilter)
                .map(line -> line.split(","))
                .map(MyObject::new);

Spark can only run one concurrent task for every partition of an RDD, up to the number of cores in your cluster. So if you have a cluster with 50 cores, you want your RDDs to have at least 50 partitions. As far as choosing a "good" number of partitions, you generally want at least as many as the number of executors, for parallelism. You can get this computed value by calling

sc.defaultParallelism

or inspect the number of partitions of an RDD with

someRDD.partitions.size
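In the Java API used in the question, the same checks look roughly like this (a minimal sketch that reuses the sc and lines variables from the question's main method):

    // how many tasks Spark would run in parallel by default
    int parallelism = sc.defaultParallelism();

    // how many partitions the input RDD actually got
    int inputPartitions = lines.getNumPartitions();

    System.out.println("defaultParallelism = " + parallelism
            + ", input partitions = " + inputPartitions);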

When creating an RDD by reading a file using

rdd = sc.textFile("hdfs://…/file.txt")

the number of partitions may be smaller. Ideally, you would get the same number of partitions as there are blocks in HDFS, but if the lines in your file are too long (longer than the block size), there will be fewer partitions.

The preferred way to set the number of partitions for an RDD is to pass it directly as the second argument of the call, like

rdd = sc.textFile("hdfs://…/file.txt", 400)

where 400 is the (minimum) number of partitions. In this case, the partitioning into 400 splits is done by Hadoop's TextInputFormat, not Spark, and it works much faster. The code also spawns 400 concurrent tasks to load file.txt directly into 400 partitions.
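With the Java API from the question this would look roughly like the following (a sketch; 400 is only an example value, not a recommendation):

    // sketch: ask for at least 400 input splits when reading the file
    JavaRDD<String> lines = sc.textFile(args[0], 400);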

Repartition: increases the number of partitions; rebalancing partitions after a filter increases parallelism

        repartition(numPartitions: Int)

Coalesce: decreases the number of partitions WITHOUT a shuffle; useful to consolidate partitions before writing out to HDFS/external storage

    coalesce(numPartitions: Int, shuffle: Boolean = false)
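Applied to the question's Java pipeline, that could look roughly like this (a sketch; 48 and 6 are placeholder numbers you would tune for your cluster):

    // sketch: rebalance after the filter so the surviving rows spread over more tasks
    JavaRDD<MyObject> rebalanced = myObjectJavaRDD.repartition(48);

    // sketch: if you later write files out, consolidate into fewer partitions first
    JavaRDD<MyObject> consolidated = rebalanced.coalesce(6);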

And finally, no less important, you can run some trials with different values and benchmark to see how long the process takes:

  val start = System.nanoTime()

  // my process

  val end = System.nanoTime()

  val time = end - start
  println(s"My App takes: $time")

I hope it helps.

Chema
  • Thank you! I will try it tomorrow. Can you tell me how I can calculate numPartitions? There are 6 cores on the machine where Spark is executed and 6 HDFS partitions of the input file (*_0.csv, *_1.csv, ..., *_5.csv). – Stepan Shcherbakov May 23 '20 at 10:25
  • rdd = sc.textFile("hdfs://…/file.txt", 400) and repartition helped me improve the MapShuffle stage, but the "save to Cassandra" stage was still very slow. That is why I migrated my program to Spark Streaming; it is 4-5 times faster in my experience. – Stepan Shcherbakov May 28 '20 at 07:38
  • It depends on your use case: if you can solve your problem with streaming, great, but there are many cases where we have to deal with batch processing. – Chema May 28 '20 at 08:21