
I have a DataFrame (df) that I want to use in a different SparkSession, in the same SparkContext.

I am creating a new dataframe with

val newDf = spark.createDataFrame(df.rdd, df.schema)

What I want to understand is the performance difference of a simple operation like df.count()

The original DF returns almost immediately.

The newly created DF takes much longer.

The original DF plan shows the reads against original Parquet files ("Scan Parquet" operator)

The new DF plan shows "Scan ExistingRDD", which makes sense; but I don't understand why it should be so different since the existing RDD is ultimately coming from the same Parquet.
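A minimal sketch of the comparison (assuming a SparkSession named spark and a placeholder Parquet path):

val df = spark.read.parquet("/path/to/data")          // backed by Parquet files
val newDf = spark.createDataFrame(df.rdd, df.schema)  // backed by an existing RDD

df.count()       // returns almost immediately
newDf.count()    // takes much longer

df.explain()     // physical plan shows the "Scan Parquet" operator
newDf.explain()  // physical plan shows "Scan ExistingRDD"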

What am I missing?

wrschneider

1 Answer


To the DataFrame API, the RDD is just an abstract set of rows. Spark can quickly calculate the row count of a Parquet-backed DataFrame from the Parquet metadata; in this case, however, the DataFrame API cannot know whether, for example, a filter was applied to the rows stored in the Parquet files, so it has to iterate over the rows and serialise them into the DataFrame API's row format instead of just reading the Parquet metadata.

// A DataFrame instance with an associated SQL execution plan.
val df = spark.read.parquet("...")

val abstractRdd = df
  .rdd                  // An RDD with no SQL execution plan.
  .filter(row => false) // A filter that is not part of the DataFrame API's SQL execution plan.

val schema = df.schema

val df2 =
  spark.createDataFrame(
    // There is no execution plan available for the SQL engine to analyse and optimise,
    // for example, by extracting the row count from Parquet metadata.
    abstractRdd,
    schema
  )
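Continuing the sketch above (the RDD-level filter is just an illustrative example): the two counts can legitimately differ, which is exactly why the SQL engine cannot answer df2.count() from the Parquet metadata.

df.count()   // answered from Parquet metadata; "Scan Parquet" in the plan
df2.count()  // iterates over abstractRdd and converts every row; "Scan ExistingRDD" in the plan
             // here it even returns 0, because the RDD-level filter dropped every row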

Please see the "Fast Parquet row count in Spark" and "Difference between DataFrame, Dataset, and RDD in Spark" discussions for more information.

Leonid Vasilev