3

I want to read the JSON file in the below format:-

 {
  "titlename": "periodic",
    "atom": [
         {
          "usage": "neutron",
          "dailydata": [
    {
      "utcacquisitiontime": "2017-03-27T22:00:00Z",
      "datatimezone": "+02:00",
      "intervalvalue": 28128,
      "intervaltime": 15          
    },
    {
      "utcacquisitiontime": "2017-03-27T22:15:00Z",
      "datatimezone": "+02:00",
      "intervalvalue": 25687,
      "intervaltime": 15          
    }
   ]
  }
 ]
}

I am writing my read line as:

sqlContext.read.json("user/files_fold/testing-data.json").printSchema

But I not getting the desired result-

root                                                                            
  |-- _corrupt_record: string (nullable = true)

Please help me on this

Ninja
  • 115
  • 1
  • 1
  • 12

5 Answers5

5

I suggest using wholeTextFiles to read the file and apply some functions to convert it to a single-line JSON format.

val json = sc.wholeTextFiles("/user/files_fold/testing-data.json").
  map(tuple => tuple._2.replace("\n", "").trim)

val df = sqlContext.read.json(json)

You should have the final valid dataframe as

+--------------------------------------------------------------------------------------------------------+---------+
|atom                                                                                                    |titlename|
+--------------------------------------------------------------------------------------------------------+---------+
|[[WrappedArray([+02:00,15,28128,2017-03-27T22:00:00Z], [+02:00,15,25687,2017-03-27T22:15:00Z]),neutron]]|periodic |
+--------------------------------------------------------------------------------------------------------+---------+

And valid schema as

root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
Jacek Laskowski
  • 72,696
  • 27
  • 242
  • 420
Ramesh Maharjan
  • 41,071
  • 6
  • 69
  • 97
  • If I need to flatten this dataframe, how can it be acheived? simple `explode` is not working – Sankar Jun 13 '21 at 06:27
3

Spark 2.2 introduced multiLine option which can be used to load JSON (not JSONL) files:

spark.read
.option("multiLine", true).option("mode", "PERMISSIVE")
  .json("/path/to/user.json")
1

This has already been answered nicely by other contributors, but I had one question which is how do i access each nested value/unit of the dataframe.

So, for collections, we can use explode and for struct types we can directly call the unit by dot(.).

scala> val a = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json("file:///home/hdfs/spark_2.json")
a: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string]

scala> a.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)


scala> val b = a.withColumn("exploded_atom", explode(col("atom")))
b: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 1 more field]

scala> b.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)


scala>

scala> val c = b.withColumn("exploded_atom_struct", explode(col("`exploded_atom`.dailydata")))
c: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 2 more fields]

scala>

scala> c.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)
 |-- exploded_atom_struct: struct (nullable = true)
 |    |-- datatimezone: string (nullable = true)
 |    |-- intervaltime: long (nullable = true)
 |    |-- intervalvalue: long (nullable = true)
 |    |-- utcacquisitiontime: string (nullable = true)


scala> val d = c.withColumn("exploded_atom_struct_last", col("`exploded_atom_struct`.utcacquisitiontime"))
d: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 3 more fields]


scala> d.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)
 |-- exploded_atom_struct: struct (nullable = true)
 |    |-- datatimezone: string (nullable = true)
 |    |-- intervaltime: long (nullable = true)
 |    |-- intervalvalue: long (nullable = true)
 |    |-- utcacquisitiontime: string (nullable = true)
 |-- exploded_atom_struct_last: string (nullable = true)


scala> val d = c.select(col("titlename"), col("exploded_atom_struct.*"))
d: org.apache.spark.sql.DataFrame = [titlename: string, datatimezone: string ... 3 more fields]

scala> d.show
+---------+------------+------------+-------------+--------------------+
|titlename|datatimezone|intervaltime|intervalvalue|  utcacquisitiontime|
+---------+------------+------------+-------------+--------------------+
| periodic|      +02:00|          15|        28128|2017-03-27T22:00:00Z|
| periodic|      +02:00|          15|        25687|2017-03-27T22:15:00Z|
+---------+------------+------------+-------------+--------------------+

So thought of posting it here, in case if anyone has similar questions seeing this question.

Sankar
  • 546
  • 4
  • 15
0

It probably has something to do with the JSON object stored inside your file, could you print it or make sure it's the one you provided in the question? I'm asking because I took that one and it runs just fine:

val json =
  """
    |{
    |  "titlename": "periodic",
    |  "atom": [
    |    {
    |      "usage": "neutron",
    |      "dailydata": [
    |        {
    |          "utcacquisitiontime": "2017-03-27T22:00:00Z",
    |          "datatimezone": "+02:00",
    |          "intervalvalue": 28128,
    |          "intervaltime": 15
    |        },
    |        {
    |          "utcacquisitiontime": "2017-03-27T22:15:00Z",
    |          "datatimezone": "+02:00",
    |          "intervalvalue": 25687,
    |          "intervaltime": 15
    |        }
    |      ]
    |    }
    |  ]
    |}
  """.stripMargin

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.read
  .json(spark.sparkContext.parallelize(Seq(json)))
  .printSchema()
Andrei T.
  • 2,455
  • 1
  • 13
  • 28
0

From the Apache Spark SQL Docs

Note that the file that is offered as a json file is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object.

Thus,

{ "titlename": "periodic","atom": [{ "usage": "neutron", "dailydata": [ {"utcacquisitiontime": "2017-03-27T22:00:00Z","datatimezone": "+02:00","intervalvalue": 28128,"intervaltime":15},{"utcacquisitiontime": "2017-03-27T22:15:00Z","datatimezone": "+02:00", "intervalvalue": 25687,"intervaltime": 15 }]}]}

And then:

val jsonDF = sqlContext.read.json("file")
jsonDF: org.apache.spark.sql.DataFrame = 
[atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, 
titlename: string]
philantrovert
  • 9,904
  • 3
  • 37
  • 61