I want to take advantage of the fact that my dataframes are already sorted by the key used for the join.

df1.join(df2, df1.sorted_key == df2.sorted_key)

Both dataframes are large, so a broadcast hash join (BHJ) or shuffled hash join (SHJ) is not an option (SHJ crashes instead of spilling).

How can I hint to Spark that the join column is already sorted? I read on SO that Hive + bucketing + pre-sorting helps. However, I can't see where the dataframe stores its sort status.

df = session.createDataFrame([
    ('Alice', 1),
    ('Bob', 2)
])
df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

df = df.sort('_1')
df.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

^ Even when I manually sort on the column _1, the dataframe doesn't seem to remember it's sorted by _1.

Also,

  • How does Spark know the sorted status?

  • Does a parquet dataset (without hive metadata) remember which columns are sorted? Does Spark recognize it?

  • How does Hive + bucket + pre-sort help skip sort?

  • Can I use Hive + pre-sort without bucketing to skip sort?

  • I saw in a Databricks talk that Spark bucketing has many limitations and is different from Hive bucketing. Is Hive bucketing preferred?

  • The optimization talk by Databricks says never to use bucketing because it is too hard to maintain in practice. Is that true?

  • Welcome to Stackoverflow! You are posing quite a bit of really good questions, great :) One thing you might not know about how we do things here is that we try to limit question posts to 1 question per post. You can read why [here](https://meta.stackexchange.com/questions/222735/can-i-ask-only-one-question-per-post) and [here](https://meta.stackexchange.com/questions/39223/one-post-with-multiple-questions-or-multiple-posts). Don't hesitate to edit your question to keep it focused on one question only! If you have more questions, just ask them as separate questions :) – Koedlt Feb 19 '23 at 06:27

1 Answer

A sort-merge join skips the shuffle when both dataframes already use the same partitioner on the join key. I could not find a documented explanation of the partitioner concept, but these cases should give both sides the same partitioner:

  1. bucketed table
  2. Delta Lake + Z-order on 1 column (worth verifying with explain(), since Z-order is primarily a data-skipping layout)
  3. manually call df.repartition('col_name') on both dataframes before join
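Cases 1 and 3 can be sketched in PySpark roughly as follows. This is a minimal sketch, not a tuned recipe: the helper names `write_bucketed` and `repartition_both`, the table names, and the bucket and partition counts are all hypothetical. It assumes both sides are written with the same bucket count, which is the straightforward case for a shuffle-free join. Note that `repartition` is itself a shuffle, so case 3 moves the shuffle ahead of the join rather than removing it.

```python
def write_bucketed(df, table_name, key, num_buckets):
    """Save df as a table bucketed and sorted by `key` (case 1).

    bucketBy/sortBy only work together with saveAsTable, so the
    bucketing metadata is recorded in the session catalog.
    """
    (df.write
       .bucketBy(num_buckets, key)   # hash rows into num_buckets buckets on key
       .sortBy(key)                  # sort rows inside each bucket file
       .mode("overwrite")
       .saveAsTable(table_name))


def repartition_both(df1, df2, key, num_partitions):
    """Hash-partition both sides on `key` with the same partition count (case 3)."""
    return (df1.repartition(num_partitions, key),
            df2.repartition(num_partitions, key))
```

Hypothetical usage, with `df1`, `df2` and `spark` taken from your own session: call `write_bucketed(df1, "t1", "sorted_key", 64)` and `write_bucketed(df2, "t2", "sorted_key", 64)`, then join `spark.table("t1")` with `spark.table("t2")` on `sorted_key`.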

Even with the same partitioner, it is unclear whether Spark treats the rows as already sorted within each partition, so a Sort step may still appear before the join.
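One way to check both points on your own data is to read the physical plan instead of the schema: `printSchema()` only shows column types, while `explain()` shows whether `Exchange` (shuffle) and `Sort` operators remain under the `SortMergeJoin`. The sketch below is an assumption-laden helper, not an official API: `physical_plan` and `count_nodes` are hypothetical names, and the counting relies on the plain-text plan layout, which can differ between Spark versions.

```python
import contextlib
import io
import re


def physical_plan(df):
    """Return the text that df.explain() prints (PySpark prints it rather than returning it)."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def count_nodes(plan_text, node_name):
    """Count plan lines whose operator is exactly node_name.

    The prefix class skips the tree-drawing characters and the
    whole-stage-codegen marker such as '*(1)' at the start of a line.
    The trailing word boundary keeps 'Sort' from matching 'SortMergeJoin'.
    """
    pattern = re.compile(
        r"^[ \t:+\-*()\d]*" + re.escape(node_name) + r"\b",
        re.MULTILINE,
    )
    return len(pattern.findall(plan_text))
```

For example, with `joined = df1.join(df2, df1.sorted_key == df2.sorted_key)`, compare `count_nodes(physical_plan(joined), "Exchange")` and `count_nodes(physical_plan(joined), "Sort")` before and after bucketing or repartitioning. If a count drops to zero, Spark has skipped that step for this query.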