
I have a Spark DataFrame and I want to create a Map from each row, storing the values as Map[String, Map[String, String]]. I can't figure out how to do it; any help would be appreciated.

Below are the input and output formats:

Input:

    +-----------------+------------+---+--------------------------------+
    |relation         |obj_instance|obj|map_value                       |
    +-----------------+------------+---+--------------------------------+
    |Start~>HInfo~>Mnt|Mnt         |Mnt|[Model -> 2000, Version -> 1.0] |
    |Start~>HInfo~>Cbl|Cbl-3       |Cbl|[VSData -> XYZVN, Name -> Smart]|
    +-----------------+------------+---+--------------------------------+

Output:

    Map(relation -> Start~>HInfo~>Mnt, obj_instance -> Mnt, obj -> Mnt, Mnt -> Map(Model -> 2000, Version -> 1.0))
    Map(relation -> Start~>HInfo~>Cbl, obj_instance -> Cbl-3, obj -> Cbl, Cbl -> Map(VSData -> XYZVN, Name -> Smart))  

Here is the code I'm trying, without success:

    var resultMap: Map[Any, Any] = Map()
    groupedDataSet.foreach { r =>
      // Note: this closure runs on the executors, so the driver-side
      // resultMap never sees these updates. The fixed keys would also
      // overwrite each other on every row, so a single flat map cannot work.
      val key1 = "relation"
      val value1 = r(0).toString
      val key2 = "obj_instance"
      val value2 = r(1).toString
      val key3 = "obj"
      val value3 = r(2).toString
      val key4 = r(2).toString
      val value4 = r(3)

      resultMap += (key1 -> value1, key2 -> value2, key3 -> value3, key4 -> value4)
    }
    resultMap.foreach(println) // prints nothing useful on the driver

Please help.
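
For reference, a minimal sketch of the pure-Scala direction, assuming the groupedDataSet built in the setup code below. Since every row reuses the keys relation, obj_instance, and obj, a single shared Map cannot hold all rows; one Map per row matches the expected output. collect() pulls all rows to the driver, so this only suits results that fit in driver memory:

    // Hedged sketch: build one Map per row on the driver.
    val rowMaps: Seq[Map[String, Any]] = groupedDataSet.collect().toSeq.map { r =>
      val obj = r.getString(2)
      Map(
        "relation"     -> r.getString(0),
        "obj_instance" -> r.getString(1),
        "obj"          -> obj,
        obj            -> r.getMap[String, String](3).toMap
      )
    }
    rowMaps.foreach(println)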

Below is the code to create the test DataFrame and its map column:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{Column, SparkSession}
    import org.apache.spark.sql.functions._

    object DFToMap extends App {

      // Creating SparkSession
      lazy val conf = new SparkConf().setAppName("df-to-map").set("spark.default.parallelism", "2")
        .setIfMissing("spark.master", "local[*]")
      lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()

      import sparkSession.implicits._

      // Creating raw DataFrame
      val rawTestDF = Seq(
        ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "VSData", "XYZVN"),
        ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "Name", "Smart"),
        ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Model", "2000"),
        ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Version", "1.0"))
        .toDF("relation", "obj_instance", "obj", "key", "value")

      rawTestDF.show(false)

      // Merge the single-entry maps produced by collect_list into one map per group
      val joinTheMap = udf { json_value: Seq[Map[String, String]] => json_value.flatten.toMap }

      val groupedDataSet = rawTestDF.groupBy("relation", "obj_instance", "obj")
        .agg(collect_list(map(col("key"), col("value"))) as "map_value_temp")
        .withColumn("map_value", joinTheMap(col("map_value_temp")))
        .drop("map_value_temp")

      groupedDataSet.show(false) // This is the input DataFrame.
    }

Final output JSON from the Map:

    [{"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":{"Model":"2000","Version":"1.0"}}
    {"relation":"Start~>HInfo~>Cbl","obj_instance":"Cbl-3","obj:"Cbl","Cbl":{"VSData":"XYZVN","Name":"Smart"}}]

Note: I don't want to use Spark groupBy, pivot, or agg, because Spark Structured Streaming doesn't support multiple aggregations. Hence I want to get the same output with pure Scala code. Kindly help.
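
For that last step, one option is a sketch with json4s (Spark already bundles the jackson backend); Serialization.write serializes nested maps directly. Here rowMaps is the hypothetical Seq[Map[String, Any]] from the driver-side sketch earlier in the question:

    import org.json4s.DefaultFormats
    import org.json4s.jackson.Serialization

    implicit val formats: DefaultFormats.type = DefaultFormats

    // Assumption: rowMaps comes from the earlier driver-side sketch.
    val finalJson: String = Serialization.write(rowMaps)
    // [{"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":{"Model":"2000","Version":"1.0"}},...]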

Anu S
  • Can you post your dataframe schema & create dataframe if you have already? – Srinivas Apr 26 '20 at 10:58
  • Hi Srinivas, I have the code/class created to get this output. Please help if you can to convert the entire DataFrame to a Map and finally convert it to JSON. – Anu S Apr 26 '20 at 11:15
  • Sure, I can help you if you give me the final JSON output & input DataFrame. – Srinivas Apr 26 '20 at 11:18
  • Can you explain this - India -> Map(city -> Delhi, size -> L) how are you mapping? – Srinivas Apr 26 '20 at 11:20
  • Also post your final JSON output here? – Srinivas Apr 26 '20 at 11:23
  • Hi Srinivas, I have updated the input and output format in the question. I can get the same output using Spark functions, but I have to avoid them due to streaming restrictions, so I want to get the same output using pure Scala code with the Map collection. If possible please help :( – Anu S Apr 26 '20 at 11:59
  • Can I know what the streaming restrictions are? – Srinivas Apr 26 '20 at 12:22

1 Answer


I created a custom UDF to parse the columns & generate the data in JSON format.

    import org.json4s._
    import org.json4s.native.JsonMethods._ // org.json4s.jackson.JsonMethods also works and ships with Spark

    // Build one JSON object per row; the nested object is keyed by the value of obj.
    def toJson(relation: String, obj_instance: String, obj: String, map_value: Map[String, String]): String = {
      compact(render(
        JObject(
          "relation" -> JString(relation),
          "obj_instance" -> JString(obj_instance),
          "obj" -> JString(obj),
          // wrap the Scala map explicitly so it becomes a nested JSON object
          obj -> JObject(map_value.toList.map { case (k, v) => k -> JString(v) }))))
    }

    import org.apache.spark.sql.functions._
    import sparkSession.implicits._ // for toDF and the $-notation

    val createJson = udf(toJson _)

    val df = Seq(
      ("Start~>HInfo~>Mnt", "Mnt", "Mnt", Map("Model" -> "2000", "Version" -> "1.0")),
      ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", Map("VSData" -> "XYZVN", "Name" -> "Smart")))
      .toDF("relation", "obj_instance", "obj", "map_value")

    df.select(createJson($"relation", $"obj_instance", $"obj", $"map_value").as("json_map")).show(false)


+-----------------------------------------------------------------------------------------------------------+
|json_map                                                                                                   |
+-----------------------------------------------------------------------------------------------------------+
|{"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":{"Model":"2000","Version":"1.0"}}   |
|{"relation":"Start~>HInfo~>Cbl","obj_instance":"Cbl-3","obj":"Cbl","Cbl":{"VSData":"XYZVN","Name":"Smart"}}|
+-----------------------------------------------------------------------------------------------------------+
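
As a quick sanity check, the helper can also be called directly on the driver with sample values from the question:

    println(toJson("Start~>HInfo~>Mnt", "Mnt", "Mnt",
      Map("Model" -> "2000", "Version" -> "1.0")))
    // {"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":{"Model":"2000","Version":"1.0"}}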

Srinivas
  • Hi Srinivas, thanks for your prompt answer, but if you look at the updated question, my input DataFrame has one Map column, hence this may not help, as it needs columns of the same datatype. – Anu S Apr 26 '20 at 12:42
  • Cast that map datatype column to string and pass it to the map Spark function. – Srinivas Apr 26 '20 at 12:50
  • Same as I have done in the answer where country India has a map of values. – Srinivas Apr 26 '20 at 12:51
  • Earlier I had tried to cast the Map column as String and was getting the expected output, but when I tried to convert to JSON it was not as expected, as given below, hence I am struggling for a solution. {"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":"[Model -> 2000, Version -> 1.0]"} – Anu S Apr 26 '20 at 13:16
  • How are you converting the map to JSON? – Srinivas Apr 26 '20 at 14:21
  • Please check now, I have updated the answer & converted the map to JSON using the Spark toJSON method. – Srinivas Apr 26 '20 at 14:40
  • Thanks Srinivas, but the inner map is not converting to JSON, it still looks like a map in the JSON output :) – Anu S Apr 26 '20 at 15:01
  • One last try ..:) .. let me know if it is not working. – Srinivas Apr 26 '20 at 15:44
  • I had tried this as well.... but the \ escape pattern in the inner JSON is not acceptable in the project, and I can't use regex replace due to performance issues with datasets in TB. – Anu S Apr 26 '20 at 15:55
  • This was the reason I was looking for a pure Scala collection based solution :) – Anu S Apr 26 '20 at 15:58
  • I have created a UDF to generate JSON data from the given columns & updated the answer. – Srinivas Apr 26 '20 at 18:08
  • Take my email from my profile page. Thank you for accepting this answer. – Srinivas Apr 26 '20 at 19:44
  • Hi @Srinivas, can you please help me with this question? I'm stuck at the last stage of the project. https://stackoverflow.com/questions/61457624/how-to-avoid-using-of-collect-in-spark-rdd-in-scala – Anu S Apr 27 '20 at 11:16