From the Spark source code:

  /**
   * Represents the content of the Dataset as an `RDD` of `T`.
   *
   * @group basic
   * @since 1.6.0
   */
  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    rddQueryExecution.toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972

The mapPartitions can take as long as computing the RDD in the first place, so this makes operations such as

df.rdd.getNumPartitions

very expensive. Given that a DataFrame is a Dataset[Row] and a Dataset is composed of RDDs, why is a re-mapping required? Any insights appreciated.

WestCoastProjects

1 Answer

TL;DR That's because the internal RDD is not RDD[Row].

Given that a DataFrame is a Dataset[Row] and a Dataset is composed of RDDs

That's a huge oversimplification. First of all, Dataset[T] doesn't mean that you interact with a container of T. It means that if you use the collection-like API (often referred to as strongly typed), the internal representation will be decoded into T.

The internal representation is a binary format used internally by Tungsten. This representation is internal, subject to change, and far too low-level to be used in practice.

An intermediate representation that exposes this data is InternalRow: rddQueryExecution.toRdd is in fact an RDD[InternalRow]. This representation (there are different implementations) still exposes the internal types, is considered "weakly" private, like all objects in o.a.s.sql.catalyst (access is not explicitly restricted, but the API is not documented), and is rather tricky to interact with.

This is where decoding comes into play, and why you need the full "re-mapping": to convert internal, often unsafe, objects into external types intended for public use.
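To make the internal/external distinction concrete, here is a minimal sketch, assuming a local SparkSession and using the public df.queryExecution.toRdd as a stand-in for the private rddQueryExecution.toRdd from the snippet. The object name, the sample data and the column names are made up for illustration. It only compares the runtime class of a string column on both sides of the decoding step:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical example object; not part of Spark.
object InternalVsExternalRows {
  /** Returns the runtime class names of column 0: (internal side, external side). */
  def run(): (String, String) = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("internal-vs-external-rows")
      .getOrCreate()
    try {
      import spark.implicits._
      val df = Seq(("a", 1), ("b", 2)).toDF("name", "n")

      // Before decoding: rows are InternalRow and hold Catalyst-internal types.
      val internal: RDD[InternalRow] = df.queryExecution.toRdd
      // After decoding: rows are the public Row type and hold ordinary JVM types.
      val external: RDD[Row] = df.rdd

      // Read the values inside the task, since internal rows may be reused buffers.
      val internalClass = internal.map(_.getUTF8String(0).getClass.getName).first()
      val externalClass = external.map(_.getString(0).getClass.getName).first()
      (internalClass, externalClass)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = {
    val (internalClass, externalClass) = run()
    println(s"internal: $internalClass, external: $externalClass")
  }
}
```

On the internal side the string should come back as Spark's own UTF8String, while df.rdd hands out a plain java.lang.String. Closing that gap is the decoding work the re-mapping exists to do.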

Finally, to reiterate my previous statement - the code in question won't be executed when getNumPartitions is called.
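One way to check the laziness claim for yourself is sketched below, assuming a local SparkSession. Spark's own decoding lambda cannot be instrumented from outside, so the sketch appends a second mapPartitions of the same shape that counts rows through an accumulator. The object name and the accumulator name are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; not part of Spark.
object RddLazinessSketch {
  /** Returns (partition count, rows seen before any action, rows seen after count()). */
  def run(): (Int, Long, Long) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("rdd-laziness-sketch")
      .getOrCreate()
    try {
      val df = spark.range(0L, 1000L).toDF("id")
      val seen = spark.sparkContext.longAccumulator("rowsSeen")

      // Stand-in for the decoding lambda: same shape, but it counts each row it touches.
      val mapped = df.rdd.mapPartitions { rows =>
        rows.map { row => seen.add(1L); row }
      }

      val numPartitions = mapped.getNumPartitions // reads partition metadata only
      val before = seen.sum                       // no task has run the lambda yet
      mapped.count()                              // an action: now the lambda runs
      val after = seen.sum
      (numPartitions, before, after)
    } finally spark.stop()
  }

  def main(args: Array[String]): Unit = {
    val (n, before, after) = run()
    println(s"partitions=$n, rows seen before action=$before, after count()=$after")
  }
}
```

The accumulator stays at zero until count() runs, which is the sense in which the lambda is not executed by getNumPartitions. The sketch does not measure whatever work Spark does to plan the query and build the RDD when df.rdd is first touched, so it cannot rule out other sources of slowdown.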

user10938362
  • The above is interesting analysis of the unsafe internal representation so I upvoted for _that_.. You'll need to remove the statement about "it won't be executed" to get awarded because I have repeatedly run our workflows with and without the count and the latter more or less doubles the time. – WestCoastProjects Feb 03 '19 at 20:22
  • @javadba I suppose the answer referred to the lambda function, which does not execute on `getNumPartitions` – shay__ Jun 22 '20 at 12:47