
I am trying to use SQL on a Spark DataFrame, but one of the columns holds a string value that is itself a JSON-like structure.

I saved my DataFrame to a temp table: TestTable

When I ran DESC on it:

col_name                       data_type
requestId                       string  
name                            string  
features                        string  

But the features value is a JSON string:

{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}

I just want to query TestTable where totalSpent > 10. Can someone tell me how to do this?

My JSON file looks like:

    {
        "requestId": 232323,
        "name": "ravi",
        "features": "{\"places\":11,\"movies\":2,\"totalPlacesVisited\":0,\"totalSpent\":13,\"SpentMap\":{\"Movie\":2,\"Park Visit\":11},\"benefits\":{\"freeTime\":13}}"
    }

features is a string. I only need totalSpent from it. I tried:

import org.apache.spark.sql.types._

val features = StructType(Array(
  StructField("totalSpent", LongType, true),
  StructField("movies", LongType, true)
))

val schema = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("name", StringType, true),
  StructField("features", features, true)
))

val records = sqlContext.read.schema(schema).json(filePath)

since each request has one JSON string of features. But this gives me an error.

When I tried with

val records = sqlContext.jsonFile(filePath)

records.printSchema

it shows me:

root
 |-- requestId: string (nullable = true)
 |-- features: string (nullable = true)
 |-- name: string (nullable = true)

Can I use parallelize inside a StructField while creating the schema? I first tried:

val customer = StructField("features", StringType, true)
val events = sc.parallelize(customer :: Nil)

val schema = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("name", StructType(events, true), true),
  StructField("features", features, true)
))

This gives me an error as well. I also tried:

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})

val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show

This gives me:
<console>:78: error: object liftweb is not a member of package net
       import net.liftweb.json.parse

Then I tried:

val parseJson = udf((s: String) => {
  sqlContext.read.json(s)
})

val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show

But this gives an error again.

Next I tried:

import org.json4s._
import org.json4s.jackson.JsonMethods._

val parseJson = udf((s: String) => {
  parse(s)
})

val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show

But it gives me:

java.lang.UnsupportedOperationException: Schema for type org.json4s.JValue is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)

This gives me a proper schema (based on the answer given by zero323):

import scala.util.Try
import org.json4s._
import org.json4s.jackson.JsonMethods._

val extractFeatures = udf((features: String) => Try {
  implicit val formats = DefaultFormats
  parse(features).extract[Features]
}.toOption)

val parsed = records.withColumn("features", extractFeatures($"features"))

parsed.printSchema

But when I query:

val value = parsed.filter($"requestId" === "232323" ).select($"features.totalSpent")

value.show gives null.

Swetha
  • @zero323: Hi, I went through the other question you answered, but I don't quite understand it. Can you please explain it to me? I am very new to Spark. – Swetha Jul 17 '16 at 16:38
  • Which part is not clear? It is a long answer showing different methods depending on requirements and version. Do you consider any particular solution? – zero323 Jul 17 '16 at 16:52
  • @zero323 I tried using parallelize on my schema, because I use the schema to get other values, so I would prefer doing this in the schema, but it gives an error. I am inclined towards a UDF to parse the JSON, where you mentioned case class KV(k: String, v: Int). Is that because of the way the JSON is formatted? – Swetha Jul 17 '16 at 17:12
  • @zero323 For import net.liftweb.json.parse I keep getting :78: error: object liftweb is not a member of package net. I am using Spark 1.5. – Swetha Jul 17 '16 at 17:16
  • Because you have to include the required artifacts in your project. It can be any JSON lib you like. Spark comes with json4s (+ Jackson), which uses an identical DSL (https://github.com/json4s/json4s) to Lift. But if you don't feel comfortable with these things, the choice is to update and use `get_json_object` with the desired type casting. – zero323 Jul 17 '16 at 17:21
  • @zero323 Oh, OK, then let me try the Jackson parser. Unfortunately I cannot update. – Swetha Jul 17 '16 at 17:25
  • `import org.json4s._; import org.json4s.jackson.JsonMethods._` - the rest should be more or less similar. – zero323 Jul 17 '16 at 17:27
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/117545/discussion-between-swetha-and-zero323). – Swetha Jul 17 '16 at 17:33

1 Answer


When you return data from a UDF it has to be representable as SQL types, and a JSON AST is not. One approach is to create a case class similar to this one:

case class Features(
  places: Integer, 
  movies: Integer,
  totalPlacesVisited: Integer, 
  totalSpent: Integer,
  SpentMap: Map[String, Integer],
  benefits: Map[String, Integer]
) 

and use it to extract objects:

import org.json4s._
import org.json4s.jackson.JsonMethods._

val df = Seq((
  232323, "ravi",
  """{"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":{"Movie":2,"Park Visit":11},"benefits":{"freeTime":13}}"""
)).toDF("requestId", "name", "features")

val extractFeatures = udf((features: String) => {
  implicit val formats = DefaultFormats
  parse(features).extract[Features]
})

val parsed = df.withColumn("features", extractFeatures($"features"))
parsed.show(false)

// +---------+----+-----------------------------------------------------------------+
// |requestId|name|features                                                         |
// +---------+----+-----------------------------------------------------------------+
// |232323   |ravi|[11,2,0,13,Map(Movie -> 2, Park Visit -> 11),Map(freeTime -> 13)]|
// +---------+----+-----------------------------------------------------------------+

parsed.printSchema

// root
//  |-- requestId: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- features: struct (nullable = true)
//  |    |-- places: integer (nullable = true)
//  |    |-- movies: integer (nullable = true)
//  |    |-- totalPlacesVisited: integer (nullable = true)
//  |    |-- totalSpent: integer (nullable = true)
//  |    |-- SpentMap: map (nullable = true)
//  |    |    |-- key: string
//  |    |    |-- value: integer (valueContainsNull = true)
//  |    |-- benefits: map (nullable = true)
//  |    |    |-- key: string
//  |    |    |-- value: integer (valueContainsNull = true)

Depending on the other records and the expected usage, you should adjust the representation and add relevant error handling logic.
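For example, a crude version of the `scala.util.Try(...).toOption` idea mentioned in the comments below could look like this (a sketch only; the names extractFeaturesSafe and parsedSafe are just for illustration):

import scala.util.Try

// Sketch: a record whose features string does not match the Features
// case class yields None (i.e. SQL NULL) instead of throwing on the executors.
val extractFeaturesSafe = udf((features: String) => Try {
  implicit val formats = DefaultFormats
  parse(features).extract[Features]
}.toOption)

val parsedSafe = df.withColumn("features", extractFeaturesSafe($"features"))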

You can also use the json4s DSL to access individual fields as strings:

val getMovieSpent = udf((s: String) => 
  compact(render(parse(s) \\ "SpentMap" \\ "Movie")))

df.withColumn("movie_spent", getMovieSpent($"features").cast("bigint")).show
// +---------+----+--------------------+-----------+
// |requestId|name|            features|movie_spent|
// +---------+----+--------------------+-----------+
// |   232323|ravi|{"places":11,"mov...|          2|
// +---------+----+--------------------+-----------+

For alternative approaches see How to query JSON data column using Spark DataFrames?
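For example, if updating to Spark 1.6+ is an option, the built-in `get_json_object` mentioned in the comments extracts fields by JSON path with no UDF at all; a minimal sketch, reusing the dummy df from above:

import org.apache.spark.sql.functions.get_json_object

// Pull totalSpent straight out of the JSON string, cast it, and filter,
// which answers the original "totalSpent > 10" question directly.
df.select(
    $"requestId", $"name",
    get_json_object($"features", "$.totalSpent").cast("bigint").alias("totalSpent"))
  .where($"totalSpent" > 10)
  .show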

zero323
  • Yeah. My bad :) Thank you for the detailed explanation. – Swetha Jul 17 '16 at 18:40
  • It works fine, but when I try parsed.show it gives me: java.lang.NullPointerException – Swetha Jul 17 '16 at 19:40
  • Like I said before, you'll need proper exception handling and probably some adjustments depending on how regular your data is. For starters you can use `scala.util.Try(...).toOption`, but it is a very crude approach. – zero323 Jul 17 '16 at 20:04
  • Oh, OK. Thanks for the advice. I am getting org.json4s.package$MappingException: Parsed JSON values do not match with class constructor – Swetha Jul 17 '16 at 20:05
  • Well, it is much easier in 1.6+. You can try the DSL, which may be slightly more user friendly. I updated the answer with a small example. – zero323 Jul 17 '16 at 21:00
  • Will this work in 1.5 as well? I tried: val getMovieSpent = udf((s: String) => compact(render(parse(s) \\ "totalSpent"))) records.withColumn("money_spent", getMovieSpent($"features").cast("bigint")).show – Swetha Jul 17 '16 at 21:41
  • But still an error :( Even for this: val extractFeatures = udf((features: String) => Try { implicit val formats = DefaultFormats; parse(features).extract[Features] }.toOption) When I query val value = parsed.filter($"requestId" === "232323").select($"features.totalSpent"), value.show gives me null. – Swetha Jul 17 '16 at 21:52
  • Your data must have a different format than expected. I would start by comparing it to the dummy example I put in the answer. – zero323 Jul 17 '16 at 21:59
  • I tried with your example dummy data. When I do parsed.filter($"requestId" === "232323").select($"features.totalSpent").show it still gives me null. – Swetha Jul 17 '16 at 22:03
  • The recent DSL works with the dummy data, but when I use it on my df it shows: getMovieSpent: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(,StringType,List(StringType)) java.lang.NullPointerException – Swetha Jul 17 '16 at 22:09
  • But my data is the same as the example. The schema shows correctly after the transformation, but the value comes out as null. – Swetha Jul 17 '16 at 22:13