I have a Spark DataFrame, and from each row I want to build a map whose values are stored as Map[String, Map[String, String]]. I can't figure out how to do this; any help would be appreciated.
Below are the input and output formats:
Input:
+-----------------+------------+---+--------------------------------+
|relation |obj_instance|obj|map_value |
+-----------------+------------+---+--------------------------------+
|Start~>HInfo~>Mnt|Mnt |Mnt|[Model -> 2000, Version -> 1.0] |
|Start~>HInfo~>Cbl|Cbl-3 |Cbl|[VSData -> XYZVN, Name -> Smart]|
+-----------------+------------+---+--------------------------------+
Output:
Map(relation -> Start~>HInfo~>Mnt, obj_instance -> Mnt, obj -> Mnt, Mnt -> Map(Model -> 2000, Version -> 1.0))
Map(relation -> Start~>HInfo~>Cbl, obj_instance -> Cbl-3, obj -> Cbl, Cbl -> Map(VSData -> XYZVN, Name -> Smart))
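To make the target structure concrete, here is one result row written out by hand as a plain Scala value (the outer value type ends up as Any, because it mixes plain strings with a nested map keyed by the value of the obj column):

val sampleRow: Map[String, Any] = Map(
  "relation"     -> "Start~>HInfo~>Mnt",
  "obj_instance" -> "Mnt",
  "obj"          -> "Mnt",
  // nested map keyed by the value of the "obj" column
  "Mnt"          -> Map("Model" -> "2000", "Version" -> "1.0")
)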
Here is the code I'm trying, but it isn't working:
var resultMap: Map[Any, Any] = Map()
groupedDataSet.foreach( r => {
  val key1 = "relation"
  val value1 = r(0).toString
  val key2 = "obj_instance"
  val value2 = r(1).toString
  val key3 = "obj"
  val value3 = r(2).toString
  val key4 = r(2).toString
  val value4 = r(3)
  resultMap += (key1 -> value1, key2 -> value2, key3 -> value3, key4 -> value4)
})
resultMap.foreach(println)
Please help.
Below is the code to create the test DataFrame with the map column:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

object DFToMap extends App {

  // Creating SparkSession
  lazy val conf = new SparkConf().setAppName("df-to-map").set("spark.default.parallelism", "2")
    .setIfMissing("spark.master", "local[*]")
  lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()

  import sparkSession.implicits._

  // Creating raw DataFrame
  val rawTestDF = Seq(
    ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "VSData", "XYZVN"),
    ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "Name", "Smart"),
    ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Model", "2000"),
    ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Version", "1.0"))
    .toDF("relation", "obj_instance", "obj", "key", "value")

  rawTestDF.show(false)

  // Merge the single-entry maps gathered by collect_list into one map per group
  val joinTheMap = udf { json_value: Seq[Map[String, String]] => json_value.flatten.toMap }

  val groupedDataSet = rawTestDF
    .groupBy("relation", "obj_instance", "obj")
    .agg(collect_list(map(col("key"), col("value"))) as "map_value_temp")
    .withColumn("map_value", joinTheMap(col("map_value_temp")))
    .drop("map_value_temp")

  groupedDataSet.show(false) // This is the input DataFrame shown above.
}
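This is roughly the per-row conversion I have in mind for turning groupedDataSet into those maps; it would sit inside the object above, right after groupedDataSet.show(false). It is only a sketch: collect() pulls everything to the driver (fine for this small test, probably not for streaming), and reading the map column with getAs is an assumption on my part.

val resultMaps: Seq[Map[String, Any]] = groupedDataSet.collect().toSeq.map { r =>
  Map(
    "relation"     -> r.getAs[String]("relation"),
    "obj_instance" -> r.getAs[String]("obj_instance"),
    "obj"          -> r.getAs[String]("obj"),
    // nested map keyed by the value of the "obj" column
    r.getAs[String]("obj") -> r.getAs[Map[String, String]]("map_value")
  )
}
resultMaps.foreach(println)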
Final output JSON from the map:
[{"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":{"Model":"2000","Version":"1.0"}}
{"relation":"Start~>HInfo~>Cbl","obj_instance":"Cbl-3","obj:"Cbl","Cbl":{"VSData":"XYZVN","Name":"Smart"}}]
Note: I don't want to use Spark groupBy, pivot, or agg for this, because Spark Structured Streaming doesn't support multiple aggregations. I'd like to get there with pure Scala code instead, roughly along the lines of the sketch below. Kindly help.
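By "pure Scala code" I mean something like this driver-side sketch, which groups the raw key/value rows with plain Scala collections instead of Spark's groupBy/agg (again only a sketch; the column positions 0-4 match rawTestDF as built above):

// Sketch: group the raw rows with plain Scala collections,
// avoiding Spark's groupBy/pivot/agg entirely (driver-side only).
val driverSideMaps: Seq[Map[String, Any]] = rawTestDF.collect().toSeq
  .groupBy(r => (r.getString(0), r.getString(1), r.getString(2))) // (relation, obj_instance, obj)
  .map { case ((relation, objInstance, obj), rows) =>
    Map(
      "relation"     -> relation,
      "obj_instance" -> objInstance,
      "obj"          -> obj,
      obj            -> rows.map(r => r.getString(3) -> r.getString(4)).toMap // key -> value
    )
  }
  .toSeq

driverSideMaps.foreach(println)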