3

I'm trying to fetch the number of partitions of a dataframe using this:

df.rdd.getNumPartitions.toString

But when I monitor the Spark log, I see that it spins up many stages and is a costly operation to have.

As per my understanding, a DataFrame adds a structural layer on top of an RDD via metadata. So how come stripping that away while converting to an RDD takes this much time?

Pritam Pathak
  • Does this answer your question? [Why does the Spark DataFrame conversion to RDD require a full re-mapping?](https://stackoverflow.com/questions/54269477/why-does-the-spark-dataframe-conversion-to-rdd-require-a-full-re-mapping). Also [How to check the number of partitions of a Spark DataFrame without incurring the cost of .rdd](https://stackoverflow.com/q/54268845/10938362) – user10938362 May 20 '20 at 15:39

1 Answer

7

A DataFrame is an optimized distributed tabular collection. Since it keeps a tabular format (similar to a SQL table), it can maintain metadata that allows Spark to perform some optimizations under the hood.

These optimizations are performed by components such as Catalyst and Tungsten.

An RDD does not maintain any schema; you have to provide one yourself if needed. So an RDD is not as highly optimized as a DataFrame (Catalyst is not involved at all).
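As a minimal sketch of the difference (assuming a SparkSession named `spark` is already available):

    // The DataFrame knows its schema, which Catalyst can use to optimize the plan.
    val df = spark.range(10).toDF("id")
    df.printSchema()               // root |-- id: long (nullable = false)

    // The RDD obtained from it is just a distributed collection of generic Row
    // objects; no schema is attached and Catalyst is no longer involved.
    val rdd = df.rdd
    rdd.take(3).foreach(println)   // [0], [1], [2]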

Converting a DataFrame to an RDD forces Spark to loop over all the elements, converting them from the highly optimized Catalyst internal representation to plain Scala objects.

Check the code behind .rdd:

  lazy val rdd: RDD[T] = {
    val objectType = exprEnc.deserializer.dataType
    rddQueryExecution.toRdd.mapPartitions { rows =>
      rows.map(_.get(0, objectType).asInstanceOf[T])
    }
  }

  @transient private lazy val rddQueryExecution: QueryExecution = {
    val deserialized = CatalystSerde.deserialize[T](logicalPlan)
    sparkSession.sessionState.executePlan(deserialized)
  }

So first it executes the plan and retrieves the output as an RDD[InternalRow], which, as the name implies, is only for internal use and needs to be converted to RDD[Row].

Then it loops over all the rows, converting them. As you can see, it's not just removing the schema.
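If all you need is the partition count, one way to sidestep that conversion (a sketch in the spirit of the question linked in the comments above; `df` stands for your DataFrame) is to ask the internal RDD directly:

    // queryExecution.toRdd exposes the underlying RDD[InternalRow] without the
    // row-by-row deserialization that df.rdd adds, so reading its partition
    // count should be much lighter.
    val numPartitions = df.queryExecution.toRdd.getNumPartitions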

Hope that answers your question.

SCouto
  • Aah okay. So you mean to say this time is proportional to the size of the DataFrame? Also, is there a way to avoid this cost if I still want to know the number of partitions of the DataFrame? – Pritam Pathak May 20 '20 at 13:59
  • You can try to cache the DataFrame first, so you have it computed before turning it into an RDD. I think that would help at least a little. – SCouto May 20 '20 at 14:03