
I have a Spark DataFrame df. I need to update its column names based on the key/value pairs of a Map.

df.show()

    +----+----+----+
    |col1|col2|col3|
    +----+----+----+
    |   2| Ive|1989|
    | Tom|null|1981|
    |   4|John|1991|
    +----+----+----+

val mapValue = Map("col1" -> "id", "col2" -> "name", "col3" -> "year")

I am not sure how to proceed. Any help is appreciated.

Expected output:

    +---+----+----+
    | id|name|year|
    +---+----+----+
    |  2| Ive|1989|
    |Tom|null|1981|
    |  4|John|1991|
    +---+----+----+

3 Answers


Given:

case class ColData(col1: String, col2: String, col3: Int)

defined at a top level:

    val sourceSeq = Seq(
      ColData("2", "Ive", 1989),
      ColData("Tom", null, 1981),
      ColData("4", "John", 1991),
    )

    import org.apache.spark.sql.{DataFrame, Dataset}
    import org.apache.spark.sql.functions.col
    import sparkSession.implicits._

    def mapFields[T](ds: Dataset[T], fieldNameMap: Map[String, String]): DataFrame = {
      // make sure the fields are present - note this is not a free operation
      val fieldNames = ds.schema.fieldNames.toSet
      val newNames = fieldNameMap.filterKeys(fieldNames).map {
        case (oldFieldName, newFieldName) => col(oldFieldName).as(newFieldName)
      }.toSeq

      ds.select(newNames: _*)
    }

    val newNames = mapFields(sourceSeq.toDS(), Map("col1" -> "id", "col2" -> "name", "col3" -> "year", "not a field" -> "field"))

    newNames.show()

yielding:

+---+----+----+
| id|name|year|
+---+----+----+
|  2| Ive|1989|
|Tom|null|1981|
|  4|John|1991|
+---+----+----+

Note:

The fieldNames check uses ds.schema, which can be very expensive, so prefer known field names over calling .schema. Chaining withColumn or withColumnRenamed over many fields can severely impact performance, because not all of the intermediate projections are removed from the generated code; keep the number of projections low where possible (compare the two sketches below).
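
To make the projection point concrete, here is a minimal sketch contrasting the two plans (renameChained and renameInOneSelect are illustrative names, not part of any API):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // One plan node per rename: the projection chain grows with every call.
    def renameChained(df: DataFrame, renames: Map[String, String]): DataFrame =
      renames.foldLeft(df) { case (acc, (from, to)) => acc.withColumnRenamed(from, to) }

    // A single projection: one select renames every mapped column at once.
    def renameInOneSelect(df: DataFrame, renames: Map[String, String]): DataFrame =
      df.select(df.columns.map(c => col(c).as(renames.getOrElse(c, c))): _*)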

Chris

  • [evidence for withColumn woes](https://lansalo.wordpress.com/2018/05/13/spark-how-to-add-multiple-columns-in-dataframes-and-how-not-to/) – Chris Aug 14 '23 at 15:06

You can use withColumnRenamed to rename a column.

So, folding the map over the DataFrame:

    val mapValue = Map("col1" -> "id", "col2" -> "name", "col3" -> "year")
    val renamed = mapValue.foldLeft(df) { case (acc, (k, v)) => acc.withColumnRenamed(k, v) }

For each key/value pair in the map, this renames the column named key to the new name value.
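
Note that withColumnRenamed is a no-op when the source column is absent from the schema, so stray keys in the map are silently ignored. For instance, with the question's df:

    // Renaming a column that is not in the schema leaves the DataFrame unchanged.
    df.withColumnRenamed("not_a_column", "whatever").columns
    // -> Array(col1, col2, col3)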

maxime G

Another way to solve this:

import org.apache.spark.sql.functions.col

val mapValue = Map("col1" -> "id", "col2" -> "name", "col3" -> "year")

val colsAll = df.columns
val dfTransform = df.select(colsAll.map(c => col(c).as(mapValue.getOrElse(c, c))): _*)

select is another helpful way to rename columns: here you select all columns, and getOrElse swaps in the new name when the column appears in the map, keeping the original name otherwise.
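
As a side note, because this builds a new name for every column (falling back to the old name via getOrElse), the positional toDF gives the same single-projection rename; a small sketch under the same assumptions:

    // toDF takes one new name per existing column, in order.
    val dfRenamed = df.toDF(df.columns.map(c => mapValue.getOrElse(c, c)): _*)
    dfRenamed.show()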