0

Input:

id1   id2    name   value           epid
"xxx" "yyy"  "EAN"  "5057723043"    "1299"
"xxx" "yyy"  "MPN"  "EVBD"          "1299"

I want:

{         "id1": "xxx",
          "id2": "yyy",
          "item_specifics": [
            {
              "name": "EAN",
              "value": "5057723043"
            },
            {
              "name": "MPN",
              "value": "EVBD"
            },
            {
              "name": "EPID",
              "value": "1299"
            }
          ]
}

I tried the following two solutions from How to aggregate columns into json array? and how to merge rows into column of spark dataframe as vaild json to write it in mysql:

pi_df.groupBy(col("id1"), col("id2"))
  //.agg(collect_list(to_json(struct(col("name"), col("value"))).alias("item_specifics"))) // => not working
  .agg(collect_list(struct(col("name"),col("value"))).alias("item_specifics"))

But I got:

{ "name":"EAN","value":"5057723043", "EPID": "1299", "id1": "xxx", "id2": "yyy" }

How to fix this? Thanks

BAE
  • 8,550
  • 22
  • 88
  • 171

3 Answers3

5

For Spark < 2.4

You can create 2 dataframes, one with name and value and other with epic as name and epic value as value and union them together. Then aggregate them as collect_set and create a json. The code should look like this.

//Creating Test Data
val df = Seq(("xxx","yyy" ,"EAN" ,"5057723043","1299"), ("xxx","yyy" ,"MPN" ,"EVBD", "1299") )
  .toDF("id1", "id2", "name", "value", "epid")

df.show(false)

+---+---+----+----------+----+
|id1|id2|name|value     |epid|
+---+---+----+----------+----+
|xxx|yyy|EAN |5057723043|1299|
|xxx|yyy|MPN |EVBD      |1299|
+---+---+----+----------+----+

val df1 = df.withColumn("map", struct(col("name"), col("value")))
  .select("id1", "id2", "map")

val df2 = df.withColumn("map", struct(lit("EPID").as("name"), col("epid").as("value")))
  .select("id1", "id2", "map")

val jsonDF = df1.union(df2).groupBy("id1", "id2")
  .agg(collect_set("map").as("item_specifics"))
  .withColumn("json", to_json(struct("id1", "id2", "item_specifics")))

jsonDF.select("json").show(false)

+---------------------------------------------------------------------------------------------------------------------------------------------+
|json                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------+
|{"id1":"xxx","id2":"yyy","item_specifics":[{"name":"MPN","value":"EVBD"},{"name":"EAN","value":"5057723043"},{"name":"EPID","value":"1299"}]}|
+---------------------------------------------------------------------------------------------------------------------------------------------+

For Spark = 2.4

It provides a array_union method. It might be helpful in doing it without union. I haven't tried it though.

val jsonDF = df.withColumn("map1", struct(col("name"), col("value")))
  .withColumn("map2", struct(lit("epid").as("name"), col("epid").as("value")))
  .groupBy("id1", "id2")
    .agg(collect_set("map1").as("item_specifics1"),
      collect_set("map2").as("item_specifics2"))
  .withColumn("item_specifics", array_union(col("item_specifics1"), col("item_specifics2")))
  .withColumn("json", to_json(struct("id1", "id2", "item_specifics2")))
Apurba Pandey
  • 1,061
  • 10
  • 21
0

You're pretty close. I believe you're looking for something like this:

val pi_df2 = pi_df.withColumn("name", lit("EPID")).
withColumnRenamed("epid", "value").
select("id1", "id2", "name","value")

pi_df.select("id1", "id2", "name","value").
union(pi_df2).withColumn("item_specific", struct(col("name"), col("value"))).
groupBy(col("id1"), col("id2")).
agg(collect_list(col("item_specific")).alias("item_specifics")).
write.json(...)

The union should bring back epid into item_specifics

ayplam
  • 1,943
  • 1
  • 14
  • 20
  • thanks. I tried the first solution and found that there are lots of duplicated `name` and `value` struct in the "item_specific" column. I have no idea why, but I am looking into it. btw, it is possible to add `epid` by `udf`? – BAE Mar 01 '19 at 04:27
  • If there are a lot of duplicates it's probably because the data itself has duplicates. You can also use collect_set if you don't want any, or doing a df.distinct() before groupBy. I don't know of any straightforward way to add epid to udf – ayplam Mar 01 '19 at 04:33
  • why I did not see `epid`? – BAE Mar 01 '19 at 04:34
  • I edited the answer for clarity. epid should be union'd back in to appear in item_specifics – ayplam Mar 01 '19 at 04:37
0

Here is what you need to do

    import scala.util.parsing.json.JSONObject
    import scala.collection.mutable.WrappedArray

    //Define udf
    val jsonFun = udf((id1 : String, id2 : String, item_specifics: WrappedArray[Map[String, String]], epid: String)=> {
 //Add epid to item_specifics json
val item_withEPID = item_specifics :+ Map("epid" -> epid)

val item_specificsArray = item_withEPID.map(m => ( Array(Map("name" -> m.keys.toSeq(0), "value" -> m.values.toSeq(0))))).map(m => m.map( mi => JSONObject(mi).toString().replace("\\",""))).flatten.mkString("[",",","]")

 //Add id1 and id2 to output json
val m = Map("id1"-> id1, "id2"-> id2, "item_specifics" -> item_specificsArray.toSeq )
JSONObject(m).toString().replace("\\","")
})

val pi_df = Seq( ("xxx","yyy","EAN","5057723043","1299"), ("xxx","yyy","MPN","EVBD","1299")).toDF("id1","id2","name","value","epid")

//Add epid as part of group by column else the column will not be available after group by and aggregation
val df = pi_df.groupBy(col("id1"), col("id2"), col("epid")).agg(collect_list(map(col("name"), col("value")) as "map").as("item_specifics")).withColumn("item_specifics",jsonFun($"id1",$"id2",$"item_specifics",$"epid"))

df.show(false)

scala> df.show(false)
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id1|id2|epid|item_specifics                                                                                                                                                      |
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|xxx|yyy|1299|{"id1" : "xxx", "id2" : "yyy", "item_specifics" : [{"name" : "MPN", "value" : "EVBD"},{"name" : "EAN", "value" : "5057723043"},{"name" : "epid", "value" : "1299"}]}|
+---+---+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Content of item_specifics column/ output

{
    "id1": "xxx",
    "id2": "yyy",
    "item_specifics": [{
        "name": "MPN",
        "value": "EVBD"
    }, {
        "name": "EAN",
        "value": "5057723043"
    }, {
        "name": "epid",
        "value": "1299"
    }]
}
m-bhole
  • 1,189
  • 10
  • 21