
We are seeing poor performance with Spark.

I have 2 specific questions:

  1. While debugging, we noticed that a few of the groupBy operations on the RDD take a long time.
  2. A few of the stages appear twice; some finish very quickly, while others take longer.

Here is a screenshot of Spark UI when running locally.

We are currently running locally, with shuffle partitions set to 2 and the number of partitions set to 5; the data is around 100,000 records.

As for the groupBy operation, we group a dataframe (the result of several joins) by two columns and then apply a function to each group to get a result.

    val groupedRows = rows.rdd.groupBy(row => (
      row.getAs[Long](Column1),
      row.getAs[Int](Column2)
    ))
    val rdd = groupedRows.values.map(Criteria)

Here, Criteria is a function applied to the rows of each group. Can we optimize this groupBy in any way?

Here is a screenshot of the DAG graph.

niton
Thejas B

2 Answers


I would suggest not converting the existing dataframe to an RDD for this kind of processing.

If you want to apply the Criteria function to groups defined by two columns (Column1 and Column2), you can do this directly on the dataframe. It is best if Criteria can be reduced to a combination of built-in functions, but you can always use UDFs for custom rules.

What I would suggest is to call groupBy on the dataframe and apply aggregation functions:

    rows.groupBy("Column1", "Column2").agg(/* aggregate expressions that implement Criteria */)

You can use window functions if you need multiple rows from each group of the dataframe. More info here.
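As a rough illustration of the window-function route, here is a minimal sketch. It assumes Spark 2.x (`SparkSession`, `row_number`), and it assumes that Criteria amounts to "keep the row with the highest value of some column". The `score` column, the sample data, and the names `WindowPickSketch` and `pickBest` are hypothetical and not from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object WindowPickSketch {
  // Keeps one row per (Column1, Column2) group: the one with the highest
  // "score". "score" is a hypothetical stand-in for whatever Criteria ranks on.
  def pickBest(rows: DataFrame): DataFrame = {
    val byGroup = Window
      .partitionBy("Column1", "Column2")
      .orderBy(col("score").desc)

    rows
      .withColumn("rn", row_number().over(byGroup)) // rank rows inside each group
      .where(col("rn") === 1)                       // keep the top-ranked row
      .drop("rn")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("window-pick-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data with the two grouping columns from the question.
    val rows = Seq(
      (1L, 10, 5.0),
      (1L, 10, 9.0),
      (2L, 20, 3.0)
    ).toDF("Column1", "Column2", "score")

    pickBest(rows).show()
    spark.stop()
  }
}
```

Because the whole computation stays in the DataFrame API, the optimizer can plan it without the round trip through `rows.rdd`. If Criteria cannot be expressed as an ordering, this sketch does not apply as written.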

Ramesh Maharjan
  • I don't think the `agg` operation in Spark 1.5.2 takes a function; it only accepts built-in functions such as `sum` and `avg`. Criteria is something we run over the records of each group to select one record. After grouping we get `RDD[((Long, Int), Iterable[Row])]`, so for each (key, key) pair we get a sequence of rows, which we pass to Criteria – Thejas B Jun 24 '17 at 08:33
  • For that you should have a look at UDAF aggregations. A UDAF seems to be the appropriate solution – Ramesh Maharjan Jun 24 '17 at 08:39

.groupBy is known not to be the most efficient approach:

Note: This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance.

Sometimes it is better to use .reduceByKey or .aggregateByKey, as explained here:

While both of these functions will produce the correct answer, the reduceByKey example works much better on a large dataset. That's because Spark knows it can combine output with a common key on each partition before shuffling the data.

Why do .reduceByKey and .aggregateByKey work faster than .groupBy? Because part of the aggregation happens during the map phase, so less data is shuffled between worker nodes during the reduce phase. Here is a good explanation of how aggregateByKey works.
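To make the map-side combining concrete, here is a minimal sketch of replacing `groupBy` plus a per-group function with `aggregateByKey`. It assumes Criteria can be rewritten as a pairwise "which of these two records is better" comparison; if Criteria needs the whole group at once, this rewrite does not apply. The `Rec` type, its `score` field, and the names `AggregatePickSketch`, `better`, and `pickPerKey` are hypothetical.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object AggregatePickSketch {
  // Hypothetical record type standing in for the Row values in the question.
  case class Rec(column1: Long, column2: Int, score: Double)

  // Assumed pairwise form of Criteria: keep the record with the higher score.
  def better(a: Rec, b: Rec): Rec = if (a.score >= b.score) a else b

  def pickPerKey(recs: RDD[Rec]): RDD[((Long, Int), Rec)] =
    recs
      .map(r => ((r.column1, r.column2), r))
      .aggregateByKey(Option.empty[Rec])(
        // seqOp: runs inside each partition, before the shuffle
        (acc, r) => Some(acc.fold(r)(better(_, r))),
        // combOp: merges the per-partition winners after the shuffle
        (a, b) => (a.toList ++ b.toList).reduceOption(better)
      )
      // Unwrap the Option; every key that appears has at least one record.
      .collect { case (key, Some(best)) => (key, best) }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("aggregate-pick-sketch")
      .getOrCreate()

    // Hypothetical sample data spread over 5 partitions, as in the question.
    val recs = spark.sparkContext.parallelize(
      Seq(Rec(1L, 10, 5.0), Rec(1L, 10, 9.0), Rec(2L, 20, 3.0)),
      numSlices = 5
    )

    pickPerKey(recs).collect().foreach(println)
    spark.stop()
  }
}
```

With `groupBy`, every row of a group crosses the shuffle before Criteria runs. Here at most one candidate per key and partition is shuffled. When the accumulator has the same type as the values, `reduceByKey(better)` expresses the same idea more briefly.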

Nikolay Vasiliev
  • Thanks, I replaced the groupBy with aggregateByKey and it seems much better now. But I still see duplicate stages being executed. Do you have any idea why? I searched a lot on Google but can't seem to find the answer. – Thejas B Jun 25 '17 at 16:22
  • I would advise checking the DAG of the execution and the Event timeline. From the image you posted it's hard to say. If you define RDD operations in a sub-function and call it twice, some RDD actions may appear in the timeline twice, once for each set of data. – Nikolay Vasiliev Jun 25 '17 at 22:34
  • Hi Nikolay, I've added a part of DAG graph, since I can't upload more than 2 images. – Thejas B Jun 27 '17 at 05:23
  • Thanks. Looks like you are running Spark 2, since the second-generation Tungsten engine (whole-stage code generation) is new in Spark 2 (https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html). I assume that by "appear twice" you mean jobs 8, 9 and 10. Since I don't see the code, I assume that either there is a loop that generates those stages (so they appear several times) or the Tungsten engine optimizes it for you (and generates code somehow). I don't think I can provide more help here :) Good luck and have fun. – Nikolay Vasiliev Jun 27 '17 at 21:23