
I have a PySpark dataframe which contains a column "student" as follows:

"student" : {
   "name" : "kaleem",
   "rollno" : "12"
}

Schema for this in dataframe is:

StructType(List(
   name: String,
   rollno: String))

I need to modify this column as

"student" : {
   "student_details" : {
         "name" : "kaleem",
         "rollno" : "12"
   }
}

Schema for this in dataframe must be:

StructType(List(
  student_details:
     StructType(List(
         name: String,
         rollno: String))
))

How to do this in Spark?

– mightyMouse (edited by ZygD)

3 Answers


The spark-hats library extends the Spark DataFrame API with helpers for transforming fields inside nested structures and arrays at arbitrary levels of nesting, so you can do a lot of these transformations with it.

scala> import za.co.absa.spark.hats.Extensions._

scala> df.printSchema
root
 |-- ID: string (nullable = true)

scala> val df2 = df.nestedMapColumn("ID", "ID", c => struct(c as "alfa"))

scala> df2.printSchema
root
 |-- ID: struct (nullable = false)
 |    |-- alfa: string (nullable = true)

scala> val df3 = df2.nestedMapColumn("ID.alfa", "ID.alfa", c => struct(c as "beta"))

scala> df3.printSchema
root
 |-- ID: struct (nullable = false)
 |    |-- alfa: struct (nullable = false)
 |    |    |-- beta: string (nullable = true)

Your query would be

df.nestedMapColumn("student", "student", c => struct(c as "student_details"))
  • this is scala. what about pyspark? – mightyMouse May 27 '20 at 19:24
  • 1
    Using scala libraries in pyspark is quite common. Look at this [SO post](https://stackoverflow.com/a/36024707/5594180) or other articles [1](https://diogoalexandrefranco.github.io/scala-code-in-pyspark/), [2](https://aseigneurin.github.io/2016/09/01/spark-calling-scala-code-from-pyspark.html) – Saša Zejnilović May 27 '20 at 19:30

Use the `named_struct` SQL function to achieve this.

1. Read the JSON as a column

import spark.implicits._

val data =
  """
    |{
    |  "student": {
    |    "name": "kaleem",
    |    "rollno": "12"
    |  }
    |}
  """.stripMargin
val df = spark.read.json(Seq(data).toDS())
df.show(false)
println(df.schema("student"))

Output

+------------+
|student     |
+------------+
|[kaleem, 12]|
+------------+

StructField(student,StructType(StructField(name,StringType,true), StructField(rollno,StringType,true)),true)

2. Change the schema using named_struct

import org.apache.spark.sql.functions.expr

val processedDf = df.withColumn("student",
  expr("named_struct('student_details', student)")
)
processedDf.show(false)
println(processedDf.schema("student"))

Output

+--------------+
|student       |
+--------------+
|[[kaleem, 12]]|
+--------------+

StructField(student,StructType(StructField(student_details,StructType(StructField(name,StringType,true), StructField(rollno,StringType,true)),true)),false)

For Python, step 2 works as-is; just remove the `val`s.

– Som (edited by ZygD)

Spark 3.1+

To modify struct type columns, we can use withField and dropFields

F.col("student").withField("student_details", F.col("student"))
F.col("student").dropFields("name", "rollno")

Input:

from pyspark.sql import functions as F
df = spark.createDataFrame([(("kaleem", "12"),)], "student struct<name:string,rollno:string>")
df.printSchema()
# root
#  |-- student: struct (nullable = true)
#  |    |-- name: string (nullable = true)
#  |    |-- rollno: string (nullable = true)

Script:

df = df.withColumn("student", F.col("student")
    .withField("student_details", F.col("student"))
    .dropFields("name", "rollno")
)

Result:

df.printSchema()
# root
#  |-- student: struct (nullable = true)
#  |    |-- student_details: struct (nullable = true)
#  |    |    |-- name: string (nullable = true)
#  |    |    |-- rollno: string (nullable = true)
– ZygD