
Recently I was working with Spark and the JDBC data source. Consider the following snippet:

val df = spark.read.options(options).format("jdbc").load()
val newDF = df.where(PRED)

PRED is a predicate expression, possibly combining several conditions.

If PRED is a simple predicate, like x = 10, the query is much faster. However, if there are non-equi conditions like date > someOtherDate or date < someOtherDate2, the query is much slower than without predicate pushdown. As you may know, database engine scans on such predicates can be very slow; in my case the query was even 10 times slower (!).
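
For reference, whether a predicate was actually pushed down can be verified from the physical plan: for a JDBC source, pushed predicates are listed under PushedFilters. A minimal check (the x = 10 predicate is illustrative):

val newDF = df.where("x = 10")
newDF.explain()
// In the printed plan, look at the JDBC scan node: predicates listed in
// PushedFilters: [...] are the ones sent to the database.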

To prevent unnecessary predicate pushdown I used:

val cachedDF = df.cache()
val newDF = cachedDF.where(PRED)

But it requires a lot of memory and, due to the problem mentioned here (Spark's Dataset unpersist behaviour), I can't unpersist cachedDF.

Is there any other way to avoid pushing down predicates, without caching and without writing my own data source?

Note: even if there is an option to turn off predicate pushdown, it's only applicable if other queries may still use it. So, if I wrote:

// some fancy option set to not push down predicates
val df1 = ...
// predicate pushdown works again
val df2 = ...
df1.join(df2) // df1 without predicate pushdown, but df2 with it
  • While I am not aware of any method that could disable predicate pushdown (which doesn't mean it doesn't exist, of course), short of hacking the plan /simple/ or adding custom rules to place an analysis barrier /haven't tried, should work/, I have to strongly disagree with _DB engine scans on such predicates are very slow_. This brings me to a question: are you sure that the predicate is pushed down at all (Spark is not very smart about that)? – zero323 May 14 '18 at 18:28
  • @user6910411 Yes, I saw it in the `explain()` result and in the SQL that was sent to the database. As far as I remember, database engines often have poor performance on very complex predicates with many non-equi conditions joined by OR. In my case, the SQL without the pushed-down WHERE clause was approx. 10 times faster. – T. Gawęda May 14 '18 at 18:31
  • `OR` predicates are different from range searches, but the last time I checked, `OR`s weren't pushed down, excluding cases of chained equality conditions on the same field (`x = 1 OR x = 2`). – zero323 May 14 '18 at 18:38
  • That being said, an easy hack is to pass fields through `udf[T, T](identity _)`. Forcing casting can work in a similar way. For example, `df.where($"modifieddate" > "2010-05-28 00:00:00")` where `modifieddate` is a Timestamp won't be pushed, while `df.where($"modifieddate" > lit("2010-05-28 00:00:00").cast("timestamp"))` will be (see the first sketch after this thread). – zero323 May 14 '18 at 18:39
  • @user6910411 I changed the example in the question to contain "OR", because in the real-world application this OR + non-equality combination caused the performance degradation on DB2. – T. Gawęda May 14 '18 at 18:40
  • @user6910411 Clever! ;) Please write an answer, it deserves an upvote :) Maybe I will also post to the Spark developers mailing list about some built-in "barrier". – T. Gawęda May 14 '18 at 18:41
  • It is way too hacky and I honestly hope there is a better way to do it :) – zero323 May 14 '18 at 18:43
  • Also, if the conditions are static, you could push some into the query, cache the result, and use `where` for the remaining ones (by "push" I mean https://stackoverflow.com/q/38729436/6910411; see the second sketch after this thread). – zero323 May 14 '18 at 18:45
  • @user6910411 As I wrote, I did that, but it requires a lot of memory to cache the whole dataset, especially when I must repeat the same trick a few times ;) – T. Gawęda May 14 '18 at 18:47
  • I think you missed my point. The problem with cache is that it can disable pushdowns completely. However, if you push some predicates into the query string, Spark won't touch that part. But it is, of course, yet another hack. – zero323 May 14 '18 at 18:49
  • @user6910411 Got it, thanks :) – T. Gawęda May 14 '18 at 18:50
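
To make the identity-UDF / casting hack from the comments concrete, here is a minimal sketch (the column name modifieddate and its Timestamp type are illustrative, not from the question; the idea is that Spark cannot push a predicate through a UDF, and that an implicit cast around the column blocks pushdown as described above):

import java.sql.Timestamp
import org.apache.spark.sql.functions.{col, lit, udf}

// Identity UDF: Spark can't push a filter through a UDF, so this
// comparison is evaluated on the Spark side, not in the database.
val noPush = udf((ts: Timestamp) => ts)
val sparkSide = df.where(noPush(col("modifieddate")) > lit(Timestamp.valueOf("2010-05-28 00:00:00")))

// Casting variant: comparing the Timestamp column with a raw string
// forces a cast around the column, so this predicate is NOT pushed...
val notPushed = df.where(col("modifieddate") > "2010-05-28 00:00:00")

// ...while an explicit cast on the literal keeps the predicate pushable.
val pushed = df.where(col("modifieddate") > lit("2010-05-28 00:00:00").cast("timestamp"))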
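
And a sketch of the query-string workaround (table, column, and connection names are illustrative): the predicates you do want evaluated in the database go into a subquery passed as dbtable, which Spark never rewrites; caching the already-reduced result then keeps the expensive predicates on the Spark side, at a lower memory cost than caching the whole table:

import org.apache.spark.sql.functions.{col, lit}

// The WHERE clause inside the subquery is fixed text; Spark won't add
// anything to it or remove anything from it.
val query = "(SELECT * FROM events WHERE x = 10) AS t"

val base = spark.read
  .format("jdbc")
  .option("url", url)        // connection details assumed to be defined
  .option("dbtable", query)
  .load()
  .cache()                   // blocks pushdown of predicates applied later

// Expensive non-equi / OR predicates are evaluated by Spark, not the DB.
val result = base.where(
  col("date") > lit("2010-05-28").cast("date") ||
  col("date") < lit("2009-01-01").cast("date"))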

1 Answer


A JIRA ticket has been opened for this issue. You can follow it here: https://issues.apache.org/jira/browse/SPARK-24288

eliasah
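
The ticket has since been resolved: Spark 2.4.0 added a pushDownPredicate option to the JDBC source (default true). Setting it to false disables pushdown for that read only, which also covers the per-DataFrame requirement from the question's note. A minimal sketch (connection options are illustrative):

val df1 = spark.read
  .format("jdbc")
  .option("url", url)                    // connection details assumed
  .option("dbtable", "events")
  .option("pushDownPredicate", "false")  // no pushdown for this DataFrame
  .load()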