
I am using Spark SQL DataFrames to perform a groupBy operation and then compute the mean and median of the data for each group. The original amount of data is about 1 terabyte.

import org.apache.spark.sql.functions._   // count, avg, callUDF, col, lit, asc
import spark.implicits._                  // for the $"column" syntax

val df_result = df.filter($"DayOfWeek" <= 5).groupBy("id").agg(
        count("Error").as("Count"),
        avg("Error").as("MeanError"),
        callUDF("percentile_approx", col("Error"), lit(0.05)).as("5thError"),
        callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"),
        callUDF("percentile_approx", col("Error"), lit(0.95)).as("95thError"))
    .filter($"Count" > 1000)


df_result.orderBy(asc("MeanError")).limit(5000)
    .write.format("csv").option("header", "true").save("/user/foo.bar/result.csv")

When I run that query, my job gets stuck and does not complete. How do I go about debugging the problem? Is there a key imbalance that causes the groupBy() to get stuck?

stackoverflowuser2010

1 Answer


There are lots of sensible suggestions already in the comments, but for what it's worth here are my thoughts:

1) Does `df.count` work? If not, your problem lies before the code you've posted (as suggested in comments).

2) Look in the Spark UI (as suggested in comments): do most tasks complete quickly, with a few taking a long while or appearing stuck? If so, skew is likely to be your problem.

3) You could potentially rewrite your query to first compute only the count per id. Next, filter your original df down to the rows whose id appears more than 1000 times, using a broadcast inner join to avoid shuffling df (this works if there aren't too many ids with more than 1000 occurrences). Then aggregate this smaller dataframe and calculate all your statistics. If the count aggregation works, its output will also show whether there's any significant data skew!
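A rough sketch of that approach (assuming the same columns as in your query; names like counts and dfFrequent are just illustrative):

import org.apache.spark.sql.functions._

// Step 1: a cheap aggregation, just the row count per id. Its output also
// shows whether a handful of ids dominate the data (i.e. skew).
val counts = df.filter($"DayOfWeek" <= 5)
    .groupBy("id")
    .agg(count("Error").as("Count"))
    .filter($"Count" > 1000)

// Step 2: keep only the rows whose id survived the count filter. broadcast()
// avoids shuffling the full df, provided counts is small enough to broadcast.
val dfFrequent = df.filter($"DayOfWeek" <= 5)
    .join(broadcast(counts), Seq("id"))

// Step 3: compute the expensive statistics on the reduced dataframe only.
val stats = dfFrequent.groupBy("id").agg(
    avg("Error").as("MeanError"),
    callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"))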

4) Sometimes breaking the computation up into even smaller steps, writing to disk, and then immediately reading back in has helped me get awkward jobs to complete in the past. It can also make debugging quicker if generating df is costly in the first instance.
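For example, something along these lines (the paths are just placeholders; Parquet is used so the schema is preserved between steps):

// Materialise the filtered data, then restart the lineage from disk.
df.filter($"DayOfWeek" <= 5)
    .write.mode("overwrite").parquet("/user/foo.bar/tmp/filtered")   // placeholder path

// Read the checkpointed data back and run the aggregation as a separate, smaller job.
val dfFiltered = spark.read.parquet("/user/foo.bar/tmp/filtered")
val aggregated = dfFiltered.groupBy("id").agg(count("Error").as("Count"))
aggregated.write.mode("overwrite").parquet("/user/foo.bar/tmp/aggregated")   // placeholder path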

5) Definitely worth upping spark.sql.shuffle.partitions (as suggested in comments); 2001 is a magic number in Spark (see What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?).
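For instance (the exact value is something you will have to tune; it can equally be passed as --conf spark.sql.shuffle.partitions=2001 at submit time):

// Set before running the aggregation. Above 2000 shuffle partitions Spark
// switches to a more compact, compressed map-status format, hence 2001.
spark.conf.set("spark.sql.shuffle.partitions", "2001")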

6) I would also try varying the amount of data: does it work if you only use day of week = 1 (as suggested in comments)?
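e.g. run exactly the same pipeline on a single day's slice first:

// One day's worth of data instead of five weekdays; if this completes reliably,
// data volume (or the skew within the full week) is the likely culprit.
val dfOneDay = df.filter($"DayOfWeek" === 1)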

7) Does the query run without the percentile_approx?

user2682459
  • (1) Yes, `df.count` on the original df works fine. (7) The query doesn't consistently complete if I remove the `percentile_approx` statement. In general, my original query completes 50% of the time, and the other 50% of the time, the sys admin comes yelling at me for holding up the cluster. – stackoverflowuser2010 Apr 09 '18 at 01:03
  • (5) Is there any "official" documentation on partitioning and partitioning best practices? Like documentation from Databricks or Cloudera? I've searched but could not find anything definitive. – stackoverflowuser2010 Apr 09 '18 at 01:07
  • I haven't been able to find any after a quick search. There seems to be vague consensus that if tasks take less than 100 ms then you have too many partitions. – user2682459 Apr 09 '18 at 09:32
  • If your original query completes 50% of the time, this suggests there is something that changes within your cluster? Do you submit with the same options each time? Is dynamic allocation enabled on the cluster? – user2682459 Apr 09 '18 at 09:33
  • I think the fact that jobs complete 50% of the time is due to running the query on different input data. Maybe sometimes there is too much data coming to one node during `groupby`? – stackoverflowuser2010 Apr 10 '18 at 18:37
  • I didn't realise you were changing the data between runs - in which case yes, there's likely something significant different about the data between the runs which succeed and the ones which don't. How many distinct ids are there in your data? Groupby, at least with count and avg only, shouldn't shuffle very much data at all to one node as a significant amount of the work can be done prior to the shuffle (from each partition, only the partition sum and count need to be transmitted for each id) – user2682459 Apr 11 '18 at 19:45
  • What is df here? Is there a lot of code which generates this? Unless df is read in directly from somewhere (e.g. a parquet file) I really think it would be worth trying `df.filter($"DayOfWeek" <= 5).write.....(some_file)`; `val dfFiltered = spark.read....(some_file)`; `val aggregated = dfFiltered.groupBy("id").agg(count($"error"))`; `aggregated.write.....(some_aggregated_file)`. If this doesn't work, then there's no hope of the rest of it working. If it does work, at least we have something to build up from.... – user2682459 Apr 11 '18 at 19:54
  • A co-worker strongly suggested partitioning is the problem, but I don't know how to proceed. Can you add any more information on that? You current answer is useful, but I (and others) would appreciate more information on partitioning if you have anything to share, such as rules-of-thumb or other best practices. – stackoverflowuser2010 Apr 12 '18 at 16:37
  • Well, partitioning might be a problem. If you're trying to process 1TB of data with 200 partitions you might struggle, as that would correspond to an average of 5GB per partition. There isn't much penalty to having lots of partitions, until you have so many that tasks take less than 100ms. You don't want large partitions, which is what tends to happen when you have skewed data. The reason I suggested running just the count query is so we can find out whether your data is massively skewed on one id. If so, there may be workarounds. Please try and run it. – user2682459 Apr 12 '18 at 18:45
  • spark.default.parallelism=2000 and spark.sql.shuffle.partitions=2001 are two conf options I generally use. I tend to process in the order of 100GB though so you could try more. Maybe try 10000 for both? – user2682459 Apr 12 '18 at 18:47
  • So did the count indicate data skew? – user2682459 May 12 '18 at 12:22