2

I have a json file like this:

{    
  "Item Version" : 1.0,    
  "Item Creation Time" : "2019-04-14 14:15:09",        
  "Trade Dictionary" : {    
    "Country" : "India",    
    "TradeNumber" : "1",    
    "action" : {    
      "Action1" : false,    
      "Action2" : true,    
      "Action3" : false    
    },    
    "Value" : "XXXXXXXXXXXXXXX",    
    "TradeRegion" : "Global"    
  },    
  "Prod" : {    
    "Type" : "Driver",    
    "Product Dic" : { },    
    "FX Legs" : [ {    
      "Spot Date" : "2019-04-16",        
      "Value" : true    
    } ]    
  },    
  "Payments" : {    
    "Payment Details" : [ {    
      "Payment Date" : "2019-04-11",    
      "Payment Type" : "Rej"
    } ]
  }
}

I need a table in below format:

Version|Item Creation Time|Country|TradeNumber|Action1|Action2|Action3|Value |TradeRegion|Type|Product Dic|Spot Date |Value|Payment Date|Payment Type |
1 |2019-04-14 14:15 | India| 1 | false| true | false |xxxxxx|Global |Driver|{} |2019-04-16 |True |2019-11-14 |Rej

So it will just iterate each key value pair, put the key as column name and it's values to table values.

My current code:

val data2 = data.withColumn("vars",explode(array($"Product")))
  .withColumn("subs", explode($"vars.FX Legs"))
  .select($"vars.*",$"subs.*")

The problem here is that I have to provide the column names myself. Is there any way to make this more generic?

Shaido
  • 27,497
  • 23
  • 70
  • 73
Nilay
  • 113
  • 7
  • Have you tried exploding the array columns? – Aadhil Rushdy Nov 14 '19 at 07:02
  • data2=data.withColumn("vars",explode(array($"Product"))).withColumn("subs", explode($"vars.FX Legs")).select($"vars.*",$"subs.*") I have trued this code.but i want to make it look like a generic code where i don't have to provide the column names.It will just iterate by it self – Nilay Nov 14 '19 at 07:11
  • @Nilay Can you please edit the question to make it clear? – Aadhil Rushdy Nov 14 '19 at 07:26
  • related (albeit using R spark interface): https://stackoverflow.com/questions/52194942/how-to-flatten-the-data-of-different-data-types-by-using-sparklyr-package/52363983#52363983 You can parse the schema without having to specify it manually. – zacdav Nov 14 '19 at 07:56
  • @Nilay: I edited the question to include the expected output that seemed to have disappeared as well as the code in your comment. Please check so that everything looks correct. :) – Shaido Nov 15 '19 at 03:22
  • @Shaido...Thank You..By Any chance do you get any answer for this?? – Nilay Nov 15 '19 at 04:54
  • @Nilay: It's not very straight-forward since you have multiple layers with arrays and structs mixed together. I added an answer that should solve it but it's a bit complex. – Shaido Nov 15 '19 at 08:16
  • @Shadio Can You help me with this?? https://stackoverflow.com/questions/58996330/how-to-convert-a-list-of-maps-into-maps-in-scala – Nilay Nov 22 '19 at 15:00

3 Answers3

0

Use explode function to flatten dataframes with arrays. Here is an example:

val df = spark.read.json(Seq(json).toDS.rdd)
df.show(10, false)
df.printSchema

df: org.apache.spark.sql.DataFrame = [Item Creation Time: string, Item Version: double ... 3 more fields]
+-------------------+------------+--------------------------------+----------------------------------------+---------------------------------------------------+
|Item Creation Time |Item Version|Payments                        |Prod                                    |Trade Dictionary                                   |
+-------------------+------------+--------------------------------+----------------------------------------+---------------------------------------------------+
|2019-04-14 14:15:09|1.0         |[WrappedArray([2019-04-11,Rej])]|[WrappedArray([2019-04-16,true]),Driver]|[India,1,Global,XXXXXXXXXXXXXXX,[false,true,false]]|
+-------------------+------------+--------------------------------+----------------------------------------+---------------------------------------------------+
root
 |-- Item Creation Time: string (nullable = true)
 |-- Item Version: double (nullable = true)
 |-- Payments: struct (nullable = true)
 |    |-- Payment Details: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Payment Date: string (nullable = true)
 |    |    |    |-- Payment Type: string (nullable = true)
 |-- Prod: struct (nullable = true)
 |    |-- FX Legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Spot Date: string (nullable = true)
 |    |    |    |-- Value: boolean (nullable = true)
 |    |-- Type: string (nullable = true)
 |-- Trade Dictionary: struct (nullable = true)
 |    |-- Country: string (nullable = true)
 |    |-- TradeNumber: string (nullable = true)
 |    |-- TradeRegion: string (nullable = true)
 |    |-- Value: string (nullable = true)
 |    |-- action: struct (nullable = true)
 |    |    |-- Action1: boolean (nullable = true)
 |    |    |-- Action2: boolean (nullable = true)
 |    |    |-- Action3: boolean (nullable = true)


val flat = df
    .select($"Item Creation Time", $"Item Version", explode($"Payments.Payment Details") as "row")
    .select($"Item Creation Time", $"Item Version", $"row.*")
flat.show

flat: org.apache.spark.sql.DataFrame = [Item Creation Time: string, Item Version: double ... 2 more fields]
+-------------------+------------+------------+------------+
| Item Creation Time|Item Version|Payment Date|Payment Type|
+-------------------+------------+------------+------------+
|2019-04-14 14:15:09|         1.0|  2019-04-11|         Rej|
+-------------------+------------+------------+------------+
shuvalov
  • 4,713
  • 2
  • 20
  • 17
  • I have alreday tried this. Is ther any way where I don't have to provide column details like this "Payments.Payment Details" – Nilay Nov 14 '19 at 07:14
  • @Nilay You need to explode to make it struct, so you get a.b.c way to access elements – sumitya Nov 14 '19 at 07:54
  • @Syadav...Explode options I have alreday tried.But I want something generic which can be applicable on any json.We don't have to mention the column details like Paypemt.paymentDetails – Nilay Nov 15 '19 at 04:57
0

This Solution can be achieved very easily using a library named JFlat - https://github.com/opendevl/Json2Flat.

String str = new String(Files.readAllBytes(Paths.get("/path/to/source/file.json")));

JFlat flatMe = new JFlat(str);

//get the 2D representation of JSON document
List<Object[]> json2csv = flatMe.json2Sheet().getJsonAsSheet();

//write the 2D representation in csv format
flatMe.write2csv("/path/to/destination/file.json");
0

Since you have both array and struct columns mixed together in multiple levels it is not that simple to create a general solution. The main problem is that the explode function must be executed on all array column which is an action.

The simplest solution I can come up with uses recursion to check for any struct or array columns. If there are any then those will be flattened and then we check again (after flattening there will be additional columns which can be arrays or structs, hence the complexity). The flattenStruct part is from here.

Code:

def flattenStruct(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)   
    f.dataType match {
      case st: StructType => flattenStruct(st, colName)
      case _ => Array(col(colName))
    }
  })
}

def flattenSchema(df: DataFrame): DataFrame = {
    val structExists = df.schema.fields.filter(_.dataType.typeName == "struct").size > 0
    val arrayCols = df.schema.fields.filter(_.dataType.typeName == "array").map(_.name)

    if(structExists){
        flattenSchema(df.select(flattenStruct(df.schema):_*))
    } else if(arrayCols.size > 0) {
        val newDF = arrayCols.foldLeft(df){
          (tempDf, colName) => tempDf.withColumn(colName, explode(col(colName)))
        }
        flattenSchema(newDF)
    } else {
        df
    }
}

Running the above method on the input dataframe:

flattenSchema(data)

will give a dataframe with the following schema:

root
 |-- Item Creation Time: string (nullable = true)
 |-- Item Version: double (nullable = true)
 |-- Payment Date: string (nullable = true)
 |-- Payment Type: string (nullable = true)
 |-- Spot Date: string (nullable = true)
 |-- Value: boolean (nullable = true)
 |-- Product Dic: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- TradeNumber: string (nullable = true)
 |-- TradeRegion: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Action1: boolean (nullable = true)
 |-- Action2: boolean (nullable = true)
 |-- Action3: boolean (nullable = true)

To keep the prefix of the struct columns in the name of the new columns, you only need to adjust the last case in the flattenStruct function:

case _ => Array(col(colName).as(colName.replace(".", "_")))
Shaido
  • 27,497
  • 23
  • 70
  • 73
  • is there any way I can add the prefix names like instead of only printing Action1 I want to print Teade dictionary_action_Action1 – Nilay Nov 19 '19 at 13:32
  • @Nilay: That should be possible. Without testing, I think you can simply change a line in `flattenStruct`: Change `case _ => Array(col(colName))` to `case _ => Array(col(colName).as(s"${prefix.replace('.', '_')}_${colName}" ))`. – Shaido Nov 20 '19 at 01:23
  • The Problem I am facing is suppose i have a json file like here https://spreadsheets.google.com/feeds/list/0Av2v4lMxiJ1AdE9laEZJdzhmMzdmcW90VWNfUTYtM2c/2/public/basic?alt=json Now If I am running this the op is ambigious records because of a lot of $t in the key name.I am trying a way to solve that – Nilay Nov 20 '19 at 02:04
  • @Nilay: I see. You can create the string like this instead `prefix.replace('.', '_') + "_" + colName`. If necessary, you can remove all `$` or replace them with another char, same as for `.`. – Shaido Nov 20 '19 at 02:34
  • I tried "case _ => Array(col(colName).as(s"${prefix + "." + colName}"))" But i was getting this error annot resolve '`feed.feed.author`' given input columns: [feed.updated.feed.updated.$t, feed.feed.xmlns$gsx, feed.title.feed.title.$t, feed.feed.xmlns$openSearch SO I tried "case _ => Array(col(colName).as(s"${colName}"))" BUt now getting org.apache.spark.sql.AnalysisException: cannot resolve '`feed.author`' given input columns: [feed.xmlns, feed.author, feed.updated.$t, feed.openSearch$totalResults.$t, feed.title.type, feed.category, feed.link, feed.title.$t, – Nilay Nov 20 '19 at 03:26
  • @Nilay: I added a small code change to the end of the after. See if it works for you. Any `$` should not be a problem. – Shaido Nov 20 '19 at 03:44
  • "(prefix + "_" + colName)" this staement in the updated code was the reason why coulmn names were written multiple times like the column name was coming like "feed_feed_author_email_feed_feed_author_email_Tval" instead of "feed_author_email_Tval" So I just removed that part..Now the code looks like case _ => { val newName = colName.replace(".", "_") Array(col(colName).as(newName)) And it's working – Nilay Nov 20 '19 at 05:56
  • @Nilay: I must have missed that and tried to overcomplicate the solution.... Check the revised, simplified change in the answer. – Shaido Nov 20 '19 at 06:02
  • @Nilay: I prefer to keep things separated. :) But you can ping me here (or any other question) and link to any new question and I will check if I can answer. – Shaido Nov 20 '19 at 06:21
  • @Shadio.....Can you take a look at this?https://stackoverflow.com/questions/58996330/how-to-convert-a-list-of-maps-into-maps-in-scala – Nilay Nov 22 '19 at 16:48