
How can I set up my Spark JDBC options to make sure a filter predicate is pushed down to the database rather than loading everything first? I'm using Spark 2.1. I can't get the right syntax, and I know I can add a where clause after the load() but that would obviously load everything first. I'm trying the code below, but whereas this filter takes a couple of seconds in my db client, from Spark JDBC it doesn't return anything and just keeps running when I try to push down the predicate.

val jdbcReadOpts = Map(
  "url" -> url,
  "driver" -> driver,
  "user" -> user,
  "password" -> pass,
  "dbtable" -> tblQuery,       // table name or "(subquery) alias" string
  "inferSchema" -> "true")     // note: the JDBC source takes its schema from the database

val predicate = "DATE(TS_COLUMN) = '2018-01-01'"
// Also tried -> val predicate = "SIMPLEDATECOL = '2018-01-01'"

val df = spark.read.format("jdbc")
  .options(jdbcReadOpts)
  .load().where(predicate)
horatio1701d

2 Answers


and I know I can add a where clause after the load() but that would obviously load everything first.

This is not true. Predicates used in where that can be pushed down are pushed down automatically (not every predicate can be; predicates based on function calls are the most obvious example). Be aware that caching might affect predicate pushdown in some cases.

To push down limits, aggregations, and non-trivial predicates you can use a query string as the dbtable option: Does spark predicate pushdown work with JDBC?.
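For illustration, a minimal sketch of the query-string approach, reusing the options map from the question (the table name my_table is assumed; substitute your own). The database evaluates the WHERE clause, so only matching rows are transferred:

// A parenthesized subquery with an alias is valid anywhere a table name is,
// so the filter runs inside the database before any rows reach Spark.
val pushedQuery =
  """(SELECT * FROM my_table
    |  WHERE SIMPLEDATECOL = '2018-01-01') AS filtered""".stripMargin

val df = spark.read.format("jdbc")
  .options(jdbcReadOpts - "dbtable")  // drop the original dbtable entry
  .option("dbtable", pushedQuery)
  .load()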

Alper t. Turker
  • Thank you. Just so I understand. Do you mean to say that using `where` after `load()` would not load the entire table first? – horatio1701d Jan 03 '18 at 18:26
  • Thank you. I updated my question to include what I am attempting, but I don't understand why it is not returning the dataframe quickly if I'm applying the where predicate. – horatio1701d Jan 03 '18 at 18:41
  • Because it involves a function call, which won't be pushed down. – Alper t. Turker Jan 03 '18 at 18:44
  • Also tried using a simple column which already contains a date format, using `val predicate = "DATECOL = '2018-01-01'"`, which also does not work. The result is only about 1K rows. Sorry if I missed something. – horatio1701d Jan 03 '18 at 18:50
  • Did you check the database logs for the executed query? Also, could you add the execution plan? – Alper t. Turker Jan 03 '18 at 18:52
  • I see these lines in the query plan in the Spark UI. Not too familiar yet with parsing these, but this seems like it's applying the filter: `+- Filter (cast(DATECOL#21 as string) = 2018-01-01)` – horatio1701d Jan 03 '18 at 19:13
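
The plan line in the last comment shows the column wrapped in a cast, which means Spark evaluates the filter itself instead of pushing it down. A hedged sketch of the workaround discussed in this thread (column name TS_COLUMN taken from the question; adjust to your schema): express the filter as a half-open range using typed literals, so the column stays free of functions and casts and the comparison can be translated into the JDBC WHERE clause:

import java.sql.Timestamp
import org.apache.spark.sql.functions.{col, lit}

// Typed timestamp literals keep the column free of casts, so the source
// can translate the comparisons into its WHERE clause.
val start = lit(Timestamp.valueOf("2018-01-01 00:00:00"))
val end   = lit(Timestamp.valueOf("2018-01-02 00:00:00"))

val pushed = spark.read.format("jdbc")
  .options(jdbcReadOpts)
  .load()
  .where(col("TS_COLUMN") >= start && col("TS_COLUMN") < end)

pushed.explain()  // look for "PushedFilters: [...]" in the physical plan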

As per the ParquetFilters.scala source code (which governs pushdown for the Parquet data source), only the types below are allowed to be pushed down as predicate filters.

BooleanType, IntegerType, LongType, FloatType, DoubleType, BinaryType

I found these links useful for understanding this functionality.
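
Whatever the source, you can confirm which predicates were actually pushed by inspecting the physical plan. A minimal sketch (the path /tmp/events and column id are hypothetical):

import spark.implicits._

// Pushed predicates are listed under "PushedFilters" in the physical plan;
// anything Spark evaluates itself appears as a separate Filter node above the scan.
val events = spark.read.parquet("/tmp/events")  // hypothetical dataset
events.filter($"id" > 100).explain()
// e.g. ... PushedFilters: [IsNotNull(id), GreaterThan(id,100)] ...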

Gowtham