You can use the from_json function to transform your JSON column into a StructType column, which you can then split into separate columns, as in your case. Keep in mind that the JSON should have a uniform structure across rows; otherwise the inferred schema, and therefore the result, may not be what you expect.
You can refer to the following code:
val df = spark.createDataFrame(Seq(
("A", "B", "{\"Name\":\"xyz\",\"Address\":\"NYC\",\"title\":\"engg\"}"),
("C", "D", "{\"Name\":\"mnp\",\"Address\":\"MIC\",\"title\":\"data\"}"),
("E", "F", "{\"Name\":\"pqr\",\"Address\":\"MNN\",\"title\":\"bi\"}")
)).toDF("col_1", "col_2", "col_json")
The input DataFrame is as follows:
scala> df.show(false)
+-----+-----+---------------------------------------------+
|col_1|col_2|col_json |
+-----+-----+---------------------------------------------+
|A |B |{"Name":"xyz","Address":"NYC","title":"engg"}|
|C |D |{"Name":"mnp","Address":"MIC","title":"data"}|
|E |F |{"Name":"pqr","Address":"MNN","title":"bi"} |
+-----+-----+---------------------------------------------+
Now, we infer the schema (col_schema) of the JSON data so that it can be applied to the col_json column:
import spark.implicits._  // needed for .as[String]
val col_schema = spark.read.json(df.select(col("col_json")).as[String]).schema
val outputDF = df.withColumn("new_col", from_json(col("col_json"), col_schema)).select("col_1", "col_2", "new_col.*")
The result will be as follows:
scala> outputDF.show(false)
+-----+-----+-------+----+-----+
|col_1|col_2|Address|Name|title|
+-----+-----+-------+----+-----+
|A |B |NYC |xyz |engg |
|C |D |MIC |mnp |data |
|E |F |MNN |pqr |bi |
+-----+-----+-------+----+-----+
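If you already know the structure of the JSON, you can avoid the extra pass over the data that schema inference costs and build the schema by hand instead. A sketch assuming the three string fields from the example above (the field names must match the JSON keys, including case), reusing the df defined earlier:

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hand-built schema matching the JSON keys in col_json
val col_schema = StructType(Seq(
  StructField("Name", StringType),
  StructField("Address", StringType),
  StructField("title", StringType)
))

// Same from_json call as before, but no inference job is triggered
val outputDF = df
  .withColumn("new_col", from_json(col("col_json"), col_schema))
  .select("col_1", "col_2", "new_col.*")
```

This also gives you control over types (e.g. IntegerType for numeric fields), whereas inference decides them from the data.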
The complete Scala code that worked for me is:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}

object Sample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = spark.createDataFrame(Seq(
      ("A", "B", "{\"Name\":\"xyz\",\"Address\":\"NYC\",\"title\":\"engg\"}"),
      ("C", "D", "{\"Name\":\"mnp\",\"Address\":\"MIC\",\"title\":\"data\"}"),
      ("E", "F", "{\"Name\":\"pqr\",\"Address\":\"MNN\",\"title\":\"bi\"}")
    )).toDF("col_1", "col_2", "col_json")

    // Infer the schema of the JSON column, then apply it with from_json
    val col_schema = spark.read.json(df.select("col_json").as[String]).schema
    val outputDF = df
      .withColumn("new_col", from_json(col("col_json"), col_schema))
      .select("col_1", "col_2", "new_col.*")

    outputDF.show(false)
  }
}
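On the caveat about uniform formatting: in the default PERMISSIVE parsing mode, from_json returns a null struct for rows whose JSON does not parse, so the expanded columns come out null rather than failing the job. A quick sketch to see this, continuing from the session above (spark, col_schema and the imports are assumed to be in scope):

```scala
// A row whose col_json is not valid JSON
val badDF = Seq(("G", "H", "not-json")).toDF("col_1", "col_2", "col_json")

badDF
  .withColumn("new_col", from_json(col("col_json"), col_schema))
  .select("col_1", "col_2", "new_col.*")
  .show(false)
// Name, Address and title are all null for this row
```

If you need to detect such rows, filter on the struct column being null before expanding it.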