I am seeing some performance issues while running queries using dataframes. My research suggests that a long-running final task can be a sign that the data is not distributed optimally, but I have not found a detailed process for resolving this issue.

I start by loading two tables as dataframes and then join those tables on one field. I have tried adding distribute by (repartition) and sort by to improve the performance, but I am still seeing this single long-running final task. Here is a simplified version of my code. Note that queries one and two are not actually this simple and use UDFs to calculate some values.

I have tried a few different settings for spark.sql.shuffle. I tried 100, but it failed (I didn't really debug this too much, to be honest). I tried 300, 4000, and 8000. Performance decreased with each increase. I am selecting a single day of data, where each file is an hour.

val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

val distributeDf1 = df1
    .repartition(df1("userId"))
    .sortWithinPartitions(df1("userId"))

val distributeDf2 = df2
    .repartition(df2("userId"))
    .sortWithinPartitions(df2("userId"))

distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")

val df3 = sqlContext
  .sql("""
    Select 
      df1.* 
    from 
      df1 
    left outer join df2 on 
      df1.userId = df2.userId""")

Since it seems partitioning by userId is not ideal, I could partition by the timestamp instead. If I do this, should I just use Date + Hour? If I have fewer than 200 unique combinations of these, will I have empty executors?

Dan Ciborowski - MSFT

1 Answer

Spark >= 3.0

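Since 3.0, Spark provides built-in optimizations for handling skewed joins as part of adaptive query execution. In released versions they are controlled by the spark.sql.adaptive.enabled and spark.sql.adaptive.skewJoin.enabled properties (the spark.sql.adaptive.optimizeSkewedJoin.enabled name originally given here is not recognized by released versions).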

See SPARK-29544 for details.
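
As a minimal sketch, assuming Spark 3.x and a local session named spark (both the name and the local master are illustrative, not from the question), the adaptive skew-join handling could be switched on like this. The skew-join flag only takes effect when adaptive execution itself is enabled:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; a real job would reuse its own session.
val spark = SparkSession.builder()
  .appName("skew-join-aqe")
  .master("local[*]")
  // Adaptive query execution must be on for the skew-join rule to apply
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  .getOrCreate()

// Both flags are runtime SQL configs, so they can also be set on an existing session
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```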

Spark < 3.0

You clearly have a problem with heavily right-skewed data. Let's take a look at the statistics you've provided:

df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] 
df2 = [mean=1.0, stddev=0.0, count=18408194]

With mean around 5 and standard deviation over 2000 you get a long tail.
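
For reference, figures like these could be obtained roughly as follows. This is a sketch that assumes the statistics describe the number of rows per userId; the toy data and the local session are stand-ins, not the real tables:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, mean, stddev}

val spark = SparkSession.builder().master("local[*]").appName("key-stats").getOrCreate()
import spark.implicits._

// Toy stand-in for Table1: user "a" is far more frequent than the others
val df1 = (Seq.fill(1000)("a") ++ Seq("b", "c", "d")).toDF("userId")

// Rows per key, then summary statistics over those per-key counts
val perKey = df1.groupBy("userId").count()
val stats = perKey.agg(
  mean($"count").as("mean"),
  stddev($"count").as("stddev"),
  count($"count").as("count")  // number of distinct keys
)

stats.show()
```

A standard deviation that dwarfs the mean, as in the question, indicates that a handful of keys account for most of the rows.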

Since some keys are much more frequent than others, after repartitioning some executors will have much more work to do than the remaining ones.

Furthermore, your description suggests that the problem may be caused by a single key, or a few keys, which hash to the same partition.
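
One way to check this is to count rows per partition after repartitioning on the key. The following is a sketch on toy data; the partition count of 8 and the sample values are arbitrary assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{desc, spark_partition_id}

val spark = SparkSession.builder().master("local[*]").appName("partition-sizes").getOrCreate()
import spark.implicits._

// Toy stand-in for a skewed table: one hot key and a few rare ones
val skewed = (Seq.fill(1000)("a") ++ Seq("b", "c", "d")).toDF("userId")

// Hash-partition on the key, then count how many rows each partition received
val partitionSizes = skewed
  .repartition(8, $"userId")
  .groupBy(spark_partition_id().as("partition"))
  .count()
  .orderBy(desc("count"))

partitionSizes.show()
```

All rows for the hot key land in a single partition, so the task that processes that partition runs far longer than the rest, and raising the number of shuffle partitions does not help.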

So, let's first identify outliers (pseudocode):

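import sqlContext.implicits._  // enables the $"col" syntax

// Mean and standard deviation taken from the statistics above
val mean = 4.989209978967438
val sd = 2255.654165352454

val df1 = sqlContext.sql("Select * from Table1")
// Rows per userId; cached because it is reused for both splits
val counts = df1.groupBy("userId").count.cache

// Rows of df1 whose userId is unusually frequent
val frequent = counts
  .where($"count" > mean + 2 * sd)  // Adjust threshold based on actual dist.
  .alias("frequent")
  .join(df1, Seq("userId"))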

and the rest:

val infrequent = counts
  .where($"count" <= mean + 2 * sd)
  .alias("infrequent")
  .join(df1, Seq("userId"))

Is this degree of skew really expected? If not, try to identify the source of the issue upstream.

If it is expected, you can try:

  • broadcasting smaller table:

    import org.apache.spark.sql.functions.broadcast
    val df2 = sqlContext.sql("Select * from Table2")
    df2.join(broadcast(df1), Seq("userId"), "rightouter")
    
  • splitting, unifying (union) and broadcasting only frequent:

    df2.join(broadcast(frequent), Seq("userId"), "rightouter")
      .union(df2.join(infrequent, Seq("userId"), "rightouter"))
    
  • salting userId with some random data

but you shouldn't:

  • repartition all data and sort locally (although sorting locally alone shouldn't be an issue)
  • perform standard hash joins on full data.
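
The salting option mentioned above is not spelled out in this answer, so here is one possible sketch, assuming Spark 2.x or later. The names numSalts, left, and right and the toy data are hypothetical. The skewed side gets a random salt per row, the other side is replicated once per salt value, and the join runs on the pair (userId, salt):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, explode, floor, lit, rand}

val spark = SparkSession.builder().master("local[*]").appName("salted-join").getOrCreate()
import spark.implicits._

val numSalts = 4  // hypothetical value; tune to the severity of the skew

// Toy stand-ins: left is skewed on "a", right has one row per userId
val left = (Seq.fill(100)("a") ++ Seq("b", "c")).toDF("userId")
val right = Seq(("a", 1), ("b", 2)).toDF("userId", "value")

// Skewed side: attach a random salt in [0, numSalts) to every row
val saltedLeft = left.withColumn("salt", floor(rand() * numSalts).cast("int"))

// Other side: replicate every row once per possible salt value
val saltedRight = right.withColumn(
  "salt", explode(array((0 until numSalts).map(lit(_)): _*)))

// Join on (userId, salt): a hot key is now spread over up to numSalts hash buckets
val joined = saltedLeft
  .join(saltedRight, Seq("userId", "salt"), "left_outer")
  .drop("salt")
```

The result matches an unsalted left outer join on userId. The cost is that the replicated side grows by a factor of numSalts, so it may be worth salting only the frequent keys identified earlier.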
user10938362
zero323
  • My ratio of infrequentCount/frequentCount is 2.43. The tables I am joining are many gigabytes (about 24 GB each), so I do not believe that broadcasting is an option. I am going to try splitting and unifying. Can you provide pseudocode for salting? I will clean up all the code and edit/post it here – Dan Ciborowski - MSFT Jul 24 '16 at 04:13
  • @zero323 I would love to see an example of "salting userId with some random data". I am trying to implement something like this but cannot fully visualize it. I understand you have to add a random number to the key but how does one then join that to the other table. Thanks! – John Engelhart Aug 15 '16 at 15:41
  • A possible solution is presented in this Spark Summit talk: https://www.youtube.com/watch?v=6zg7NTw-kTQ – Fokko Driesprong Nov 14 '17 at 09:14
  • @John Engelhart-- Let me know if you get an answer to this.. I was also trying to use the same technique but no luck. – vikrant rana Jun 17 '19 at 14:28
  • @JohnEngelhart Create another column with userId + random number in your data frame and use that column to repartition – Vivek Sethi Jul 31 '19 at 07:29
  • This is obviously too late to ask this question but @user10938362, what does that exactly provide? Is that the way of measuring the stddev and mean for a multikey groupBy? – Lenny D. Feb 05 '20 at 15:20
  • PR related to SPARK-29544 was not merged so the property `spark.sql.adaptive.optimizeSkewedJoin.enabled` won't work. Try using `spark.sql.adaptive.enabled=true` for spark >=3.0.x – Shrey Jakhmola Mar 14 '23 at 09:21