I have the following JSON document representing employees along with their addresses:

[
{"id" : 1000, "name" : "dev", "age" : 30, 
     "address" : {"city":"noida","state":"UP","pincode":"201201"}},
{"id" : 1001, "name" : "ravi", "age" : 36, 
     "address" : {"city":"noida","state":"UP","pincode":"201501"}},
{"id" : 1002, "name" : "atul", "age" : 29, 
     "address" : {"city":"indore","state":"MP","pincode":"485201"}}
]

I am reading this JSON file using Spark SQL and applying a filter (predicate) on the age column to show only the employees whose age is greater than 29.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("JsonRead")
  .master("local[*]")
  .getOrCreate()

val emp_df = spark.read
  .option("multiline", true) // the JSON file has multiline (pretty-printed) records
  .json(getClass.getResource("/sparksql/employee.json").getPath)

emp_df.printSchema()
/*root
 *  |-- address: struct (nullable = true)
 *  |    |-- city: string (nullable = true)
 *  |    |-- pincode: string (nullable = true)
 *  |    |-- state: string (nullable = true)
 *  |-- age: long (nullable = true)
 *  |-- id: long (nullable = true)
 *  |-- name: string (nullable = true)
 */

//emp_df.show()
import spark.implicits._
val emp_ds = emp_df.as[Employee] // convert the DataFrame to a typed Dataset[Employee] via the implicit Encoder
emp_ds.filter(_.age > 29).explain(true) // typed filter using a Scala lambda

case class Employee(id: Long, name: String, age: Long, address: Address)
case class Address(city: String, state: String, pincode: String)
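For comparison, here is the same predicate written as a column expression against the untyped DataFrame (a sketch using the emp_df defined above):

// Column-expression variant of the same predicate, for comparison.
// $"age" relies on the spark.implicits._ import from above.
emp_df.filter($"age" > 29).explain(true)

If the lambda in the typed filter is what blocks the optimizer, I would expect this variant to report something like PushedFilters: [IsNotNull(age), GreaterThan(age,29)] in its plan.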

Looking at the physical plan of the typed filter, I see that PushedFilters is empty:

== Physical Plan ==
*(1) Filter <function1>.apply
+- *(1) FileScan json [address#0,age#1L,id#2L,name#3] Batched: false,
   Format: JSON,
   Location: InMemoryFileIndex[file:/C:/Users/Dell/mygithub/techBlog/sparkexamples/target/classes/sparksql/emp...,
   PartitionFilters: [],
   PushedFilters: [],
   ReadSchema: struct<address:struct<city:string,pincode:string,state:string>,age:bigint,id:bigint,name:string>

Can someone tell me why the predicate (age > 29) is not being pushed down? Ideally, it should have been pushed down by the Spark Catalyst optimizer.
