
I have a nested JSON DataFrame in Spark whose schema looks like this:

root
 |-- data: struct (nullable = true)
 |    |-- average: long (nullable = true)
 |    |-- sum: long (nullable = true)
 |    |-- time: string (nullable = true)
 |-- password: string (nullable = true)
 |-- url: string (nullable = true)
 |-- username: string (nullable = true)

I need to convert the time field under the data struct to the timestamp data type. Below is the code I tried, but it did not give me the result I wanted.

val jsonStr = """{
"url": "imap.yahoo.com",
"username": "myusername",
"password": "mypassword",
"data": {
"time":"2017-1-29 0-54-32",
"average": 234,
"sum": 123}}"""


  val json: JsValue = Json.parse(jsonStr)

  import sqlContext.implicits._
  val rdd = sc.parallelize(jsonStr::Nil);
  var df = sqlContext.read.json(rdd);
  df.printSchema()
  val dfRes = df.withColumn("data",makeTimeStamp(unix_timestamp(df("data.time"),"yyyy-MM-dd hh-mm-ss").cast("timestamp")))
  dfRes.printSchema();

case class Convert(time: java.sql.Timestamp)
val makeTimeStamp = udf((time: java.sql.Timestamp) => Convert(
  time))

Result of my code:

root
 |-- data: struct (nullable = true)
 |    |-- time: timestamp (nullable = true)
 |-- password: string (nullable = true)
 |-- url: string (nullable = true)
 |-- username: string (nullable = true)

My code is actually removing the other elements inside the data struct (average and sum) instead of just casting the time string to the timestamp data type. For basic data management operations on JSON DataFrames, do we need to write a UDF whenever we need some functionality, or is there a library available for JSON data management? I am currently using the Play framework for working with JSON objects in Spark. Thanks in advance.

Sid

2 Answers


You can try this:

val jsonStr = """{
"url": "imap.yahoo.com",
"username": "myusername",
"password": "mypassword",
"data": {
"time":"2017-1-29 0-54-32",
"average": 234,
"sum": 123}}"""


val json: JsValue = Json.parse(jsonStr)

import sqlContext.implicits._
val rdd = sc.parallelize(jsonStr::Nil);
var df = sqlContext.read.json(rdd);
df.printSchema()
val dfRes = df.withColumn("data",makeTimeStamp(unix_timestamp(df("data.time"),"yyyy-MM-dd hh-mm-ss").cast("timestamp"), df("data.average"), df("data.sum")))

case class Convert(time: java.sql.Timestamp, average: Long, sum: Long)
val makeTimeStamp = udf((time: java.sql.Timestamp, average: Long, sum: Long) => Convert(time, average, sum))

This will give the result:

root
 |-- url: string (nullable = true)
 |-- username: string (nullable = true)
 |-- password: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- time: timestamp (nullable = true)
 |    |-- average: long (nullable = false)
 |    |-- sum: long (nullable = false)

The only things changed are the Convert case class and the makeTimeStamp UDF.

himanshuIIITian
  • Thanks for responding. Your solution does help in keeping the elements under the data tree intact. But the real scenario I am working on has too many elements inside the data struct. Adding all of them to the case class is not a viable solution. Also, the number of elements inside the data struct can change over time. I am looking for a solution that can scale easily. – Sid Mar 15 '17 at 14:06
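For that scaling concern, one rough sketch, assuming nothing beyond Spark's built-in col, struct, and unix_timestamp functions, is to rebuild the data struct from the DataFrame's own schema so its fields never have to be listed in a case class (dataFields, rebuilt, and dfRes2 are just illustrative names):

import org.apache.spark.sql.functions.{col, struct, unix_timestamp}
import org.apache.spark.sql.types.StructType

// Take the field names of the `data` struct from the existing schema, so any
// fields added to `data` later are carried through without code changes.
val dataFields = df.schema("data").dataType.asInstanceOf[StructType].fieldNames
val rebuilt = dataFields.map {
  case "time" => unix_timestamp(col("data.time"), "yyyy-MM-dd hh-mm-ss").cast("timestamp").as("time")
  case name   => col(s"data.$name").as(name)
}
val dfRes2 = df.withColumn("data", struct(rebuilt: _*))
dfRes2.printSchema()

Because the field list comes from df.schema rather than being hard-coded, only the time column gets special treatment and everything else is passed through unchanged.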

Assuming you can specify the Spark schema upfront, the automatic string-to-timestamp type coercion should take care of the conversions.

import org.apache.spark.sql.types._

val dschema = (new StructType)
  .add("url", StringType)
  .add("username", StringType)
  .add("data", (new StructType)
    .add("sum", LongType)
    .add("time", TimestampType))
val df = spark.read.schema(dschema).json("/your/json/on/hdfs")
df.printSchema
df.show
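One caveat: the sample time string in the question ("2017-1-29 0-54-32") may not match Spark's default timestamp pattern, so the coercion can come out null. Assuming Spark 2.x, where the JSON reader accepts a timestampFormat option, a small sketch would pass the expected pattern explicitly (df2 is just an illustrative name):

// Tell the JSON reader how the time strings are formatted so the schema's
// TimestampType column is parsed correctly (Spark 2.x reader option).
val df2 = spark.read
  .schema(dschema)
  .option("timestampFormat", "yyyy-M-d H-m-s")
  .json("/your/json/on/hdfs")
df2.printSchema
df2.show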

This article outlines a few more techniques to deal with bad data; worth a read for your use-case.

Sanjay T. Sharma