
My question is whether there is any approach to updating the schema of a DataFrame without explicitly calling SparkSession.createDataFrame(dataframe.rdd, newSchema).

Details are as follows.

I have an original Spark DataFrame with schema below:

root
 |-- column11: string (nullable = true)
 |-- column12: string (nullable = true)
 |-- column13: string (nullable = true)
 |-- column14: string (nullable = true)
 |-- column15: string (nullable = true)
 |-- column16: string (nullable = true)
 |-- column17: string (nullable = true)
 |-- column18: string (nullable = true)
 |-- column19: string (nullable = true)

I applied Dataset.mapPartitions to the original DataFrame and got a new DataFrame (the one returned by Dataset.mapPartitions). The reason for using Dataset.mapPartitions rather than Dataset.map is better transformation speed.

In this new DataFrame, every row should have a schema like the one below:

root
 |-- column21: string (nullable = true)
 |-- column22: long (nullable = true)
 |-- column23: string (nullable = true)
 |-- column24: long (nullable = true)
 |-- column25: struct (nullable = true)
 |    |-- column251: string (nullable = true)
 |    |-- column252: string (nullable = true)
 |    |-- column253: string (nullable = true)
 |    |-- column254: string (nullable = true)
 |    |-- column255: string (nullable = true)
 |    |-- column256: string (nullable = true)

So the schema of the new DataFrame should be the same as the above.
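For reference, that tree written out as a StructType (the newSchema I use below; the types are my reading of the tree above) looks roughly like this:

import org.apache.spark.sql.types._

// StructType matching the printSchema tree above (all fields nullable by default)
val newSchema = StructType(Seq(
  StructField("column21", StringType),
  StructField("column22", LongType),
  StructField("column23", StringType),
  StructField("column24", LongType),
  StructField("column25", StructType(Seq(
    StructField("column251", StringType),
    StructField("column252", StringType),
    StructField("column253", StringType),
    StructField("column254", StringType),
    StructField("column255", StringType),
    StructField("column256", StringType)
  )))
))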

However, the schema of the new DataFrame is not updated automatically. Calling Dataset.printSchema on the new DataFrame still shows the original schema:

root
 |-- column11: string (nullable = true)
 |-- column12: string (nullable = true)
 |-- column13: string (nullable = true)
 |-- column14: string (nullable = true)
 |-- column15: string (nullable = true)
 |-- column16: string (nullable = true)
 |-- column17: string (nullable = true)
 |-- column18: string (nullable = true)
 |-- column19: string (nullable = true)

So, in order to get the correct (updated) schema, what I'm doing is calling SparkSession.createDataFrame(newDataFrame.rdd, newSchema). My concern is that falling back to an RDD (newDataFrame.rdd) will hurt the transformation speed, because Spark Catalyst doesn't optimize RDDs as well as Datasets/DataFrames.
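In code, that workaround looks roughly like this (simplified; newSchema is the StructType sketched above):

// Rebuild the DataFrame from the underlying RDD[Row] so that it carries newSchema
val fixedDF = spark.createDataFrame(newDataFrame.rdd, newSchema)
fixedDF.printSchema()  // now prints the new (nested) schema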

My question is whether there is any approach to updating the schema of the new DataFrame without explicitly calling SparkSession.createDataFrame(newDataFrame.rdd, newSchema).

Thanks a lot.

microwish
    You seem to have some code which "doesn't work". Unless you share your code (a.k.a [mcve] or [reproducible example](https://stackoverflow.com/q/48427185)) it is almost impossible to diagnose and fix it. – zero323 Jul 19 '18 at 22:20

1 Answer


You can use RowEncoder to define the schema for newDataFrame and pass it explicitly to mapPartitions. See the following example.

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import spark.implicits._

val originalDF = spark.sparkContext.parallelize(List(("Tonny", "city1"), ("Rogger", "city2"), ("Michal", "city3"))).toDF("name", "city")
val r = scala.util.Random

// Encoder describing the schema of the DataFrame returned by mapPartitions
val encoderForNewDF = RowEncoder(StructType(Array(
  StructField("name", StringType),
  StructField("num", IntegerType),
  StructField("city", StringType)
)))

// Pass the encoder explicitly so the resulting Dataset carries the new schema
val newDF = originalDF.mapPartitions { partition =>
  partition.map { row =>
    val name = row.getAs[String]("name")
    val city = row.getAs[String]("city")
    val num = r.nextInt()
    Row.fromSeq(Array[Any](name, num, city))
  }
} (encoderForNewDF)

newDF.printSchema()

root
 |-- name: string (nullable = true)
 |-- num: integer (nullable = true)
 |-- city: string (nullable = true)

More on RowEncoder in Spark SQL: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-RowEncoder.html
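Note: RowEncoder lives in the internal org.apache.spark.sql.catalyst.encoders package, and its apply(schema) factory has changed across releases. If I remember correctly, Spark 3.x also exposes a public alternative, Encoders.row, which builds the same Encoder[Row] from a StructType (treat this as an assumption and check your Spark version):

import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.types._

// Public-API equivalent of RowEncoder(schema), assuming Encoders.row is available in your Spark version
val encoderForNewDF = Encoders.row(StructType(Array(
  StructField("name", StringType),
  StructField("num", IntegerType),
  StructField("city", StringType)
)))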

Nirav Kuntar