
I have recently started using PySpark, and I have encountered some behavior that I am trying to better understand and avoid.

Consider the following code:

query1 = "SELECT * FROM A where X >= 1000000 and X < 1001000 LIMIT 50"
s1 = spark.sql(query1)
X_vals = s1.select('X').rdd.flatMap(lambda x: x).collect()

query2 = "SELECT * FROM B" + " where Y in " + '('  + ','.join([str(x) for x in X_vals]) + ')'
s2 = spark.sql(query2)

s1.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/A.csv')
s2.write.mode('overwrite').option("header", True).option("sep",'\t').csv('test/B.csv')

From A, I obtain a sample of 50 records from a range, and store the values of X in X_vals. I then take the same records (where Y in X_vals) from table B.

Later, I write both tables to csv files. In the resulting csv files, the X's in A do not match the Y's in B anymore.

I think this is explainable behavior caused by lazy evaluation: the records selected by the collect() statement are not the same records as the ones written by the .csv statement. However, my understanding of Spark is not yet good enough to explain exactly why this happens.
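As far as I understand it, collect() and the csv write are two separate actions, and each action re-executes the whole plan from scratch. Here is a sketch of how I read my own code (same spark session and table A as above; I may be wrong about the details):

s1 = spark.sql("SELECT * FROM A where X >= 1000000 and X < 1001000 LIMIT 50")  # lazy: nothing runs yet

# collect() is an action: it triggers one full execution of the plan above,
# including the non-deterministic LIMIT 50.
X_vals = s1.select('X').rdd.flatMap(lambda x: x).collect()

# csv() is another action: it triggers a second, independent execution,
# which may pick a different set of 50 rows.
s1.write.mode('overwrite').option("header", True).option("sep", '\t').csv('test/A.csv')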

So: why does this happen, and is there a way to force the query to return the same results twice (without joining the tables)?

Thanks,

Florian

  • How about using `order by` to force query1 to a specific order? – Bala Jan 22 '18 at 14:28
  • @Bala, I guess `ORDER BY` could work indeed, although I am not sure. My intuitive explanation for this behavior is that the above is stochastic because it depends on which executor is finished first. In the case of `order by`, would it first collect 20 records and then order, or first order all the records and then collect? – Florian Jan 22 '18 at 14:39

1 Answer


The problem is the implementation of LIMIT. It is implemented by shuffling records to a single partition (you can find a detailed explanation in the excellent answer to Towards limiting the big RDD).

At the same time, Spark follows SQL standard rules: if there is no explicit order, the optimizer is free to choose arbitrary records.

val df = spark.range(1000)

df.where($"id".between(100, 200)).limit(10).explain
== Physical Plan ==
CollectLimit 10
+- *LocalLimit 10
   +- *Filter ((id#16L >= 100) && (id#16L <= 200))
      +- *Range (0, 1000, step=1, splits=4)

To get a deterministic order (somewhat; AFAIK ties are still resolved nondeterministically), use an orderBy clause, which converts CollectLimit into TakeOrderedAndProject:

df.where($"id".between(100, 200)).orderBy("id").limit(10).explain
== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[id#16L ASC NULLS FIRST], output=[id#16L])
+- *Filter ((id#16L >= 100) && (id#16L <= 200))
   +- *Range (0, 1000, step=1, splits=4)
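
Applied to the PySpark code from the question, a rough sketch would look like the following (table and column names are taken from the question; whether X is an appropriate sort key is an assumption):

query1 = "SELECT * FROM A where X >= 1000000 and X < 1001000 ORDER BY X LIMIT 50"
s1 = spark.sql(query1)

# With the explicit ORDER BY, the limited result is deterministic (up to ties on X),
# so both actions below should see the same 50 rows.
X_vals = s1.select('X').rdd.flatMap(lambda x: x).collect()
s1.write.mode('overwrite').option("header", True).option("sep", '\t').csv('test/A.csv')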
philantrovert
Alper t. Turker