
I am trying to rename the columns of a DataFrame in Scala. I can easily rename the top-level columns, but I'm having difficulty converting the columns nested inside struct (and array-of-struct) fields.

Below is my DataFrame schema.

|-- _VkjLmnVop: string (nullable = true)
|-- _KaTasLop: string (nullable = true)
|-- AbcDef: struct (nullable = true)
 |    |-- UvwXyz: struct (nullable = true)
 |    |    |-- _MnoPqrstUv: string (nullable = true)
 |    |    |-- _ManDevyIxyz: string (nullable = true)

But I need the schema like below

|-- vkj_lmn_vop: string (nullable = true)
|-- ka_tas_lop: string (nullable = true)
|-- abc_def: struct (nullable = true)
 |    |-- uvw_xyz: struct (nullable = true)
 |    |    |-- mno_pqrst_uv: string (nullable = true)
 |    |    |-- man_devy_ixyz: string (nullable = true)

For non-struct columns, I'm changing the column names as below:

def aliasAllColumns(df: DataFrame): DataFrame = {
  df.select(df.columns.map { c =>
    df.col(c)
      .as(
        c.replaceAll("_", "")
          .replaceAll("([A-Z])", "_$1")
          .toLowerCase
          .replaceFirst("_", ""))
  }: _*)
}
aliasAllColumns(file_data_df).show(1)
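For reference, the renaming expression alone behaves like this on the sample names (plain Scala, no Spark needed):

```scala
// Converts names such as "_VkjLmnVop" to "vkj_lmn_vop":
// drop all underscores, prefix every capital letter with "_",
// lowercase everything, then trim the leading "_".
val toSnakeCase = (s: String) =>
  s.replace("_", "")
    .replaceAll("([A-Z])", "_$1")
    .toLowerCase
    .dropWhile(_ == '_')

// toSnakeCase("_VkjLmnVop") == "vkj_lmn_vop"
// toSnakeCase("AbcDef")     == "abc_def"
```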

How can I change struct column names dynamically?

Vijay

2 Answers


You can create a recursive method to traverse the DataFrame schema for renaming the columns:

import org.apache.spark.sql.types._

def renameAllCols(schema: StructType, rename: String => String): StructType = {
  def recurRename(schema: StructType): Seq[StructField] = schema.fields.map{
      case StructField(name, dtype: StructType, nullable, meta) =>
        StructField(rename(name), StructType(recurRename(dtype)), nullable, meta)
      case StructField(name, dtype: ArrayType, nullable, meta) if dtype.elementType.isInstanceOf[StructType] =>
        StructField(rename(name), ArrayType(StructType(recurRename(dtype.elementType.asInstanceOf[StructType])), true), nullable, meta)
      case StructField(name, dtype, nullable, meta) =>
        StructField(rename(name), dtype, nullable, meta)
    }
  StructType(recurRename(schema))
}

Testing it with the following example:

import org.apache.spark.sql.functions._
import spark.implicits._

val renameFcn = (s: String) =>
  s.replace("_", "").replaceAll("([A-Z])", "_$1").toLowerCase.dropWhile(_ == '_')

case class C(A_Bc: Int, D_Ef: Int)

val df = Seq(
  (10, "a", C(1, 2), Seq(C(11, 12), C(13, 14)), Seq(101, 102)),
  (20, "b", C(3, 4), Seq(C(15, 16)), Seq(103))
).toDF("_VkjLmnVop", "_KaTasLop", "AbcDef", "ArrStruct", "ArrInt")

val newDF = spark.createDataFrame(df.rdd, renameAllCols(df.schema, renameFcn))

newDF.printSchema
// root
//  |-- vkj_lmn_vop: integer (nullable = false)
//  |-- ka_tas_lop: string (nullable = true)
//  |-- abc_def: struct (nullable = true)
//  |    |-- a_bc: integer (nullable = false)
//  |    |-- d_ef: integer (nullable = false)
//  |-- arr_struct: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- a_bc: integer (nullable = false)
//  |    |    |-- d_ef: integer (nullable = false)
//  |-- arr_int: array (nullable = true)
//  |    |-- element: integer (containsNull = false)
Leo C
  • Hi Leo, this is what I'm looking for. This is really great stuff, thanks a lot. I'm accepting this answer. – Vijay Mar 26 '19 at 17:47
  • I don't know how it worked. I've tried it, but it only changed the first level of the schema, not the nested columns. – Dave Jan 12 '20 at 10:23
  • Oh, I guess it doesn't work if a column is an array of structures. – Dave Jan 12 '20 at 11:27
  • @Dave, thanks for the feedback. You're right that it didn't cover cases of nested Array elements of StructType. I've revised the solution to take care of such cases. – Leo C Jan 12 '20 at 22:23
  • Really great way to rename the schema. – William R Mar 03 '20 at 11:04

As far as I know, it's not possible to rename nested fields directly.

As a first option, you could try flattening the structure into top-level columns.

However, if you need to keep the structure, you can play with spark.sql.functions.struct(*cols).

Creates a new struct column.
Parameters: cols – list of column names (string) or list of Column expressions

You will need to decompose all the schema, generate the aliases that you need and then compose it again using the struct function.
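A minimal sketch of that decompose-and-recompose approach, assuming `df` carries the schema from the question (the column names are taken from the question's printSchema output):

```scala
import org.apache.spark.sql.functions.{col, struct}

// Rebuild each struct by hand: select the leaf columns with their new
// aliases, then re-assemble them level by level with struct(...).
val renamed = df.select(
  col("_VkjLmnVop").as("vkj_lmn_vop"),
  col("_KaTasLop").as("ka_tas_lop"),
  struct(
    struct(
      col("AbcDef.UvwXyz._MnoPqrstUv").as("mno_pqrst_uv"),
      col("AbcDef.UvwXyz._ManDevyIxyz").as("man_devy_ixyz")
    ).as("uvw_xyz")
  ).as("abc_def")
)
```

As noted, this doesn't scale well: every struct level has to be spelled out by hand, which is exactly why the accepted answer's recursive schema traversal is preferable for wide schemas.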

It's not the best solution. But it's something :)

PS: I'm attaching the PySpark doc since it contains a better explanation than the Scala one.

Franzi
  • But I have 10 struct columns with 18 attributes in each struct. Is there any other, better approach? – Vijay Mar 26 '19 at 17:16
  • I would recommend coding a tail-recursive function that, given a schema, generates all the replace/struct expressions. – Franzi Mar 26 '19 at 17:20