
I have a dataframe 'df' with the following schema:

root
 |-- batch_key: string (nullable = true)
 |-- company_id: integer (nullable = true)
 |-- users_info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- first_name: string (nullable = true)
 |    |    |-- last_name: long (nullable = true)
 |    |    |-- total_amount: double (nullable = true)

The column users_info is an array containing multiple structs.

I would like to rename the columns to camelCase, so that 'batch_key' becomes 'batchKey', 'users_info' becomes 'usersInfo', 'first_name' becomes 'firstName', and so on.

I started with this code:

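import scala.util.matching.Regex

// Matches an underscore and captures the character that follows it
val regex = new Regex("_(.)")
var df2 = df
for (col <- df.columns) {
  df2 = df2.withColumnRenamed(col, regex.replaceAllIn(col, m => m.group(1).toUpperCase))
}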

But this code only renames batch_key, company_id and users_info, because df.columns returns only the top-level names [batch_key, company_id, users_info].

The nested columns under users_info are not changed. How can I modify the above code such that I can access the nested columns and change their column names as well?

activelearner
  • Probably you can use the [`select`](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@select(cols:org.apache.spark.sql.Column*):org.apache.spark.sql.DataFrame) method and do the same for the returned DataFrame? – laughedelic Nov 18 '16 at 02:29

1 Answer

In words: create a Seq containing the flattened schema. Then use org.apache.spark.sql.functions.col to build a Seq of col({old column name}).as({new column name}), using your regex to derive each new column name. Finally, select all the columns from df under their new names by calling df.select({the seq of cols}).

In more detail:

Using the solution from "Automatically and Elegantly flatten DataFrame in Spark SQL", you can first flatten your schema into a Seq:

import org.apache.spark.sql.types.{StructField, StructType}

// Returns the dotted path of every leaf field, recursing into nested structs
def fullFlattenSchema(schema: StructType): Seq[String] = {
  def helper(schema: StructType, prefix: String): Seq[String] = {
    // Prepend the parent path unless we are at the top level
    val fullName: String => String = name => if (prefix.isEmpty) name else s"$prefix.$name"
    schema.fields.flatMap {
      case StructField(name, inner: StructType, _, _) => helper(inner, fullName(name))
      case StructField(name, _, _, _) => Seq(fullName(name))
    }
  }

  helper(schema, "")
}

Then apply your regex to the resulting Seq:

import org.apache.spark.sql.functions.col
val renamed_columns = fullFlattenSchema(df.schema).map(c => col(c).as(regex.replaceAllIn(c, m => m.group(1).toUpperCase)))
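To see which names this step produces, here is the regex in isolation, as a minimal sketch in plain Scala with no Spark dependency. The object name CamelCase and the method toCamelCase are illustrative and not part of the original code.

```scala
import scala.util.matching.Regex

// Hypothetical helper wrapping the regex from the question.
object CamelCase {
  // Matches an underscore and captures the character right after it.
  private val underscore: Regex = new Regex("_(.)")

  // Drops each underscore and upper-cases the character that followed it.
  def toCamelCase(name: String): String =
    underscore.replaceAllIn(name, m => m.group(1).toUpperCase)
}
```

For example, CamelCase.toCamelCase("batch_key") gives "batchKey". A dotted path such as "users_info.first_name" becomes "usersInfo.firstName": the dot is kept, so for a flattened struct field the alias itself contains a literal dot.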

Finally, select from your dataframe using the renamed columns and check that the schema is as you wish:

val df2 = df.select(renamed_columns: _*)
df2.printSchema()
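One caveat: fullFlattenSchema only recurses into StructType fields, so for the schema in the question it stops at users_info, which is an array of structs, and the fields inside it keep their names. If you want to keep the nesting and rename inside the array as well, one possible alternative is to build a renamed copy of the schema and cast each top-level column to its renamed type. The sketch below assumes that Spark's struct-to-struct cast pairs fields by position when the field counts and types agree, which is worth checking on your Spark version. The names NestedRename, toCamel, renameType and renameAll are hypothetical, and MapType columns are not handled.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

// Hypothetical helper object; a sketch, not a definitive implementation.
object NestedRename {
  private val underscore = "_(.)".r

  // snake_case -> camelCase, same regex as in the question.
  def toCamel(name: String): String =
    underscore.replaceAllIn(name, m => m.group(1).toUpperCase)

  // Rebuilds a data type with every struct field renamed,
  // descending into nested structs and array elements.
  def renameType(dt: DataType): DataType = dt match {
    case s: StructType =>
      StructType(s.fields.map { f =>
        f.copy(name = toCamel(f.name), dataType = renameType(f.dataType))
      })
    case a: ArrayType =>
      a.copy(elementType = renameType(a.elementType))
    case other => other
  }

  // Casts each top-level column to its renamed type (assumed to match
  // struct fields by position) and aliases it with the camelCase name.
  def renameAll(df: DataFrame): DataFrame = {
    val cols = df.schema.fields.map { f =>
      col(f.name).cast(renameType(f.dataType)).as(toCamel(f.name))
    }
    df.select(cols: _*)
  }
}
```

Usage would be NestedRename.renameAll(df).printSchema(). Unlike the select over flattened paths, this leaves users_info as an array of structs and only changes the field names.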
Yannick Widmer