
I have skewed data in a table that is joined with another, smaller table. I understand how salting works for joins: a random number from a fixed range is appended to the keys of the large, skewed table, and each row of the small, non-skewed table is duplicated once per value in that range. Matching still succeeds because every salted key from the skewed table finds a hit among the duplicates in the small table. I have also read that salting is helpful when performing groupBy. My question is: when random numbers are appended to the key, doesn't that break the group? If it does, then the meaning of the group-by operation has changed.

Nikunj Kakadiya
Bishamon Ten

3 Answers


My question is: when random numbers are appended to the key, doesn't that break the group?

Well, it does. To mitigate this you can run the group-by operation twice: first with the salted key, then remove the salt and group again. The second grouping takes partially aggregated data, which significantly reduces the impact of the skew.

E.g.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

val n = 10  // number of salt buckets

df.withColumn("salt", (rand() * n).cast(IntegerType))
  .groupBy("salt", "key")              // first pass: group by salt plus the original key ("key" stands for your grouping columns)
  .agg(sum("value").as("value"))       // partial aggregate within each salt bucket
  .groupBy("key")                      // second pass: original key only
  .agg(sum("value").as("value"))       // combine the partial aggregates
Gelerion
  • If the aggregation functions are count, percentile, and standard deviation, will this produce correct results? I know it will be efficient this way for sum, but I'm not sure count, percentile, and standard deviation will give correct results. – Shubham Gupta May 17 '21 at 15:48
  • What if we try to create a new column using substring on the skewed column? – Sarang Oct 12 '21 at 04:26
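Regarding the first comment: count does combine across sub-groups, but the second pass must sum the partial counts rather than count again. Exact percentiles do not decompose this way at all, and standard deviation can only be recombined if you carry count, sum, and sum of squares through the first pass. A minimal sketch for count, assuming a DataFrame df with an illustrative grouping column key:

import org.apache.spark.sql.functions._

val n = 10  // number of salt buckets (illustrative)
df.withColumn("salt", (rand() * n).cast("int"))
  .groupBy("salt", "key")
  .agg(count(lit(1)).as("partial_cnt"))  // count within each salted sub-group
  .groupBy("key")
  .agg(sum("partial_cnt").as("cnt"))     // recombine with sum, not count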

"My question is when random numbers are appended to the key doesn't it break the group? If if does then the meaning of group by operation has changed."

Yes, adding a salt to an existing key will break the group. However, as @Gelerion has mentioned in his answer, you can first group by the salted key together with the original key, and afterwards group by the original key alone. This works well for aggregations such as

  • count
  • min
  • max
  • sum

where it is possible to combine results from the sub-groups. The illustration below shows an example of calculating the maximum value of a skewed DataFrame.

[Illustration: computing the maximum of a skewed DataFrame in two stages, over the salted key and then the original key]
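In code, the illustrated computation might look like the following sketch (df, key, and value are illustrative placeholders):

import org.apache.spark.sql.functions._

df.withColumn("salt", (rand() * 10).cast("int"))
  .groupBy("salt", "key")
  .agg(max("value").as("partial_max"))    // stage 1: max within each salted sub-group
  .groupBy("key")
  .agg(max("partial_max").as("value"))    // stage 2: max of the sub-group maxima is the overall max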

Michael Heil
import spark.implicits._

// Skewed fact table: the key 1 appears in 8 of the 9 rows
val df1 = Seq((1,"a"),(2,"b"),(1,"c"),(1,"x"),(1,"y"),(1,"g"),(1,"k"),(1,"u"),(1,"n")).toDF("ID","NAME")
df1.createOrReplaceTempView("fact")

// Small dimension table with no skew
val df2 = Seq((1,10),(2,30),(3,40)).toDF("ID","SALARY")
df2.createOrReplaceTempView("dim")

// Append a random salt in [0, 19] to every fact key
val salted_df1 = spark.sql("""select concat(ID, '_', FLOOR(RAND(123456)*20)) as salted_key, NAME from fact""")
salted_df1.createOrReplaceTempView("salted_fact")

// Duplicate each dim row once per salt value so every salted fact key can find a match
val exploded_dim_df = spark.sql("""select ID, SALARY, explode(array(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19)) as salted_key from dim""")
// equivalent on Spark 2.4+: explode(sequence(0, 19))
exploded_dim_df.createOrReplaceTempView("salted_dim")

// Join on the salted key, then strip the salt to recover the original ID
val result_df = spark.sql("""select split(fact.salted_key, '_')[0] as ID, dim.SALARY
            from salted_fact fact
            LEFT JOIN salted_dim dim
            ON fact.salted_key = concat(dim.ID, '_', dim.salted_key)""")
result_df.show()  // display(result_df) in a Databricks notebook
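As a quick sanity check (a sketch, assuming the temp views above are registered), the salted join should return exactly the same rows as a plain join on ID, since every salted fact key matches exactly one of the duplicated dim rows:

val plain_df = spark.sql("""select f.ID, d.SALARY from fact f LEFT JOIN dim d ON f.ID = d.ID""")
println(result_df.count == plain_df.count)  // expect true: all 9 fact rows survive either join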
Jacek Laskowski
Shiva