I have the following JSON document representing employee information along with their addresses:
[
  {"id": 1000, "name": "dev", "age": 30,
   "address": {"city": "noida", "state": "UP", "pincode": "201201"}},
  {"id": 1001, "name": "ravi", "age": 36,
   "address": {"city": "noida", "state": "UP", "pincode": "201501"}},
  {"id": 1002, "name": "atul", "age": 29,
   "address": {"city": "indore", "state": "MP", "pincode": "485201"}}
]
I am reading this JSON file with Spark SQL and applying a filter (predicate) on the age column to show only the employees whose age is greater than 29.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("JsonRead")
  .master("local[*]")
  .getOrCreate()

val emp_df = spark.read
  .option("multiline", true) // the JSON file contains multiline records
  .json(getClass.getResource("/sparksql/employee.json").getPath)
emp_df.printSchema()
/*
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- pincode: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
*/
//emp_df.show()
import spark.implicits._

val emp_ds = emp_df.as[Employee] // use the implicit encoders to convert the DataFrame to a Dataset[Employee]
emp_ds.filter(_.age > 29).explain(true)
case class Employee(id: Long, name: String, age: Long, address: Address)
case class Address(city: String, state: String, pincode: String)
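For comparison, here is the same predicate written as an untyped Column expression on the DataFrame (a sketch; it relies on the spark.implicits._ import shown above). My understanding is that Catalyst can analyze a Column expression, unlike a compiled Scala function, so I would expect this form to be eligible for pushdown:

// same predicate, expressed as a Column rather than a Scala lambda
emp_df.filter($"age" > 29).explain(true)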
Looking at the physical plan for the lambda-based filter, however, I see no filters at all in PushedFilters: []
== Physical Plan ==
*(1) Filter <function1>.apply
+- *(1) FileScan json [address#0,age#1L,id#2L,name#3] Batched: false,
      Format: JSON,
      Location: InMemoryFileIndex[file:/C:/Users/Dell/mygithub/techBlog/sparkexamples/target/classes/sparksql/emp...,
      PartitionFilters: [],
      PushedFilters: [],
      ReadSchema: struct<address:struct<city:string,pincode:string,state:string>,age:bigint,id:bigint,name:string>
Can someone tell me why the predicate (age > 29) is not being pushed down? Ideally, the Catalyst optimizer should have pushed it down to the data source.
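For reference, this is the variant I would try next (a sketch, untested): keep the typed Dataset but pass the predicate as a Column, which Dataset.filter also accepts:

import org.apache.spark.sql.functions.col

// Dataset.filter is overloaded: besides T => Boolean it also accepts a Column,
// giving the optimizer a declarative predicate it can reason about
emp_ds.filter(col("age") > 29).explain(true)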