
I would like to flatten JSON blobs into a DataFrame using Spark/Spark SQL inside spark-shell.

import org.apache.spark.sql.types.StringType

val df = spark.sql("select body from test limit 3") // body is a JSON-encoded blob column
val df2 = df.select(df("body").cast(StringType).as("body"))

When I do

df2.show // shows the 3 rows

body
------------------------------------
{"k1": "v1", "k2": "v2" }
{"k3": "v3"}
{"k4": "v4", "k5": "v5", "k6": "v6"}
------------------------------------

Now say I have a billion of these rows/records, but there will be at most 5 different JSON schemas across all of them. How do I flatten them so that I get a DataFrame in the format below? Should I use df.foreach, df.foreachPartition, explode, or df.flatMap? How do I make sure I am not creating a billion DataFrames and trying to union them all, or something even more inefficient? It would be great if I could see a code sample. Also, since some of the values will be missing/null, I wonder whether they take up any space?

"K1" | "K2" | "K3" | "K4" | "K5" | "K6"
---------------------------------------
"V1" | "V2" |
            | "V3" |
                   | "V4" | "V5" | "V6"
user1870400

1 Answer


Not sure what version of Spark you are on, but have a look at this example:

Spark SQL JSON

So if you do something like:

import org.apache.spark.sql._
// Extract the raw JSON string from each Row, then let Spark infer the schema in one pass
val rdd = df2.rdd.map { case Row(j: String) => j }
spark.read.json(rdd).show()

Spark SQL will do the heavy lifting.
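For reference, here is a self-contained sketch of the same approach that you can paste into spark-shell (assuming Spark 2.0.x); the Seq of sample blobs below just stands in for the df2 built from your test table.

// Self-contained sketch: the sample Seq stands in for the `test` table from the question.
import org.apache.spark.sql.Row
import spark.implicits._

val sample = Seq(
  """{"k1": "v1", "k2": "v2"}""",
  """{"k3": "v3"}""",
  """{"k4": "v4", "k5": "v5", "k6": "v6"}"""
)
val df2 = sample.toDF("body")

// Pull the JSON string out of each Row and hand an RDD[String] to the JSON reader;
// Spark infers one merged schema (k1..k6) across all blobs in a single pass.
val jsonRdd = df2.rdd.map { case Row(body: String) => body }
val flat = spark.read.json(jsonRdd)

flat.printSchema() // k1 ... k6, all nullable strings
flat.show()        // keys absent from a given blob come back as null

No per-row DataFrames or unions are involved: json() scans the RDD once to infer the merged schema, and keys missing from a particular blob simply become nulls in the resulting columns.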

user1870400
ImDarrenG
  • That didn't quite work. I am using Spark 2.0.2. I get the following error 40: error: overloaded method value json with alternatives: (jsonRDD: org.apache.spark.rdd.RDD[String])org.apache.spark.sql.DataFrame – user1870400 Nov 18 '16 at 09:31
  • If I change it to this spark.read.json(df2.toJSON.rdd).show() I get the same output as df2.show so it really didn't do anything – user1870400 Nov 18 '16 at 09:32
  • What is the type of the RDD returned by df2.rdd? – ImDarrenG Nov 18 '16 at 10:07
  • My bad it's a RDD of rows isn't it. You need to map the RDD of rows to RDD[String] like .map(_(0)) (or whatever the notation is), then pass that RDD[String] into the json() function – ImDarrenG Nov 18 '16 at 10:11
  • Off the top of my head: spark.read.json(rdd.map(r => r(0)) // you may need to cast r(0) to string such as .as[String]. you're essentially mapping the 0th column to a String – ImDarrenG Nov 18 '16 at 10:14
  • Edited answer, hope that helps :) g2g, need to do some work now – ImDarrenG Nov 18 '16 at 10:19
  • @ImDarrenG This might be really late but just a quick question how can we flatten this if we have a extra column in df2 thought this might be a similar answer that i was looking at. If you could take a look at my question that would be really helpful thanks. https://stackoverflow.com/questions/50553789/spark-process-json-data-inside-a-csv-file – ankush reddy May 28 '18 at 00:31
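For readers following the comment thread: the corrected call that the comments converge on can be written as below (illustrative; it assumes column 0 of df2 holds the JSON string).

// Equivalent one-liner: take column 0 of each Row as a String,
// then let the JSON reader infer the schema.
spark.read.json(df2.rdd.map(r => r.getString(0))).show()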