I am running through the exercise in Databricks, and the code below returns firstName in a different order every time I run it. Please explain why the order is not the same for every run:

val peopleDF = spark.read.parquet("/mnt/training/dataframes/people-10m.parquet")

id:integer
firstName:string
middleName:string
lastName:string
gender:string
birthDate:timestamp
ssn:string
salary:integer

/* Create a DataFrame called top10FemaleFirstNamesDF that contains the 10 most common female first names out of the people data set.*/
import org.apache.spark.sql.functions.count

val top10FemaleFirstNamesDF_1 = peopleDF
  .filter($"gender" === "F")
  .groupBy($"firstName")
  .agg(count($"firstName").alias("cnt_firstName"))
  .withColumn("cnt_firstName", $"cnt_firstName".cast("Int"))
  .sort($"cnt_firstName".desc)
  .limit(10)

val top10FemaleNamesDF = top10FemaleFirstNamesDF_1.orderBy($"firstName")

On some runs the assertions pass, and on other runs they fail:

lazy val results = top10FemaleNamesDF.collect()

dbTest("DF-L2-names-0", Row("Alesha",    1368), results(0))  
// dbTest("DF-L2-names-1", Row("Alice",     1384), results(1))
// dbTest("DF-L2-names-2", Row("Bridgette", 1373), results(2))
// dbTest("DF-L2-names-3", Row("Cristen",   1375), results(3))
// dbTest("DF-L2-names-4", Row("Jacquelyn", 1381), results(4))
// dbTest("DF-L2-names-5", Row("Katherin",  1373), results(5))
// dbTest("DF-L2-names-5", Row("Lashell",   1387), results(6))
// dbTest("DF-L2-names-7", Row("Louie",     1382), results(7))
// dbTest("DF-L2-names-8", Row("Lucille",   1384), results(8))
// dbTest("DF-L2-names-9", Row("Sharyn",    1394), results(9)) 

println("Tests passed!")

1 Answer

The problem might be the limit(10). Due to the distributed nature of Spark, you can't assume that every run of the limit function will give you the same result: Spark might read different partitions on different runs to come up with 10 elements. If the underlying data is split across multiple partitions, then every time you evaluate it, limit might pull rows from a different partition.

However, I do see that you are sorting the data first and then limiting on it. The limit function is supposed to return a deterministic result when the underlying data is sorted; it may be non-deterministic for unsorted data.
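
A minimal sketch of that non-determinism (the data set and partition count here are made up for illustration; this does not reproduce the exact exercise):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").appName("limit-demo").getOrCreate()

// Unsorted data spread across several partitions.
val df = spark.range(0L, 1000000L).toDF("id").repartition(8)

// With no preceding sort, Spark is free to satisfy limit(10) from whichever
// partitions it scans first, so these two collects are not guaranteed to agree.
println(df.limit(10).collect().mkString(", "))
println(df.limit(10).collect().mkString(", "))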

It would be worthwhile to look at the physical plan of your query.
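
For reference, the physical plan can be printed with explain; explain(true) also shows the parsed, analyzed, and optimized logical plans:

// Print the physical plan for the query in question.
top10FemaleNamesDF.explain()

// Print the extended plans as well.
top10FemaleNamesDF.explain(true)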

Avishek Bhattacharya
  • The physical plan is:
    == Physical Plan ==
    Sort [firstName#248 ASC NULLS FIRST], true, 0
    +- TakeOrderedAndProject(limit=10, orderBy=[cnt_firstName#332 DESC NULLS LAST], output=[firstName#248,cnt_firstName#332])
       +- *(2) HashAggregate(keys=[firstName#248], functions=[finalmerge_count(merge count#337L) AS count(firstName#248)#331L])
    – Aavik Sep 12 '19 at 07:08
  • How should I read the TakeOrderedAndProject line: does it do the limit first and then the orderBy? – Aavik Sep 12 '19 at 07:09
  • TakeOrderedAndProject is the same as a Sort followed by a limit. See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L149. I couldn't see anything wrong in the physical plan. You might want to create an issue in the Spark Jira. If you could create a small test case that shows this issue deterministically, that would be fantastic; they would ask for that. – Avishek Bhattacharya Sep 25 '19 at 16:18
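
A quick way to see that node for yourself (a toy DataFrame for illustration, not the people data set):

import spark.implicits._

// Toy example: sort descending, then take the top 10.
val demo = spark.range(0L, 100L).toDF("n")
demo.sort($"n".desc).limit(10).explain()
// The printed physical plan should contain a TakeOrderedAndProject node,
// i.e. the sort and the limit are executed together as a single top-N step.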